Skip to content

Commit 332cd08

Browse files
authored
Merge pull request #3 from DamiaoCastro/create_delete_table
Create delete table
2 parents 2c40e8e + b136cb8 commit 332cd08

File tree

9 files changed

+309
-105
lines changed

9 files changed

+309
-105
lines changed

AzureFunctions.Extensions.GoogleBigQuery.DemoProject1/Function2.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public MyBigQueryRow(DateTime date, string insertId) : base(date, insertId) {
1717

1818
}
1919

20+
[Disable]
2021
[FunctionName("Function2")]
2122
public static void Run([TimerTrigger("0 */5 * * * *", RunOnStartup = true)]TimerInfo myTimer,
2223
[GoogleBigQuery("MyGoogleBigQueryConfig2")]
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using System;
2+
using System.ComponentModel.DataAnnotations.Schema;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Azure.WebJobs;
6+
7+
namespace AzureFunctions.Extensions.GoogleBigQuery.DemoProject1 {
8+
public static class Function5 {
9+
10+
public class MyBigQueryRow : GoogleBigQueryRow {
11+
public MyBigQueryRow(DateTime date, string insertId) : base(date, insertId) {
12+
}
13+
14+
[Column]
15+
public string FunctionName1 { get; set; }
16+
17+
[Column]
18+
public Int64 SomeIntegerValue1 { get; set; }
19+
20+
}
21+
22+
[FunctionName("Function5")]
23+
public static Task Run(
24+
[TimerTrigger("0 */1 * * * *", RunOnStartup = true)]TimerInfo myTimer,
25+
[GoogleBigQueryManagement("MyGoogleBigQueryConfig4")] GoogleBigQueryManagement management,
26+
CancellationToken cancellationToken) {
27+
28+
return management.CreateTableAsync<MyBigQueryRow>(true, cancellationToken);
29+
//return management.DeleteTableAsync("table2$20180113", cancellationToken);
30+
31+
}
32+
}
33+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.Azure.WebJobs;
7+
8+
namespace AzureFunctions.Extensions.GoogleBigQuery
9+
{
10+
11+
public partial class GoogleBigQueryExtensionConfig
12+
{
13+
public class AsyncCollector : ICollector<GoogleBigQueryRow>, IAsyncCollector<GoogleBigQueryRow>
14+
{
15+
16+
private GoogleBigQueryAttribute googleBigQueryAttribute;
17+
private List<GoogleBigQueryRow> items = new List<GoogleBigQueryRow>();
18+
19+
public AsyncCollector(GoogleBigQueryAttribute googleBigQueryAttribute)
20+
{
21+
this.googleBigQueryAttribute = googleBigQueryAttribute;
22+
}
23+
24+
void ICollector<GoogleBigQueryRow>.Add(GoogleBigQueryRow item)
25+
{
26+
if (item == null)
27+
{
28+
throw new ArgumentNullException(nameof(item));
29+
}
30+
31+
items.Add(item);
32+
}
33+
34+
Task IAsyncCollector<GoogleBigQueryRow>.AddAsync(GoogleBigQueryRow item, CancellationToken cancellationToken)
35+
{
36+
if (item == null)
37+
{
38+
throw new ArgumentNullException(nameof(item));
39+
}
40+
41+
items.Add(item);
42+
return Task.WhenAll();
43+
}
44+
45+
Task IAsyncCollector<GoogleBigQueryRow>.FlushAsync(CancellationToken cancellationToken)
46+
{
47+
48+
var tasks = new List<Task>();
49+
50+
if (items.Count > 0)
51+
{
52+
53+
Type itemType = items.First().GetType();
54+
55+
var bqService = new BigQueryService(googleBigQueryAttribute, itemType);
56+
57+
//items without date
58+
{
59+
var rows = items.Where(c => !c.Date.HasValue);
60+
tasks.Add(bqService.InsertRowsAsync(rows, cancellationToken));
61+
}
62+
63+
//items with date
64+
{
65+
var groups = items.Where(c => c.Date.HasValue).GroupBy(c => c.Date.Value.Date);
66+
foreach (var group in groups)
67+
{
68+
tasks.Add(bqService.InsertRowsAsync(group.Key, group, cancellationToken));
69+
}
70+
}
71+
72+
}
73+
74+
return Task.WhenAll(tasks)
75+
.ContinueWith((allTasks) =>
76+
{
77+
if (allTasks.IsFaulted)
78+
{
79+
throw allTasks.Exception.InnerException;
80+
}
81+
});
82+
}
83+
84+
}
85+
86+
}
87+
88+
}

AzureFunctions.Extensions.GoogleBigQuery/BigQueryInsertRowService.cs

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,34 @@
55
using System.Linq;
66
using System.Reflection;
77

8-
namespace AzureFunctions.Extensions.GoogleBigQuery {
9-
internal class BigQueryInsertRowService {
8+
namespace AzureFunctions.Extensions.GoogleBigQuery
9+
{
10+
internal class BigQueryInsertRowService
11+
{
1012

1113
private const string BigQueryDateTimeFormat = "yyyy-MM-dd HH:mm:ss";
1214
private static System.Globalization.CultureInfo cultureUS = new System.Globalization.CultureInfo("en-US");
1315

14-
internal static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties) {
16+
internal static BigQueryInsertRow GetBigQueryInsertRow(GoogleBigQueryRow row, IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties)
17+
{
1518
if (row == null) { throw new System.ArgumentNullException(nameof(row)); }
1619
if (dictionaryOfProperties == null) { throw new System.ArgumentNullException(nameof(dictionaryOfProperties)); }
1720

1821
IDictionary<string, object> dic = GetDictionaryOfValues(dictionaryOfProperties, row);
1922

20-
return new BigQueryInsertRow(row.InsertId) { dic };
23+
if (string.IsNullOrWhiteSpace(row.InsertId))
24+
{
25+
return new BigQueryInsertRow() { dic };
26+
}
27+
else
28+
{
29+
return new BigQueryInsertRow(row.InsertId) { dic };
30+
}
31+
2132
}
2233

23-
private static IDictionary<string, object> GetDictionaryOfValues(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, object obj) {
34+
private static IDictionary<string, object> GetDictionaryOfValues(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, object obj)
35+
{
2436
if (obj == null) { return null; }
2537

2638
var properties = dictionaryOfProperties[obj.GetType().FullName];
@@ -29,29 +41,41 @@ private static IDictionary<string, object> GetDictionaryOfValues(IDictionary<str
2941
return dictionaryOfValues;
3042
}
3143

32-
private static object GetBigQueryValue(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, PropertyInfo property, object obj) {
33-
switch (property.PropertyType.Name.ToUpper()) {
44+
private static object GetBigQueryValue(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, PropertyInfo property, object obj)
45+
{
46+
switch (property.PropertyType.Name.ToUpper())
47+
{
3448
case "IENUMERABLE`1":
3549
return GetArrayFromEnumreable(dictionaryOfProperties, property, obj);
3650
case "NULLABLE`1":
3751
var value = property.GetValue(obj);
38-
if (value == null) {
52+
if (value == null)
53+
{
3954
return null;
40-
} else {
55+
}
56+
else
57+
{
4158
var propertyTypeName = property.PropertyType.GenericTypeArguments[0].Name;
4259
return GetNonEnumerableBigQueryValue(propertyTypeName, value);
4360
}
4461
}
4562

46-
if (property.PropertyType.IsClass && property.PropertyType.Namespace != "System") {//crappy but works for now
47-
if (property.PropertyType.IsArray) {
63+
if (property.PropertyType.IsClass && property.PropertyType.Namespace != "System")
64+
{//crappy but works for now
65+
if (property.PropertyType.IsArray)
66+
{
4867
var array = (IEnumerable<object>)property.GetValue(obj);
4968
return GetSubEntitiesBigQueryInsertRows(dictionaryOfProperties, array);
50-
} else {
69+
}
70+
else
71+
{
5172
var value = property.GetValue(obj);
52-
if (value == null) {
73+
if (value == null)
74+
{
5375
return null;
54-
} else {
76+
}
77+
else
78+
{
5579
return GetSubEntitiesBigQueryInsertRows(dictionaryOfProperties, new List<object> { value }).First();
5680
}
5781
}
@@ -60,8 +84,10 @@ private static object GetBigQueryValue(IDictionary<string, IEnumerable<PropertyI
6084
return GetNonEnumerableBigQueryValue(property.PropertyType.Name, property.GetValue(obj));
6185
}
6286

63-
private static object GetNonEnumerableBigQueryValue(string propertyTypeName, object value) {
64-
switch (propertyTypeName.ToUpper()) {
87+
private static object GetNonEnumerableBigQueryValue(string propertyTypeName, object value)
88+
{
89+
switch (propertyTypeName.ToUpper())
90+
{
6591
case "BYTE":
6692
return (int)(byte)value;
6793
case "CHAR":
@@ -83,11 +109,13 @@ private static object GetNonEnumerableBigQueryValue(string propertyTypeName, obj
83109
}
84110
}
85111

86-
private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, PropertyInfo property, object obj) {
112+
private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, PropertyInfo property, object obj)
113+
{
87114
var enumerableValue = property.GetValue(obj);
88115

89116
Type innerPropertyType = property.PropertyType.GenericTypeArguments[0];
90-
switch (innerPropertyType.Name.ToUpper()) {
117+
switch (innerPropertyType.Name.ToUpper())
118+
{
91119
case "BYTE":
92120
return (byte[])enumerableValue;
93121
case "BOOLEAN":
@@ -99,9 +127,9 @@ private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<Pro
99127
case "DATETIMEOFFSET":
100128
return (DateTimeOffset[])enumerableValue;
101129
case "DOUBLE":
102-
return ((double[])enumerableValue).Select(c => (float)c).ToArray();
130+
return ((IEnumerable<double>)enumerableValue).Select(c => (float)c).ToArray();
103131
case "DECIMAL":
104-
return ((decimal[])enumerableValue).Select(c => (float)c).ToArray();
132+
return ((IEnumerable<decimal>)enumerableValue).Select(c => (float)c).ToArray();
105133
case "SINGLE":
106134
return ((float[])enumerableValue);
107135
case "GUID":
@@ -120,14 +148,18 @@ private static object GetArrayFromEnumreable(IDictionary<string, IEnumerable<Pro
120148
case "UINT64":
121149
return ((UInt64[])enumerableValue);
122150
default:
123-
if (property.PropertyType.IsArray) {
151+
if (property.PropertyType.IsArray)
152+
{
124153
return enumerableValue;
125154
}
126155

127156
IEnumerable<object> ie = (IEnumerable<object>)enumerableValue;
128-
if (innerPropertyType.IsClass && innerPropertyType.Namespace != "System") {
157+
if (innerPropertyType.IsClass && innerPropertyType.Namespace != "System")
158+
{
129159
return GetSubEntitiesBigQueryInsertRows(dictionaryOfProperties, ie);
130-
} else {
160+
}
161+
else
162+
{
131163
var length = ie.Count();
132164
var i = Array.CreateInstance(innerPropertyType, length);
133165
Array.Copy(ie.ToArray(), i, length);
@@ -148,9 +180,11 @@ internal static IEnumerable<string> GetBigQueryJobLines(IEnumerable<GoogleBigQue
148180
return rows.Select(r => JsonConvert.SerializeObject(r, jsonSerializerSettings));
149181
}
150182

151-
private static BigQueryInsertRow[] GetSubEntitiesBigQueryInsertRows(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, IEnumerable<object> objs) {
183+
private static BigQueryInsertRow[] GetSubEntitiesBigQueryInsertRows(IDictionary<string, IEnumerable<PropertyInfo>> dictionaryOfProperties, IEnumerable<object> objs)
184+
{
152185

153-
if (objs.Count() > 0) {
186+
if (objs.Count() > 0)
187+
{
154188
return objs.Select(c => new BigQueryInsertRow() { GetDictionaryOfValues(dictionaryOfProperties, c) }).ToArray();
155189
}
156190

0 commit comments

Comments
 (0)