Skip to content

Commit 2c40e8e

Browse files
committed
cache the objects
1 parent 877cabc commit 2c40e8e

File tree

3 files changed

+101
-33
lines changed

3 files changed

+101
-33
lines changed

AzureFunctions.Extensions.GoogleBigQuery/BigQueryService.cs

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,24 @@
77
using System.Threading.Tasks;
88
using System.Threading;
99

10-
namespace AzureFunctions.Extensions.GoogleBigQuery {
10+
namespace AzureFunctions.Extensions.GoogleBigQuery
11+
{
1112

12-
public class BigQueryService {
13+
public class BigQueryService
14+
{
1315

1416
private readonly GoogleBigQueryAttribute googleBigQueryAttribute;
1517
private readonly TableSchema tableSchema;
1618
private readonly IDictionary<string, IEnumerable<System.Reflection.PropertyInfo>> dictionaryOfProperties;
1719

18-
public BigQueryService(GoogleBigQueryAttribute googleBigQueryAttribute, Type itemType) {
20+
public BigQueryService(GoogleBigQueryAttribute googleBigQueryAttribute, Type itemType)
21+
{
1922
this.googleBigQueryAttribute = GoogleBigQueryAttribute.GetAttributeByConfiguration(googleBigQueryAttribute);
2023
(this.tableSchema, this.dictionaryOfProperties) = TableSchemaBuilderService.GetTableSchema(itemType);
2124
}
2225

23-
public Task CreateTable(bool timePartitioning, CancellationToken cancellationToken) {
26+
public Task CreateTable(bool timePartitioning, CancellationToken cancellationToken)
27+
{
2428

2529
BigQueryClient client = GetBiqQueryClient();
2630

@@ -34,7 +38,8 @@ public Task CreateTable(bool timePartitioning, CancellationToken cancellationTok
3438

3539
}
3640

37-
public Task DeleteTable(CancellationToken cancellationToken) {
41+
public Task DeleteTable(CancellationToken cancellationToken)
42+
{
3843

3944
BigQueryClient client = GetBiqQueryClient();
4045

@@ -46,31 +51,44 @@ public Task DeleteTable(CancellationToken cancellationToken) {
4651

4752
}
4853

49-
private Task<BigQueryTable> GetTable(DateTime date, CancellationToken cancellationToken) {
54+
private Task<BigQueryTable> GetTable(DateTime date, CancellationToken cancellationToken)
55+
{
5056
BigQueryClient client = GetBiqQueryClient();
5157

5258
return client.GetTableAsync(googleBigQueryAttribute.DatasetId, $"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}", null, cancellationToken);
5359
}
5460

55-
private BigQueryClient GetBiqQueryClient() {
61+
private static BigQueryClient _client = null;
62+
63+
private BigQueryClient GetBiqQueryClient()
64+
{
65+
66+
if (_client != null) { return _client; }
67+
5668
GoogleCredential googleCredential = null;
57-
if (googleBigQueryAttribute.Credentials != null) {
69+
if (googleBigQueryAttribute.Credentials != null)
70+
{
5871
googleCredential = GoogleCredential.FromStream(new System.IO.MemoryStream(googleBigQueryAttribute.Credentials));
59-
} else {
60-
if (!string.IsNullOrWhiteSpace(googleBigQueryAttribute.CredentialsFileName)) {
72+
}
73+
else
74+
{
75+
if (!string.IsNullOrWhiteSpace(googleBigQueryAttribute.CredentialsFileName))
76+
{
6177
var path = System.IO.Path.GetDirectoryName(typeof(GoogleBigQueryAttribute).Assembly.Location);
6278
var fullPath = System.IO.Path.Combine(path, "..", googleBigQueryAttribute.CredentialsFileName);
6379
var credentials = System.IO.File.ReadAllBytes(fullPath);
6480
googleCredential = GoogleCredential.FromStream(new System.IO.MemoryStream(credentials));
6581
}
6682
}
67-
var client = BigQueryClient.Create(googleBigQueryAttribute.ProjectId, googleCredential);
68-
return client;
83+
_client = BigQueryClient.Create(googleBigQueryAttribute.ProjectId, googleCredential);
84+
return _client;
6985
}
7086

71-
public Task InsertRowsAsync(DateTime date, IEnumerable<GoogleBigQueryRow> rows, CancellationToken cancellationToken) {
87+
public Task InsertRowsAsync(DateTime date, IEnumerable<GoogleBigQueryRow> rows, CancellationToken cancellationToken)
88+
{
7289

73-
if (rows != null && rows.Count() > 0) {
90+
if (rows != null && rows.Count() > 0)
91+
{
7492
int dateDiff = (date - DateTime.UtcNow.Date).Days;
7593

7694
if (dateDiff >= -31 && dateDiff <= 16)
@@ -94,19 +112,22 @@ public Task InsertRowsAsync(DateTime date, IEnumerable<GoogleBigQueryRow> rows,
94112
}, cancellationToken).Unwrap();
95113

96114
}
97-
else {
115+
else
116+
{
117+
118+
//throw new Exception("BQ streaming API doesn't allow to write data 31 days to de past and 16 for tthe future in day partitioned tables.");
98119

99-
BigQueryClient client = GetBiqQueryClient();
120+
//BigQueryClient client = GetBiqQueryClient();
100121

101-
IEnumerable<string> lines = BigQueryInsertRowService.GetBigQueryJobLines(rows);
122+
//IEnumerable<string> lines = BigQueryInsertRowService.GetBigQueryJobLines(rows);
102123

103-
return client.UploadJsonAsync(
104-
googleBigQueryAttribute.DatasetId,
105-
$"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}",
106-
tableSchema,
107-
lines,
108-
new UploadJsonOptions() { AllowUnknownFields = true },
109-
cancellationToken);
124+
//return client.UploadJsonAsync(
125+
// googleBigQueryAttribute.DatasetId,
126+
// $"{googleBigQueryAttribute.TableId}${date:yyyyMMdd}",
127+
// tableSchema,
128+
// lines,
129+
// new UploadJsonOptions() { AllowUnknownFields = true },
130+
// cancellationToken);
110131
}
111132
}
112133

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
4+
namespace AzureFunctions.Extensions.GoogleBigQuery
5+
{
6+
internal class BigQueryServiceCache
7+
{
8+
9+
private static ConcurrentDictionary<int, ExpiringBigQueryService> publisherClientCache = new ConcurrentDictionary<int, ExpiringBigQueryService>();
10+
11+
public static BigQueryService GetPublisherClient(GoogleBigQueryAttribute googleBigQueryAttribute, Type itemType)
12+
{
13+
var key = $"{googleBigQueryAttribute.GetHashCode()}-{itemType.GetType().FullName}".GetHashCode();
14+
15+
if (publisherClientCache.ContainsKey(key))
16+
{
17+
var expiringBigQueryService = publisherClientCache[key];
18+
if ((DateTime.UtcNow - expiringBigQueryService.CreatedUtc).TotalHours > 1) {
19+
var bigQueryService = new BigQueryService(googleBigQueryAttribute, itemType);
20+
var expiringBigQueryService1 = new ExpiringBigQueryService(DateTime.UtcNow, bigQueryService);
21+
publisherClientCache.AddOrUpdate(key, expiringBigQueryService1, (newkey, oldValue) => expiringBigQueryService1);
22+
23+
return bigQueryService;
24+
}
25+
26+
return expiringBigQueryService.BigQueryService;
27+
}
28+
else
29+
{
30+
var bigQueryService = new BigQueryService(googleBigQueryAttribute, itemType);
31+
var expiringBigQueryService = new ExpiringBigQueryService(DateTime.UtcNow, bigQueryService);
32+
publisherClientCache.AddOrUpdate(key, expiringBigQueryService, (newkey, oldValue) => expiringBigQueryService);
33+
34+
return bigQueryService;
35+
}
36+
37+
}
38+
39+
private class ExpiringBigQueryService
40+
{
41+
42+
public ExpiringBigQueryService(DateTime createdUtc, BigQueryService bigQueryService)
43+
{
44+
CreatedUtc = createdUtc;
45+
BigQueryService = bigQueryService;
46+
}
47+
48+
public DateTime CreatedUtc { get; }
49+
public BigQueryService BigQueryService { get; }
50+
51+
}
52+
53+
}
54+
}

AzureFunctions.Extensions.GoogleBigQuery/GoogleBigQueryExtensionConfig.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,10 @@ Task IAsyncCollector<GoogleBigQueryRow>.FlushAsync(CancellationToken cancellatio
4848
var tasks = new List<Task>();
4949

5050
if (items.Count > 0) {
51-
52-
//byte[] credentials = null;
53-
//if (!string.IsNullOrWhiteSpace(googleBigQueryAttribute.CredentialsFileName)) {
54-
// var path = System.IO.Path.GetDirectoryName(typeof(GoogleBigQueryAttribute).Assembly.Location);
55-
// var fullPath = System.IO.Path.Combine(path, "..", googleBigQueryAttribute.CredentialsFileName);
56-
// credentials = System.IO.File.ReadAllBytes(fullPath);
57-
//}
58-
51+
5952
Type itemType = items.First().GetType();
6053

61-
var bqService = new BigQueryService(googleBigQueryAttribute, itemType);
54+
var bqService = BigQueryServiceCache.GetPublisherClient(googleBigQueryAttribute, itemType);
6255

6356
var groups = items.GroupBy(c => c.Date.Date);
6457
foreach (var group in groups) {

0 commit comments

Comments
 (0)