Skip to content

Commit eba75be

Browse files
committed
fix error and add non-day partioned tables features
2 parents 315f6aa + 2c40e8e commit eba75be

File tree

6 files changed

+252
-86
lines changed

6 files changed

+252
-86
lines changed

AzureFunctions.Extensions.GoogleBigQuery/AsyncCollector.cs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,70 @@
55
using System.Threading.Tasks;
66
using Microsoft.Azure.WebJobs;
77

8-
namespace AzureFunctions.Extensions.GoogleBigQuery {
8+
namespace AzureFunctions.Extensions.GoogleBigQuery
9+
{
910

10-
public partial class GoogleBigQueryExtensionConfig {
11-
public class AsyncCollector : ICollector<GoogleBigQueryRow>, IAsyncCollector<GoogleBigQueryRow> {
11+
public partial class GoogleBigQueryExtensionConfig
12+
{
13+
public class AsyncCollector : ICollector<GoogleBigQueryRow>, IAsyncCollector<GoogleBigQueryRow>
14+
{
1215

1316
private GoogleBigQueryAttribute googleBigQueryAttribute;
1417
private List<GoogleBigQueryRow> items = new List<GoogleBigQueryRow>();
1518

16-
public AsyncCollector(GoogleBigQueryAttribute googleBigQueryAttribute) {
19+
public AsyncCollector(GoogleBigQueryAttribute googleBigQueryAttribute)
20+
{
1721
this.googleBigQueryAttribute = googleBigQueryAttribute;
1822
}
1923

20-
void ICollector<GoogleBigQueryRow>.Add(GoogleBigQueryRow item) {
21-
if (item == null) {
24+
void ICollector<GoogleBigQueryRow>.Add(GoogleBigQueryRow item)
25+
{
26+
if (item == null)
27+
{
2228
throw new ArgumentNullException(nameof(item));
2329
}
2430

2531
items.Add(item);
2632
}
2733

28-
Task IAsyncCollector<GoogleBigQueryRow>.AddAsync(GoogleBigQueryRow item, CancellationToken cancellationToken) {
29-
if (item == null) {
34+
Task IAsyncCollector<GoogleBigQueryRow>.AddAsync(GoogleBigQueryRow item, CancellationToken cancellationToken)
35+
{
36+
if (item == null)
37+
{
3038
throw new ArgumentNullException(nameof(item));
3139
}
3240

3341
items.Add(item);
3442
return Task.WhenAll();
3543
}
3644

37-
Task IAsyncCollector<GoogleBigQueryRow>.FlushAsync(CancellationToken cancellationToken) {
45+
Task IAsyncCollector<GoogleBigQueryRow>.FlushAsync(CancellationToken cancellationToken)
46+
{
3847

3948
var tasks = new List<Task>();
4049

41-
if (items.Count > 0) {
42-
50+
if (items.Count > 0)
51+
{
52+
4353
Type itemType = items.First().GetType();
4454

4555
var bqService = new BigQueryService(googleBigQueryAttribute, itemType);
4656

47-
var groups = items.GroupBy(c => c.Date.Date);
48-
foreach (var group in groups) {
49-
tasks.Add(bqService.InsertRowsAsync(group.Key, group, cancellationToken));
57+
//items without date
58+
{
59+
var rows = items.Where(c => !c.Date.HasValue);
60+
tasks.Add(bqService.InsertRowsAsync(rows, cancellationToken));
61+
}
62+
63+
//items with date
64+
{
65+
var groups = items.Where(c=> c.Date.HasValue).GroupBy(c => c.Date.Value.Date);
66+
foreach (var group in groups)
67+
{
68+
tasks.Add(bqService.InsertRowsAsync(group.Key, group, cancellationToken));
69+
}
5070
}
71+
5172
}
5273

5374
return Task.WhenAll(tasks);

AzureFunctions.Extensions.GoogleBigQuery/BigQueryInsertRowService.cs

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55
using System.Linq;
66
using System.Reflection;
77

8-
namespace AzureFunctions.Extensions.GoogleBigQuery {
9-
internal class BigQueryInsertRowService {
8+
namespace AzureFunctions.Extensions.GoogleBigQuery
9+
{
10+
internal class BigQueryInsertRowService
11+
{
1012

1113
private const string BigQueryDateTimeFormat = "yyyy-MM-dd HH:mm:ss";
1214
private static System.Globalization.CultureInfo cultureUS = new System.Globalization.CultureInfo("en-US");
1315

14-
internal static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties) {
16+
internal static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties)
17+
{
1518
if (row == null) { throw new System.ArgumentNullException(nameof(row)); }
1619
if (dictionaryOfProperties == null) { throw new System.ArgumentNullException(nameof(dictionaryOfProperties)); }
1720

@@ -20,7 +23,8 @@ internal static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, ID
2023
return new BigQueryInsertRow(row.InsertId) { dic };
2124
}
2225

23-
private static IDictionary<string, object> GetDictionaryOfValues(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, object obj) {
26+
private static IDictionary<string, object> GetDictionaryOfValues(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, object obj)
27+
{
2428
if (obj == null) { return null; }
2529

2630
var properties = dictionaryOfProperties[obj.GetType().FullName];
@@ -29,29 +33,41 @@ private static IDictionary<string, object> GetDictionaryOfValues(IDictionary<str
2933
return dictionaryOfValues;
3034
}
3135

32-
private static object GetBigQueryValue(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, PropertyInfo property, object obj) {
33-
switch (property.PropertyType.Name.ToUpper()) {
36+
private static object GetBigQueryValue(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, PropertyInfo property, object obj)
37+
{
38+
switch (property.PropertyType.Name.ToUpper())
39+
{
3440
case "IENUMERABLE`1":
3541
return GetArrayFromEnumreable(dictionaryOfProperties, property, obj);
3642
case "NULLABLE`1":
3743
var value = property.GetValue(obj);
38-
if (value == null) {
44+
if (value == null)
45+
{
3946
return null;
40-
} else {
47+
}
48+
else
49+
{
4150
var propertyTypeName = property.PropertyType.GenericTypeArguments[0].Name;
4251
return GetNonEnumerableBigQueryValue(propertyTypeName, value);
4352
}
4453
}
4554

46-
if (property.PropertyType.IsClass && property.PropertyType.Namespace != "System") {//crappy but works for now
47-
if (property.PropertyType.IsArray) {
55+
if (property.PropertyType.IsClass && property.PropertyType.Namespace != "System")
56+
{//crappy but works for now
57+
if (property.PropertyType.IsArray)
58+
{
4859
var array = (IEnumerable<object>)property.GetValue(obj);
4960
return GetSubEntitiesBigQueryInsertRows(dictionaryOfProperties, array);
50-
} else {
61+
}
62+
else
63+
{
5164
var value = property.GetValue(obj);
52-
if (value == null) {
65+
if (value == null)
66+
{
5367
return null;
54-
} else {
68+
}
69+
else
70+
{
5571
return GetSubEntitiesBigQueryInsertRows(dictionaryOfProperties, new List<object> { value }).First();
5672
}
5773
}
@@ -60,8 +76,10 @@ private static object GetBigQueryValue(IDictionary<string, IEnumerable<PropertyI
6076
return GetNonEnumerableBigQueryValue(property.PropertyType.Name, property.GetValue(obj));
6177
}
6278

63-
private static object GetNonEnumerableBigQueryValue(string propertyTypeName, object value) {
64-
switch (propertyTypeName.ToUpper()) {
79+
private static object GetNonEnumerableBigQueryValue(string propertyTypeName, object value)
80+
{
81+
switch (propertyTypeName.ToUpper())
82+
{
6583
case "BYTE":
6684
return (int)(byte)value;
6785
case "CHAR":
@@ -83,11 +101,13 @@ private static object GetNonEnumerableBigQueryValue(string propertyTypeName, obj
83101
}
84102
}
85103

86-
private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, PropertyInfo property, object obj) {
104+
private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, PropertyInfo property, object obj)
105+
{
87106
var enumerableValue = property.GetValue(obj);
88107

89108
Type innerPropertyType = property.PropertyType.GenericTypeArguments[0];
90-
switch (innerPropertyType.Name.ToUpper()) {
109+
switch (innerPropertyType.Name.ToUpper())
110+
{
91111
case "BYTE":
92112
return (byte[])enumerableValue;
93113
case "BOOLEAN":
@@ -99,9 +119,9 @@ private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<Pro
99119
case "DATETIMEOFFSET":
100120
return (DateTimeOffset[])enumerableValue;
101121
case "DOUBLE":
102-
return ((double[])enumerableValue).Select(c => (float)c).ToArray();
122+
return ((IEnumerable<double>)enumerableValue).Select(c => (float)c).ToArray();
103123
case "DECIMAL":
104-
return ((decimal[])enumerableValue).Select(c => (float)c).ToArray();
124+
return ((IEnumerable<decimal>)enumerableValue).Select(c => (float)c).ToArray();
105125
case "SINGLE":
106126
return ((float[])enumerableValue);
107127
case "GUID":
@@ -120,14 +140,18 @@ private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<Pro
120140
case "UINT64":
121141
return ((UInt64[])enumerableValue);
122142
default:
123-
if (property.PropertyType.IsArray) {
143+
if (property.PropertyType.IsArray)
144+
{
124145
return enumerableValue;
125146
}
126147

127148
IEnumerable<object> ie = (IEnumerable<object>)enumerableValue;
128-
if (innerPropertyType.IsClass && innerPropertyType.Namespace != "System") {
149+
if (innerPropertyType.IsClass && innerPropertyType.Namespace != "System")
150+
{
129151
return GetSubEntitiesBigQueryInsertRows(dictionaryOfProperties, ie);
130-
} else {
152+
}
153+
else
154+
{
131155
var length = ie.Count();
132156
var i = Array.CreateInstance(innerPropertyType, length);
133157
Array.Copy(ie.ToArray(), i, length);
@@ -148,9 +172,11 @@ internal static IEnumerable<string> GetBigQueryJobLines(IEnumerable<GoogleBigQue
148172
return rows.Select(r => JsonConvert.SerializeObject(r, jsonSerializerSettings));
149173
}
150174

151-
private static BigQueryInsertRow[] GetSubEntitiesBigQueryInsertRows(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, IEnumerable<object> objs) {
175+
private static BigQueryInsertRow[] GetSubEntitiesBigQueryInsertRows(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, IEnumerable<object> objs)
176+
{
152177

153-
if (objs.Count() > 0) {
178+
if (objs.Count() > 0)
179+
{
154180
return objs.Select(c => new BigQueryInsertRow() { GetDictionaryOfValues(dictionaryOfProperties, c) }).ToArray();
155181
}
156182

0 commit comments

Comments
 (0)