Skip to content

Commit fadc5cb

Browse files
committed
repository transfer
1 parent 1e7c923 commit fadc5cb

File tree

6 files changed

+306
-0
lines changed

6 files changed

+306
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio 15
4+
VisualStudioVersion = 15.0.27130.2010
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureFunctions.Extensions.GoogleBigQuery", "AzureFunctions.Extensions.GoogleBigQuery\AzureFunctions.Extensions.GoogleBigQuery.csproj", "{B781E2D4-4180-4BFA-96F3-EE4CB7E43E0D}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
Release|Any CPU = Release|Any CPU
12+
EndGlobalSection
13+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
14+
{B781E2D4-4180-4BFA-96F3-EE4CB7E43E0D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
15+
{B781E2D4-4180-4BFA-96F3-EE4CB7E43E0D}.Debug|Any CPU.Build.0 = Debug|Any CPU
16+
{B781E2D4-4180-4BFA-96F3-EE4CB7E43E0D}.Release|Any CPU.ActiveCfg = Release|Any CPU
17+
{B781E2D4-4180-4BFA-96F3-EE4CB7E43E0D}.Release|Any CPU.Build.0 = Release|Any CPU
18+
EndGlobalSection
19+
GlobalSection(SolutionProperties) = preSolution
20+
HideSolutionNode = FALSE
21+
EndGlobalSection
22+
GlobalSection(ExtensibilityGlobals) = postSolution
23+
SolutionGuid = {7FFAD7E7-052A-4BA0-9F59-7D4B913E0A39}
24+
EndGlobalSection
25+
EndGlobal
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netstandard2.0</TargetFramework>
5+
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
6+
<Version>0.0.9.3</Version>
7+
<Authors>Damiao Castro</Authors>
8+
<Company>fowet.com</Company>
9+
<Description>Extension for Google BigQuery to work with Azure Functions and Azure Webjobs.
10+
If this package had been published by Microsoft itself, it would live under the namespace "Microsoft.Azure.WebJobs.Extensions.GoogleBigQuery".</Description>
11+
<PackageProjectUrl>https://github.com/DamiaoCastro/AzureFunctionsExtensionGoogleBigQuery/wiki</PackageProjectUrl>
12+
<PackageId>AzureFunctions.Extensions.GoogleBigQuery</PackageId>
13+
<Product>AzureFunctions.Extensions.GoogleBigQuery</Product>
14+
</PropertyGroup>
15+
16+
<ItemGroup>
17+
<Compile Remove="ValueProvider.cs" />
18+
</ItemGroup>
19+
20+
<ItemGroup>
21+
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="3.0.0-beta3" />
22+
<PackageReference Include="Google.Cloud.BigQuery.V2" Version="1.0.1" />
23+
</ItemGroup>
24+
25+
</Project>
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System;
4+
using Google.Cloud.BigQuery.V2;
5+
using Google.Apis.Auth.OAuth2;
6+
using Google.Apis.Bigquery.v2.Data;
7+
using System.Threading.Tasks;
8+
using System.Reflection;
9+
using System.ComponentModel.DataAnnotations;
10+
using System.ComponentModel.DataAnnotations.Schema;
11+
using System.Threading;
12+
13+
namespace AzureFunctions.Extensions.GoogleBigQuery {
14+
15+
/// <summary>
/// Derives a BigQuery table schema from a row type (via reflection over its
/// <see cref="ColumnAttribute"/>-decorated properties) and streams rows of that type
/// into day partitions of a single BigQuery table.
/// </summary>
internal class BigQueryService {

    private const string BigQueryDateTimeFormat = "yyyy-MM-dd HH:mm:ss";
    private const string PartitionSuffixFormat = "yyyyMMdd";

    // Rows are only sent when their date is inside this window (in days) around today's UTC date.
    // NOTE(review): presumably mirrors BigQuery's streaming limit for partition decorators
    // (31 days in the past, 16 days in the future) - confirm against the BigQuery documentation.
    private const int OldestPartitionOffsetInDays = -31;
    private const int NewestPartitionOffsetInDays = 16;

    // Machine-readable values must not depend on the culture (or calendar) of the host.
    private static readonly System.Globalization.CultureInfo invariantCulture = System.Globalization.CultureInfo.InvariantCulture;

    // CLR types whose BigQuery schema type name differs from their upper-cased CLR type name.
    private static readonly Dictionary<Type, string> knownBigQueryTypes = new Dictionary<Type, string> {
        { typeof(int), "INTEGER" },
        { typeof(long), "INTEGER" },
        { typeof(float), "FLOAT" },
        { typeof(double), "FLOAT" },
        { typeof(byte[]), "BYTES" },
    };

    private readonly byte[] credentials;
    private readonly string projectId;
    private readonly string datasetId;
    private readonly string tableId;
    private readonly TableSchema tableSchema;
    private readonly IReadOnlyList<PropertyInfo> properties;

    /// <summary>
    /// Creates a service bound to one table and one row type.
    /// </summary>
    /// <param name="credentials">raw bytes of the Google credentials JSON file</param>
    /// <param name="projectId">Google Cloud project id</param>
    /// <param name="datasetId">BigQuery dataset id</param>
    /// <param name="tableId">BigQuery table id (without partition decorator)</param>
    /// <param name="itemType">row type whose [Column] properties define the table schema</param>
    public BigQueryService(byte[] credentials, string projectId, string datasetId, string tableId, Type itemType) {
        this.credentials = credentials;
        this.projectId = projectId;
        this.datasetId = datasetId;
        this.tableId = tableId;
        (this.tableSchema, this.properties) = GetTableSchema(itemType);
    }

    /// <summary>
    /// Builds the table schema from the public-typed properties of <paramref name="tableType"/>
    /// that carry a <see cref="ColumnAttribute"/>. A <see cref="RequiredAttribute"/> makes the
    /// field REQUIRED, otherwise it is NULLABLE.
    /// </summary>
    /// <returns>the schema together with the properties it was built from, in the same order</returns>
    private static (TableSchema, IReadOnlyList<PropertyInfo>) GetTableSchema(Type tableType) {

        // Materialized once: the list is reused for every inserted row, so the
        // reflection query must not be re-evaluated on each enumeration.
        var columnProperties = tableType.GetProperties()
            .Where(c => c.PropertyType.IsPublic && c.CustomAttributes.Any(a => a.AttributeType == typeof(ColumnAttribute)))
            .ToList();

        var fields = columnProperties
            .Select(property => new TableFieldSchema() {
                Name = property.Name,
                Type = GetBigQueryType(property.PropertyType),
                Mode = property.CustomAttributes.Any(a => a.AttributeType == typeof(RequiredAttribute)) ? "REQUIRED" : "NULLABLE"
            })
            .ToList();

        var schema = new Google.Apis.Bigquery.v2.Data.TableSchema() { Fields = fields };

        return (schema, columnProperties);
    }

    /// <summary>
    /// Maps a CLR property type to a BigQuery schema type name
    /// (STRING, BYTES, INTEGER, FLOAT, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME, RECORD).
    /// </summary>
    /// <remarks>
    /// <see cref="Nullable{T}"/> is unwrapped first. Types without an explicit mapping fall back
    /// to their upper-cased CLR name (e.g. String -> STRING, DateTime -> DATETIME, Boolean -> BOOLEAN).
    /// </remarks>
    private static string GetBigQueryType(Type propertyType) {

        var effectiveType = Nullable.GetUnderlyingType(propertyType) ?? propertyType;

        if (knownBigQueryTypes.TryGetValue(effectiveType, out var bigQueryType)) {
            return bigQueryType;
        }

        // Invariant casing: a culture-sensitive ToUpper() would turn "String" into "STRİNG" under tr-TR.
        return effectiveType.Name.ToUpperInvariant();
    }

    /// <summary>
    /// Gets (or creates with the derived schema) the day partition of the table for <paramref name="date"/>.
    /// </summary>
    private Task<BigQueryTable> GetTable(DateTime date, CancellationToken cancellationToken) {

        GoogleCredential googleCredential;
        using (var credentialStream = new System.IO.MemoryStream(credentials)) {
            googleCredential = GoogleCredential.FromStream(credentialStream);
        }

        var client = Google.Cloud.BigQuery.V2.BigQueryClient.Create(projectId, googleCredential);

        // Partition decorator "table$yyyyMMdd". The invariant culture pins the Gregorian calendar;
        // the current culture could otherwise produce a different year (e.g. Thai Buddhist calendar).
        var partitionSuffix = date.ToString(PartitionSuffixFormat, invariantCulture);

        return client.GetOrCreateTableAsync(
            datasetId,
            $"{tableId}${partitionSuffix}",
            tableSchema,
            new GetTableOptions(),
            new CreateTableOptions(),
            cancellationToken);
    }

    /// <summary>
    /// Streams <paramref name="rows"/> into the partition of the table that belongs to <paramref name="date"/>.
    /// </summary>
    /// <param name="date">partition date; only its date component is used</param>
    /// <param name="rows">rows to insert; null or empty is a no-op</param>
    /// <param name="cancellationToken">token passed on to the BigQuery client calls</param>
    /// <returns>a task that completes when the insert request has finished</returns>
    /// <remarks>
    /// Rows whose date is more than 31 days in the past or more than 16 days in the future
    /// (relative to today's UTC date) are not sent; the call completes successfully without inserting them.
    /// </remarks>
    public async Task InsertRowsAsync(DateTime date, IEnumerable<GoogleBigQueryRow> rows, CancellationToken cancellationToken) {

        // Enumerate the caller's sequence exactly once.
        var pendingRows = rows?.ToList();
        if (pendingRows == null || pendingRows.Count == 0) {
            return;
        }

        // Compare whole days so a time-of-day component on 'date' cannot shift the result.
        int dayOffset = (date.Date - DateTime.UtcNow.Date).Days;
        if (dayOffset < OldestPartitionOffsetInDays || dayOffset > NewestPartitionOffsetInDays) {
            return;
        }

        var bigQueryRows = pendingRows
            .Select(row => new BigQueryInsertRow(row.InsertId) {
                properties.ToDictionary(p => p.Name, p => GetBigQueryValue(p, row))
            })
            .ToList();

        // await instead of ContinueWith + Result, so faults surface as the original
        // exception rather than wrapped in an AggregateException.
        var table = await GetTable(date, cancellationToken).ConfigureAwait(false);
        await table.InsertRowsAsync(bigQueryRows, new InsertOptions() { AllowUnknownFields = true }, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Reads the value of <paramref name="property"/> from <paramref name="row"/> in the form sent to BigQuery.
    /// DateTime values (including non-null <c>DateTime?</c>) are formatted as "yyyy-MM-dd HH:mm:ss";
    /// every other value, including null, is passed through unchanged.
    /// </summary>
    private static object GetBigQueryValue(PropertyInfo property, GoogleBigQueryRow row) {
        var value = property.GetValue(row);

        // A boxed DateTime? with a value is a boxed DateTime, so this covers both declarations.
        if (value is DateTime dateTime) {
            return dateTime.ToString(BigQueryDateTimeFormat, invariantCulture);
        }

        return value;
    }

}
126+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using Microsoft.Azure.WebJobs.Description;
2+
using System;
3+
4+
namespace AzureFunctions.Extensions.GoogleBigQuery {
5+
6+
/// <summary>
/// Binding attribute for a parameter or return value through which rows are written to Google BigQuery.
/// </summary>
[Binding]
[AttributeUsage(AttributeTargets.ReturnValue | AttributeTargets.Parameter)]
public class GoogleBigQueryAttribute : Attribute {

    /// <summary>
    /// Describes the credentials file and the BigQuery table that rows are written to.
    /// </summary>
    /// <param name="credentialsFileName">name of the JSON credentials file, e.g. "mycredentials.json".
    /// Remember to set the file's "copy to output directory" property to "copy always".</param>
    /// <param name="projectId">project id (not the same as the project name)</param>
    /// <param name="datasetId">dataset id</param>
    /// <param name="tableId">table id</param>
    /// <exception cref="ArgumentNullException">any argument is null, empty or whitespace</exception>
    public GoogleBigQueryAttribute(string credentialsFileName, string projectId, string datasetId, string tableId) {
        CredentialsFileName = RequireValue(credentialsFileName, nameof(credentialsFileName));
        ProjectId = RequireValue(projectId, nameof(projectId));
        DatasetId = RequireValue(datasetId, nameof(datasetId));
        TableId = RequireValue(tableId, nameof(tableId));
    }

    /// <summary>File name of the JSON credentials file.</summary>
    public string CredentialsFileName { get; }

    /// <summary>Google Cloud project id.</summary>
    public string ProjectId { get; }

    /// <summary>BigQuery dataset id.</summary>
    public string DatasetId { get; }

    /// <summary>BigQuery table id.</summary>
    public string TableId { get; }

    // Returns the value unchanged, or throws when it is null, empty or whitespace.
    private static string RequireValue(string value, string parameterName) {
        if (string.IsNullOrWhiteSpace(value)) {
            throw new ArgumentNullException(parameterName);
        }

        return value;
    }

}
37+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
using Microsoft.Azure.WebJobs.Host.Config;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using System.Linq;
7+
using Microsoft.Azure.WebJobs;
8+
9+
namespace AzureFunctions.Extensions.GoogleBigQuery {
10+
11+
/// <summary>
/// WebJobs extension configuration that binds <see cref="GoogleBigQueryAttribute"/>
/// to a collector of <see cref="GoogleBigQueryRow"/> items.
/// </summary>
public class GoogleBigQueryExtensionConfig : IExtensionConfigProvider {

    /// <summary>
    /// Registers the binding rule for <see cref="GoogleBigQueryAttribute"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is null</exception>
    void IExtensionConfigProvider.Initialize(ExtensionConfigContext context) {
        if (context == null) { throw new ArgumentNullException(nameof(context)); }

        context.AddBindingRule<GoogleBigQueryAttribute>()
            .BindToCollector(c => new AsyncCollector(c));
    }

    /// <summary>
    /// Buffers rows in memory and sends them to BigQuery when flushed.
    /// </summary>
    public class AsyncCollector : ICollector<GoogleBigQueryRow>, IAsyncCollector<GoogleBigQueryRow> {

        private readonly GoogleBigQueryAttribute googleBigQueryAttribute;
        private readonly List<GoogleBigQueryRow> items = new List<GoogleBigQueryRow>();

        public AsyncCollector(GoogleBigQueryAttribute googleBigQueryAttribute) {
            this.googleBigQueryAttribute = googleBigQueryAttribute;
        }

        /// <summary>Buffers a row until the next flush.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="item"/> is null</exception>
        void ICollector<GoogleBigQueryRow>.Add(GoogleBigQueryRow item) {
            Enqueue(item);
        }

        /// <summary>Buffers a row until the next flush; completes synchronously.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="item"/> is null</exception>
        Task IAsyncCollector<GoogleBigQueryRow>.AddAsync(GoogleBigQueryRow item, CancellationToken cancellationToken) {
            Enqueue(item);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Sends all buffered rows to BigQuery, one insert request per row type and day,
        /// and removes them from the buffer once every request has succeeded.
        /// </summary>
        async Task IAsyncCollector<GoogleBigQueryRow>.FlushAsync(CancellationToken cancellationToken) {

            if (items.Count == 0) {
                return;
            }

            // Snapshot, so the set of rows being sent is fixed for the duration of the flush.
            var pending = items.ToList();

            // The credentials file is resolved relative to the parent of the directory that holds this assembly.
            var path = System.IO.Path.GetDirectoryName(typeof(GoogleBigQueryAttribute).Assembly.Location);
            var fullPath = System.IO.Path.Combine(path, "..", googleBigQueryAttribute.CredentialsFileName);
            var credentials = System.IO.File.ReadAllBytes(fullPath);

            var tasks = new List<Task>();

            // The schema and the reflected properties depend on the concrete row type,
            // so each row type gets its own service instead of assuming the first row's type for all rows.
            foreach (var rowsOfType in pending.GroupBy(row => row.GetType())) {
                var bqService =
                    new BigQueryService(credentials,
                        googleBigQueryAttribute.ProjectId,
                        googleBigQueryAttribute.DatasetId,
                        googleBigQueryAttribute.TableId,
                        rowsOfType.Key);

                foreach (var rowsOfDay in rowsOfType.GroupBy(row => row.Date.Date)) {
                    tasks.Add(bqService.InsertRowsAsync(rowsOfDay.Key, rowsOfDay, cancellationToken));
                }
            }

            await Task.WhenAll(tasks).ConfigureAwait(false);

            // Drop what was just sent so a later flush does not insert the same rows again.
            // Rows are only ever appended, so the snapshot occupies the front of the buffer.
            // On failure the rows stay buffered and the exception propagates to the caller.
            items.RemoveRange(0, pending.Count);
        }

        // Shared validation and buffering for Add and AddAsync.
        private void Enqueue(GoogleBigQueryRow item) {
            if (item == null) {
                throw new ArgumentNullException(nameof(item));
            }

            items.Add(item);
        }
    }

}
75+
76+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System;
2+
3+
namespace AzureFunctions.Extensions.GoogleBigQuery {
4+
/// <summary>
/// A row to be written to BigQuery, carrying its date and its insert id.
/// </summary>
public class GoogleBigQueryRow {

    /// <summary>
    /// Creates a row.
    /// </summary>
    /// <param name="date">date of the row</param>
    /// <param name="insertId">insert id of the row; must not be null, empty or whitespace</param>
    /// <exception cref="ArgumentNullException"><paramref name="insertId"/> is null, empty or whitespace</exception>
    public GoogleBigQueryRow(DateTime date, string insertId) {
        InsertId = string.IsNullOrWhiteSpace(insertId)
            ? throw new ArgumentNullException(nameof(insertId))
            : insertId;
        Date = date;
    }

    /// <summary>Date of the row.</summary>
    public DateTime Date { get; }

    /// <summary>Insert id of the row.</summary>
    public string InsertId { get; }

}
17+
}

0 commit comments

Comments
 (0)