Commit 2fcddaa

davidheryanto authored and feast-ci-bot committed
Always set destination table in BigQuery query config in Feast Batch Serving so it can handle large results (feast-dev#392)
* Update BQ query config to always set a destination table, so that queries can return large results. Refer to https://cloud.google.com/bigquery/quotas#query_jobs, "Maximum response size" bullet point.
* Replace the prefix for temporary table names.
* Set an expiry on the entity rows table.
* Include the exception message in the error description: gRPC clients such as the Feast Python SDK usually show only the error description, not the error cause.
* Code cleanup.
* Update the batch-retrieval e2e test: output rows may not be in the same order as the requested entity rows.
1 parent 5801e58 commit 2fcddaa
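
The core pattern behind this change, pulled out of the diff: BigQuery caps a query job's response size (see the quota link above) unless results are written to an explicit destination table, so every query config now names a temporary table and marks it to expire. Below is a minimal standalone sketch of that pattern, using only client calls that appear in the diff; the project and dataset IDs are placeholders.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class LargeResultQuerySketch {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Placeholder IDs -- substitute a real project and dataset.
    TableId destination =
        TableId.of(
            "my-project",
            "my_dataset",
            "_" + UUID.randomUUID().toString().replace("-", ""));

    // Naming a destination table lifts BigQuery's response-size cap on query jobs.
    QueryJobConfiguration config =
        QueryJobConfiguration.newBuilder("SELECT 1 AS one")
            .setDestinationTable(destination)
            .build();
    Job job = bigquery.create(JobInfo.of(config));
    job.waitFor();

    // Give the temporary table an expiry so it is cleaned up automatically.
    TableInfo withExpiry =
        bigquery
            .getTable(destination)
            .toBuilder()
            .setExpirationTime(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1))
            .build();
    bigquery.update(withExpiry);
  }
}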

File tree

4 files changed: +80 -20 lines changed

serving/src/main/java/feast/serving/service/BigQueryServingService.java

Lines changed: 32 additions & 14 deletions
@@ -32,6 +32,7 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.storage.Storage;
 import feast.core.FeatureSetProto.FeatureSetSpec;
 import feast.serving.ServingAPIProto;
@@ -56,10 +57,13 @@
 import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.joda.time.Duration;
 import org.slf4j.Logger;

 public class BigQueryServingService implements ServingService {

+  // Default no of millis for which a temporary table should exist before it is deleted in BigQuery.
+  public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.standardDays(1).getMillis();
   private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryServingService.class);

   private final BigQuery bigquery;
@@ -182,22 +186,29 @@ private Table loadEntities(DatasetSource datasetSource) {
     switch (datasetSource.getDatasetSourceCase()) {
       case FILE_SOURCE:
         try {
-          String tableName = generateTemporaryTableName();
-          log.info("Loading entity dataset to table {}.{}.{}", projectId, datasetId, tableName);
-          TableId tableId = TableId.of(projectId, datasetId, tableName);
-          // Currently only avro supported
+          // Currently only AVRO format is supported
           if (datasetSource.getFileSource().getDataFormat() != DataFormat.DATA_FORMAT_AVRO) {
             throw Status.INVALID_ARGUMENT
-                .withDescription("Invalid file format, only avro supported")
+                .withDescription("Invalid file format, only AVRO is supported.")
                 .asRuntimeException();
           }
+
+          TableId tableId = TableId.of(projectId, datasetId, createTempTableName());
+          log.info("Loading entity rows to: {}.{}.{}", projectId, datasetId, tableId.getTable());
           LoadJobConfiguration loadJobConfiguration =
               LoadJobConfiguration.of(
                   tableId, datasetSource.getFileSource().getFileUrisList(), FormatOptions.avro());
           loadJobConfiguration =
               loadJobConfiguration.toBuilder().setUseAvroLogicalTypes(true).build();
           Job job = bigquery.create(JobInfo.of(loadJobConfiguration));
           job.waitFor();
+          TableInfo expiry =
+              bigquery
+                  .getTable(tableId)
+                  .toBuilder()
+                  .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+                  .build();
+          bigquery.update(expiry);
           loadedEntityTable = bigquery.getTable(tableId);
           if (!loadedEntityTable.exists()) {
             throw new RuntimeException(
@@ -207,7 +218,7 @@ private Table loadEntities(DatasetSource datasetSource) {
         } catch (Exception e) {
           log.error("Exception has occurred in loadEntities method: ", e);
           throw Status.INTERNAL
-              .withDescription("Failed to load entity dataset into store")
+              .withDescription("Failed to load entity dataset into store: " + e.toString())
              .withCause(e)
              .asRuntimeException();
        }
@@ -219,20 +230,23 @@ private Table loadEntities(DatasetSource datasetSource) {
     }
   }

-  private String generateTemporaryTableName() {
-    String source = String.format("feastserving%d", System.currentTimeMillis());
-    String guid = UUID.nameUUIDFromBytes(source.getBytes()).toString();
-    String suffix = guid.substring(0, Math.min(guid.length(), 10)).replaceAll("-", "");
-    return String.format("temp_%s", suffix);
-  }
-
   private TableId generateUUIDs(Table loadedEntityTable) {
     try {
       String uuidQuery =
           createEntityTableUUIDQuery(generateFullTableName(loadedEntityTable.getTableId()));
-      QueryJobConfiguration queryJobConfig = QueryJobConfiguration.newBuilder(uuidQuery).build();
+      QueryJobConfiguration queryJobConfig =
+          QueryJobConfiguration.newBuilder(uuidQuery)
+              .setDestinationTable(TableId.of(projectId, datasetId, createTempTableName()))
+              .build();
       Job queryJob = bigquery.create(JobInfo.of(queryJobConfig));
       queryJob.waitFor();
+      TableInfo expiry =
+          bigquery
+              .getTable(queryJobConfig.getDestinationTable())
+              .toBuilder()
+              .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+              .build();
+      bigquery.update(expiry);
       queryJobConfig = queryJob.getConfiguration();
       return queryJobConfig.getDestinationTable();
     } catch (InterruptedException | BigQueryException e) {
@@ -242,4 +256,8 @@ private TableId generateUUIDs(Table loadedEntityTable) {
           .asRuntimeException();
     }
   }
+
+  public static String createTempTableName() {
+    return "_" + UUID.randomUUID().toString().replace("-", "");
+  }
 }
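
The get-table / set-expiration / update sequence introduced here reappears verbatim in the two files below. A hypothetical consolidation, not part of this commit, that would keep the pattern in one place:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

final class TempTableExpiry {
  // Hypothetical helper (not in this commit): the same expiry pattern used in
  // BigQueryServingService, BatchRetrievalQueryRunnable, and SubqueryCallable.
  static void apply(BigQuery bigquery, TableId tableId, long expiryDurationMs) {
    TableInfo withExpiry =
        bigquery
            .getTable(tableId)
            .toBuilder()
            .setExpirationTime(System.currentTimeMillis() + expiryDurationMs)
            .build();
    bigquery.update(withExpiry);
  }
}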

serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java

Lines changed: 31 additions & 4 deletions
@@ -16,6 +16,8 @@
  */
 package feast.serving.store.bigquery;

+import static feast.serving.service.BigQueryServingService.TEMP_TABLE_EXPIRY_DURATION_MS;
+import static feast.serving.service.BigQueryServingService.createTempTableName;
 import static feast.serving.store.bigquery.QueryTemplater.createTimestampLimitQuery;

 import com.google.auto.value.AutoValue;
@@ -27,6 +29,8 @@
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.TableResult;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Storage;
@@ -175,23 +179,26 @@ Job runBatchQuery(List<String> featureSetQueries)
     ExecutorCompletionService<FeatureSetInfo> executorCompletionService =
         new ExecutorCompletionService<>(executorService);

-
     List<FeatureSetInfo> featureSetInfos = new ArrayList<>();

     for (int i = 0; i < featureSetQueries.size(); i++) {
       QueryJobConfiguration queryJobConfig =
-          QueryJobConfiguration.newBuilder(featureSetQueries.get(i)).build();
+          QueryJobConfiguration.newBuilder(featureSetQueries.get(i))
+              .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+              .build();
       Job subqueryJob = bigquery().create(JobInfo.of(queryJobConfig));
       executorCompletionService.submit(
           SubqueryCallable.builder()
+              .setBigquery(bigquery())
              .setFeatureSetInfo(featureSetInfos().get(i))
              .setSubqueryJob(subqueryJob)
              .build());
    }

     for (int i = 0; i < featureSetQueries.size(); i++) {
       try {
-        FeatureSetInfo featureSetInfo = executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
+        FeatureSetInfo featureSetInfo =
+            executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
         featureSetInfos.add(featureSetInfo);
       } catch (InterruptedException | ExecutionException | TimeoutException e) {
         jobService()
@@ -214,9 +221,20 @@ Job runBatchQuery(List<String> featureSetQueries)
     String joinQuery =
         QueryTemplater.createJoinQuery(
             featureSetInfos, entityTableColumnNames(), entityTableName());
-    QueryJobConfiguration queryJobConfig = QueryJobConfiguration.newBuilder(joinQuery).build();
+    QueryJobConfiguration queryJobConfig =
+        QueryJobConfiguration.newBuilder(joinQuery)
+            .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+            .build();
     queryJob = bigquery().create(JobInfo.of(queryJobConfig));
     queryJob.waitFor();
+    TableInfo expiry =
+        bigquery()
+            .getTable(queryJobConfig.getDestinationTable())
+            .toBuilder()
+            .setExpirationTime(
+                System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+            .build();
+    bigquery().update(expiry);

     return queryJob;
   }
@@ -248,10 +266,19 @@ private FieldValueList getTimestampLimits(String entityTableName) {
     QueryJobConfiguration getTimestampLimitsQuery =
         QueryJobConfiguration.newBuilder(createTimestampLimitQuery(entityTableName))
             .setDefaultDataset(DatasetId.of(projectId(), datasetId()))
+            .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
             .build();
     try {
       Job job = bigquery().create(JobInfo.of(getTimestampLimitsQuery));
       TableResult getTimestampLimitsQueryResult = job.waitFor().getQueryResults();
+      TableInfo expiry =
+          bigquery()
+              .getTable(getTimestampLimitsQuery.getDestinationTable())
+              .toBuilder()
+              .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+              .build();
+      bigquery().update(expiry);
+
       FieldValueList result = null;
       for (FieldValueList fields : getTimestampLimitsQueryResult.getValues()) {
         result = fields;
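
Aside from the destination-table changes, runBatchQuery keeps its fan-out/fan-in shape: one subquery job per feature set is submitted to an ExecutorCompletionService, and results are collected in completion order under a per-result timeout. A stripped-down illustration of that control flow, with the BigQuery work replaced by a trivial stand-in task:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletionServiceSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(executor);

    List<String> queries = List.of("q1", "q2", "q3");
    for (String q : queries) {
      ecs.submit(() -> q + ":done"); // stand-in for SubqueryCallable
    }

    List<String> results = new ArrayList<>();
    for (int i = 0; i < queries.size(); i++) {
      // take() yields futures in completion order, not submission order;
      // get(...) bounds how long we wait for each result.
      results.add(ecs.take().get(30, TimeUnit.SECONDS));
    }
    executor.shutdown();
    System.out.println(results);
  }
}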

serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java

Lines changed: 14 additions & 0 deletions
@@ -16,13 +16,16 @@
  */
 package feast.serving.store.bigquery;

+import static feast.serving.service.BigQueryServingService.TEMP_TABLE_EXPIRY_DURATION_MS;
 import static feast.serving.store.bigquery.QueryTemplater.generateFullTableName;

 import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import feast.serving.store.bigquery.model.FeatureSetInfo;
 import java.util.concurrent.Callable;

@@ -33,6 +36,8 @@
 @AutoValue
 public abstract class SubqueryCallable implements Callable<FeatureSetInfo> {

+  public abstract BigQuery bigquery();
+
   public abstract FeatureSetInfo featureSetInfo();

   public abstract Job subqueryJob();
@@ -44,6 +49,8 @@ public static Builder builder() {
   @AutoValue.Builder
   public abstract static class Builder {

+    public abstract Builder setBigquery(BigQuery bigquery);
+
     public abstract Builder setFeatureSetInfo(FeatureSetInfo featureSetInfo);

     public abstract Builder setSubqueryJob(Job subqueryJob);
@@ -57,6 +64,13 @@ public FeatureSetInfo call() throws BigQueryException, InterruptedException {
     subqueryJob().waitFor();
     subqueryConfig = subqueryJob().getConfiguration();
     TableId destinationTable = subqueryConfig.getDestinationTable();
+    TableInfo expiry =
+        bigquery()
+            .getTable(destinationTable)
+            .toBuilder()
+            .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+            .build();
+    bigquery().update(expiry);
     String fullTablePath = generateFullTableName(destinationTable);

     return new FeatureSetInfo(featureSetInfo(), fullTablePath);
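
With the new abstract bigquery() property, constructing the callable means wiring in the client, as at the call site in BatchRetrievalQueryRunnable above. A usage sketch, assuming bigquery, featureSetInfo, and subqueryJob are already in scope:

// Assumed in scope: BigQuery bigquery; FeatureSetInfo featureSetInfo; Job subqueryJob.
SubqueryCallable callable =
    SubqueryCallable.builder()
        .setBigquery(bigquery)
        .setFeatureSetInfo(featureSetInfo)
        .setSubqueryJob(subqueryJob)
        .build();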

tests/e2e/bq-batch-retrieval.py

Lines changed: 3 additions & 2 deletions
@@ -14,6 +14,7 @@
 from feast.type_map import ValueType
 from google.protobuf.duration_pb2 import Duration

+pd.set_option('display.max_columns', None)

 @pytest.fixture(scope="module")
 def core_url(pytestconfig):
@@ -112,8 +113,8 @@ def test_additional_columns_in_entity_table(client):
     feature_retrieval_job = client.get_batch_features(
         entity_rows=entity_df, feature_ids=["additional_columns:1:feature_value"]
     )
-    output = feature_retrieval_job.to_dataframe()
-    print(output.head())
+    output = feature_retrieval_job.to_dataframe().sort_values(by=["entity_id"])
+    print(output.head(10))

     assert np.allclose(output["additional_float_col"], entity_df["additional_float_col"])
     assert output["additional_string_col"].to_list() == entity_df["additional_string_col"].to_list()
