Skip to content

Commit 996d442

Browse files
committed
added support for jobs where streaming api doesn't cope
1 parent a52d5fa commit 996d442

File tree

3 files changed

+58
-17
lines changed

3 files changed

+58
-17
lines changed

AzureFunctions.Extensions.GoogleBigQuery/AzureFunctions.Extensions.GoogleBigQuery.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<PropertyGroup>
44
<TargetFramework>netstandard2.0</TargetFramework>
55
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
6-
<Version>0.0.9.6</Version>
6+
<Version>0.0.9.7</Version>
77
<Authors>Damiao Castro</Authors>
88
<Company>fowet.com</Company>
99
<Description>Extension for Google BigQuery to work with Azure Functions and Azure Webjobs.

AzureFunctions.Extensions.GoogleBigQuery/BigQueryInsertRowService.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
using Google.Cloud.BigQuery.V2;
2+
using Newtonsoft.Json;
23
using System;
34
using System.Collections.Generic;
45
using System.Linq;
56
using System.Reflection;
67

78
namespace AzureFunctions.Extensions.GoogleBigQuery {
8-
public class BigQueryInsertRowService {
9+
internal class BigQueryInsertRowService {
910

1011
private const string BigQueryDateTimeFormat = "yyyy-MM-dd HH:mm:ss";
1112
private static System.Globalization.CultureInfo cultureUS = new System.Globalization.CultureInfo("en-US");
1213

13-
public static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties) {
14+
internal static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties) {
1415
if (row == null) { throw new System.ArgumentNullException(nameof(row)); }
1516
if (dictionaryOfProperties == null) { throw new System.ArgumentNullException(nameof(dictionaryOfProperties)); }
1617

@@ -135,6 +136,18 @@ private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<Pro
135136
}
136137
}
137138

139+
internal static IEnumerable<string> GetBigQueryJobLines(IEnumerable<GoogleBigQueryRow> rows)
140+
{
141+
142+
var jsonSerializerSettings = new JsonSerializerSettings()
143+
{
144+
DateFormatString = BigQueryDateTimeFormat,
145+
Culture = cultureUS
146+
};
147+
148+
return rows.Select(r => JsonConvert.SerializeObject(r, jsonSerializerSettings));
149+
}
150+
138151
private static BigQueryInsertRow[] GetSubEntitiesBigQueryInsertRows(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, IEnumerable<object> objs) {
139152

140153
if (objs.Count() > 0) {

AzureFunctions.Extensions.GoogleBigQuery/BigQueryService.cs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,50 +20,78 @@ public BigQueryService(GoogleBigQueryAttribute googleBigQueryAttribute, Type ite
2020
(this.tableSchema, this.dictionaryOfProperties) = TableSchemaBuilderService.GetTableSchema(itemType);
2121
}
2222

23-
private Task<BigQueryTable> GetTable(DateTime date, CancellationToken cancellationToken) {
23+
private Task<BigQueryTable> GetTable(DateTime date, CancellationToken cancellationToken)
24+
{
25+
BigQueryClient client = GetClient();
2426

27+
//return client.GetOrCreateTableAsync(datasetId, tableId, tableSchema, null, new CreateTableOptions() { TimePartitioning = new TimePartitioning() { Type = "DAY" } }, cancellationToken)
28+
// .ContinueWith((createTableTask) => {
29+
// return client.GetTableAsync(datasetId, $"{tableId}${date:yyyyMMdd}", null, cancellationToken);
30+
// }, cancellationToken).Unwrap();
31+
return client.GetTableAsync(googleBigQueryAttribute.DatasetId, $"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}", null, cancellationToken);
32+
}
33+
34+
private BigQueryClient GetClient()
35+
{
2536
GoogleCredential googleCredential = null;
26-
if (googleBigQueryAttribute.Credentials != null) {
37+
if (googleBigQueryAttribute.Credentials != null)
38+
{
2739
googleCredential = GoogleCredential.FromStream(new System.IO.MemoryStream(googleBigQueryAttribute.Credentials));
28-
} else {
29-
if (!string.IsNullOrWhiteSpace(googleBigQueryAttribute.CredentialsFileName)) {
40+
}
41+
else
42+
{
43+
if (!string.IsNullOrWhiteSpace(googleBigQueryAttribute.CredentialsFileName))
44+
{
3045
var path = System.IO.Path.GetDirectoryName(typeof(GoogleBigQueryAttribute).Assembly.Location);
3146
var fullPath = System.IO.Path.Combine(path, "..", googleBigQueryAttribute.CredentialsFileName);
3247
var credentials = System.IO.File.ReadAllBytes(fullPath);
3348
googleCredential = GoogleCredential.FromStream(new System.IO.MemoryStream(credentials));
3449
}
3550
}
3651
var client = BigQueryClient.Create(googleBigQueryAttribute.ProjectId, googleCredential);
37-
38-
//return client.GetOrCreateTableAsync(datasetId, tableId, tableSchema, null, new CreateTableOptions() { TimePartitioning = new TimePartitioning() { Type = "DAY" } }, cancellationToken)
39-
// .ContinueWith((createTableTask) => {
40-
// return client.GetTableAsync(datasetId, $"{tableId}${date:yyyyMMdd}", null, cancellationToken);
41-
// }, cancellationToken).Unwrap();
42-
return client.GetTableAsync(googleBigQueryAttribute.DatasetId, $"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}", null, cancellationToken);
52+
return client;
4353
}
4454

4555
public Task InsertRowsAsync(DateTime date, IEnumerable<GoogleBigQueryRow> rows, CancellationToken cancellationToken) {
4656

4757
if (rows != null && rows.Count() > 0) {
4858
int dateDiff = (date - DateTime.UtcNow.Date).Days;
4959

50-
if (dateDiff >= -31 && dateDiff <= 16) {
60+
if (dateDiff >= -31 && dateDiff <= 16)
61+
{
5162

5263
var bigQueryRows = rows.Select(c => BigQueryInsertRowService.GetBigQueryInsertRow(c, dictionaryOfProperties));
5364

5465
return GetTable(date, cancellationToken)
55-
.ContinueWith((tableTask) => {
66+
.ContinueWith((tableTask) =>
67+
{
5668
BigQueryTable table = tableTask.Result;
5769

5870
return table.InsertRowsAsync(bigQueryRows, new InsertOptions() { AllowUnknownFields = true }, cancellationToken)
59-
.ContinueWith((insertRowsTask) => {
60-
if (insertRowsTask.IsFaulted) {
71+
.ContinueWith((insertRowsTask) =>
72+
{
73+
if (insertRowsTask.IsFaulted)
74+
{
6175
throw insertRowsTask.Exception.InnerExceptions.First();
6276
}
6377
});
6478
}, cancellationToken).Unwrap();
6579

6680
}
81+
else {
82+
83+
BigQueryClient client = GetClient();
84+
85+
IEnumerable<string> lines = BigQueryInsertRowService.GetBigQueryJobLines(rows);
86+
87+
return client.UploadJsonAsync(
88+
googleBigQueryAttribute.DatasetId,
89+
$"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}",
90+
tableSchema,
91+
lines,
92+
new UploadJsonOptions() { AllowUnknownFields = true },
93+
cancellationToken);
94+
}
6795
}
6896

6997
return Task.WhenAll();

0 commit comments

Comments
 (0)