Skip to content

Commit 06e9823

Browse files
committed
Merge branch 'master' into create_delete_table
2 parents 552dec6 + 877cabc commit 06e9823

File tree

3 files changed

+70
-13
lines changed

3 files changed

+70
-13
lines changed

AzureFunctions.Extensions.GoogleBigQuery/AzureFunctions.Extensions.GoogleBigQuery.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<PropertyGroup>
44
<TargetFramework>netstandard2.0</TargetFramework>
55
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
6-
<Version>0.0.9.6</Version>
6+
<Version>0.0.9.7</Version>
77
<Authors>Damiao Castro</Authors>
88
<Company>fowet.com</Company>
99
<Description>Extension for Google BigQuery to work with Azure Functions and Azure Webjobs.

AzureFunctions.Extensions.GoogleBigQuery/BigQueryInsertRowService.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
using Google.Cloud.BigQuery.V2;
2+
using Newtonsoft.Json;
23
using System;
34
using System.Collections.Generic;
45
using System.Linq;
56
using System.Reflection;
67

78
namespace AzureFunctions.Extensions.GoogleBigQuery {
8-
public class BigQueryInsertRowService {
9+
internal class BigQueryInsertRowService {
910

1011
private const string BigQueryDateTimeFormat = "yyyy-MM-dd HH:mm:ss";
1112
private static System.Globalization.CultureInfo cultureUS = new System.Globalization.CultureInfo("en-US");
1213

13-
public static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties) {
14+
internal static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties) {
1415
if (row == null) { throw new System.ArgumentNullException(nameof(row)); }
1516
if (dictionaryOfProperties == null) { throw new System.ArgumentNullException(nameof(dictionaryOfProperties)); }
1617

@@ -135,6 +136,18 @@ private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<Pro
135136
}
136137
}
137138

139+
internal static IEnumerable<string> GetBigQueryJobLines(IEnumerable<GoogleBigQueryRow> rows)
140+
{
141+
142+
var jsonSerializerSettings = new JsonSerializerSettings()
143+
{
144+
DateFormatString = BigQueryDateTimeFormat,
145+
Culture = cultureUS
146+
};
147+
148+
return rows.Select(r => JsonConvert.SerializeObject(r, jsonSerializerSettings));
149+
}
150+
138151
private static BigQueryInsertRow[] GetSubEntitiesBigQueryInsertRows(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, IEnumerable<object> objs) {
139152

140153
if (objs.Count() > 0) {

AzureFunctions.Extensions.GoogleBigQuery/BigQueryService.cs

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,39 @@ public BigQueryService(GoogleBigQueryAttribute googleBigQueryAttribute, Type ite
2020
(this.tableSchema, this.dictionaryOfProperties) = TableSchemaBuilderService.GetTableSchema(itemType);
2121
}
2222

23+
public Task CreateTable(bool timePartitioning, CancellationToken cancellationToken) {
24+
25+
BigQueryClient client = GetBiqQueryClient();
26+
27+
return client.GetOrCreateTableAsync(
28+
googleBigQueryAttribute.DatasetId,
29+
googleBigQueryAttribute.TableId,
30+
tableSchema,
31+
null,
32+
timePartitioning ? new CreateTableOptions() { TimePartitioning = new TimePartitioning() { Type = "DAY" } } : null,
33+
cancellationToken);
34+
35+
}
36+
37+
public Task DeleteTable(CancellationToken cancellationToken) {
38+
39+
BigQueryClient client = GetBiqQueryClient();
40+
41+
return client.DeleteTableAsync(
42+
googleBigQueryAttribute.DatasetId,
43+
googleBigQueryAttribute.TableId,
44+
null,
45+
cancellationToken);
46+
47+
}
48+
2349
private Task<BigQueryTable> GetTable(DateTime date, CancellationToken cancellationToken) {
50+
BigQueryClient client = GetBiqQueryClient();
2451

52+
return client.GetTableAsync(googleBigQueryAttribute.DatasetId, $"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}", null, cancellationToken);
53+
}
54+
55+
private BigQueryClient GetBiqQueryClient() {
2556
GoogleCredential googleCredential = null;
2657
if (googleBigQueryAttribute.Credentials != null) {
2758
googleCredential = GoogleCredential.FromStream(new System.IO.MemoryStream(googleBigQueryAttribute.Credentials));
@@ -34,36 +65,49 @@ private Task<BigQueryTable> GetTable(DateTime date, CancellationToken cancellati
3465
}
3566
}
3667
var client = BigQueryClient.Create(googleBigQueryAttribute.ProjectId, googleCredential);
37-
38-
//return client.GetOrCreateTableAsync(datasetId, tableId, tableSchema, null, new CreateTableOptions() { TimePartitioning = new TimePartitioning() { Type = "DAY" } }, cancellationToken)
39-
// .ContinueWith((createTableTask) => {
40-
// return client.GetTableAsync(datasetId, $"{tableId}${date:yyyyMMdd}", null, cancellationToken);
41-
// }, cancellationToken).Unwrap();
42-
return client.GetTableAsync(googleBigQueryAttribute.DatasetId, $"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}", null, cancellationToken);
68+
return client;
4369
}
4470

4571
public Task InsertRowsAsync(DateTime date, IEnumerable<GoogleBigQueryRow> rows, CancellationToken cancellationToken) {
4672

4773
if (rows != null && rows.Count() > 0) {
4874
int dateDiff = (date - DateTime.UtcNow.Date).Days;
4975

50-
if (dateDiff >= -31 && dateDiff <= 16) {
76+
if (dateDiff >= -31 && dateDiff <= 16)
77+
{
5178

5279
var bigQueryRows = rows.Select(c => BigQueryInsertRowService.GetBigQueryInsertRow(c, dictionaryOfProperties));
5380

5481
return GetTable(date, cancellationToken)
55-
.ContinueWith((tableTask) => {
82+
.ContinueWith((tableTask) =>
83+
{
5684
BigQueryTable table = tableTask.Result;
5785

5886
return table.InsertRowsAsync(bigQueryRows, new InsertOptions() { AllowUnknownFields = true }, cancellationToken)
59-
.ContinueWith((insertRowsTask) => {
60-
if (insertRowsTask.IsFaulted) {
87+
.ContinueWith((insertRowsTask) =>
88+
{
89+
if (insertRowsTask.IsFaulted)
90+
{
6191
throw insertRowsTask.Exception.InnerExceptions.First();
6292
}
6393
});
6494
}, cancellationToken).Unwrap();
6595

6696
}
97+
else {
98+
99+
BigQueryClient client = GetBiqQueryClient();
100+
101+
IEnumerable<string> lines = BigQueryInsertRowService.GetBigQueryJobLines(rows);
102+
103+
return client.UploadJsonAsync(
104+
googleBigQueryAttribute.DatasetId,
105+
$"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}",
106+
tableSchema,
107+
lines,
108+
new UploadJsonOptions() { AllowUnknownFields = true },
109+
cancellationToken);
110+
}
67111
}
68112

69113
return Task.WhenAll();

0 commit comments

Comments
 (0)