feat: sql fast path impl #509

Merged: 25 commits, Sep 22, 2020
Changes from 1 commit
Commits
f6e93cc
feat: sql fast path impl
stephaniewang526 Jun 26, 2020
d8ab960
add logic for DML and DDL queries
stephaniewang526 Jul 7, 2020
780b836
update ITs to check table content correctness, update fastquery logic
stephaniewang526 Jul 10, 2020
7126437
add test for bogus query
stephaniewang526 Jul 13, 2020
08d6c7e
add check for idempotent requestId
stephaniewang526 Jul 13, 2020
31a55ce
update QueryRequestInfo and error handling logic
stephaniewang526 Jul 15, 2020
bcecbb0
add mock test for query JobException
stephaniewang526 Jul 16, 2020
81937fc
update mock test
stephaniewang526 Jul 16, 2020
b62b569
fix unit tests, nit update
stephaniewang526 Jul 16, 2020
7225101
update exception handling from JobException to BigQueryException
stephaniewang526 Jul 17, 2020
79bc75f
update based on comments
stephaniewang526 Aug 7, 2020
0fcb5b6
nit
stephaniewang526 Aug 7, 2020
2495fbb
update based on comments
stephaniewang526 Aug 7, 2020
7c2ae39
add maxResult support
stephaniewang526 Aug 21, 2020
293f3e6
Merge branch 'master' into sql-client
stephaniewang526 Aug 27, 2020
187c86e
Merge remote-tracking branch 'upstream/master' into sql-client
Sep 3, 2020
2862ad8
update code
Sep 3, 2020
d8f1229
add test coverage
stephaniewang526 Sep 3, 2020
f7d73c4
Merge remote-tracking branch 'origin/sql-client' into sql-client
stephaniewang526 Sep 3, 2020
fd9dcae
lint fix
stephaniewang526 Sep 3, 2020
0cdf672
feat: add more code cov
Sep 11, 2020
27d1a63
set method back
stephaniewang526 Sep 11, 2020
23c9008
Merge branch 'master' into sql-client
stephaniewang526 Sep 11, 2020
48397ad
feat: code cove
Sep 15, 2020
e161cf9
add codecov
stephaniewang526 Sep 16, 2020
add logic for DML and DDL queries
enable requestId
add integration tests for fast path multipages query, DML, and DDL queries

fix requestId logic

update QueryRequestInfo and add mock test

add mock test cases for SQL, DML, and DDL
clean up code

fix IT

add schema test
stephaniewang526 committed Jul 9, 2020
commit d8ab960cb196313c86d065883fed7f430a904755
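
Before the file-by-file diff, a minimal caller-side sketch of what this commit targets. It assumes the public BigQuery.query(QueryJobConfiguration) entry point (the fast path is taken internally when QueryRequestInfo.isFastQuerySupported() allows it); the dataset and table names are placeholders, and the printed row counts reflect the DML/DDL handling added to fastQuery() below.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class FastQueryUsageSketch {
  public static void main(String[] args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Plain SELECT: the fast path reports QueryResponse.getTotalRows().
    TableResult select = bigquery.query(QueryJobConfiguration.of("SELECT 17 AS answer"));
    System.out.println(select.getTotalRows()); // 1

    // DML: the fast path reports QueryResponse.getNumDmlAffectedRows().
    TableResult dml =
        bigquery.query(QueryJobConfiguration.of("DELETE FROM my_dataset.my_table WHERE TRUE"));
    System.out.println(dml.getTotalRows()); // number of rows deleted

    // DDL: neither row-count field is set on the response, so 0 is reported.
    TableResult ddl =
        bigquery.query(QueryJobConfiguration.of("CREATE TABLE my_dataset.new_table (x INT64)"));
    System.out.println(ddl.getTotalRows()); // 0
  }
}
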
@@ -1187,8 +1187,9 @@ public TableResult query(QueryJobConfiguration configuration, JobOption... optio
private TableResult fastQuery(
final String projectId, final QueryRequest content, JobOption... options)
throws InterruptedException {
com.google.api.services.bigquery.model.QueryResponse results;
try {
com.google.api.services.bigquery.model.QueryResponse queryResponse =
results =
runWithRetries(
new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
@Override
@@ -1199,26 +1200,37 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
getOptions().getRetrySettings(),
EXCEPTION_HANDLER,
getOptions().getClock());

// Return result if there is only 1 page, otherwise use jobId returned from backend to return
// full results
if (queryResponse.getPageToken() == null) {
return new TableResult(
Schema.fromPb(queryResponse.getSchema()),
queryResponse.getTotalRows().longValue(),
new PageImpl<>(
new TableDataPageFetcher(null, getOptions(), null, optionMap(options)),
null,
transformTableData(queryResponse.getRows())));
} else {
String jobId = queryResponse.getJobReference().getJobId();
Job job = getJob(JobId.of(jobId));
job.waitFor();
return job.getQueryResults();
}
} catch (RetryHelper.RetryHelperException e) {
} catch (RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Long numRows;
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
// DDL queries
numRows = 0L;
} else if (results.getNumDmlAffectedRows() != null) {
// DML queries
numRows = results.getNumDmlAffectedRows();
} else {
// SQL queries
numRows = results.getTotalRows().longValue();
}

// Return result if there is only 1 page, otherwise use jobId returned from backend to return
// full results
if (results.getPageToken() == null) {
return new TableResult(
Schema.fromPb(results.getSchema()),
numRows,
new PageImpl<>(
new TableDataPageFetcher(null, getOptions(), null, optionMap(options)),
null,
transformTableData(results.getRows())));
} else {
String jobId = results.getJobReference().getJobId();
Job job = getJob(JobId.of(jobId));
job.waitFor();
return job.getQueryResults();
}
}

@Override
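
Stepping back from the hunks above: a rough sketch of how the new fastQuery() is expected to be reached from query(), pieced together from the hunk header and the fastQuery() signature. The enclosing query() method is not shown in this diff, so treat the exact control flow and the create(...) fallback as assumptions.

// Sketch only; the real query() lives outside the lines shown above.
public TableResult query(QueryJobConfiguration configuration, JobOption... options)
    throws InterruptedException {
  QueryRequestInfo requestInfo = new QueryRequestInfo(configuration);
  if (requestInfo.isFastQuerySupported()) {
    // Fast path: a single jobs.query RPC instead of insert-job plus polling.
    String projectId = getOptions().getProjectId();
    return fastQuery(projectId, requestInfo.toPb(), options);
  }
  // Otherwise fall back to creating a query job and waiting for its results.
  return create(JobInfo.of(configuration), options).getQueryResults();
}
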
@@ -851,7 +851,7 @@ ToStringHelper toStringHelper() {
.add("flattenResults", flattenResults)
.add("priority", priority)
.add("tableDefinitions", tableDefinitions)
.add("userQueryCache", useQueryCache)
.add("useQueryCache", useQueryCache)
.add("userDefinedFunctions", userDefinedFunctions)
.add("createDisposition", createDisposition)
.add("writeDisposition", writeDisposition)
@@ -16,17 +16,40 @@

package com.google.cloud.bigquery;

import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryParameter;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class QueryRequestInfo {

private static final String REQUEST_ID = UUID.randomUUID().toString();
private QueryJobConfiguration config;
private final List<ConnectionProperty> connectionProperties;
private final DatasetId defaultDataset;
private final Boolean dryRun;
private final Map<String, String> labels;
private final Long maximumBytesBilled;
private final String query;
private final List<QueryParameter> queryParameters;
private final Boolean useQueryCache;
private final Boolean useLegacySql;

QueryRequestInfo(QueryJobConfiguration config) {
this.config = config;
this.connectionProperties = config.getConnectionProperties();
this.defaultDataset = config.getDefaultDataset();
this.dryRun = config.dryRun();
this.labels = config.getLabels();
this.maximumBytesBilled = config.getMaximumBytesBilled();
this.query = config.getQuery();
this.queryParameters = config.toPb().getQuery().getQueryParameters();
this.useLegacySql = config.useLegacySql();
this.useQueryCache = config.useQueryCache();
}

boolean isFastQuerySupported() {
@@ -45,37 +68,73 @@ boolean isFastQuerySupported() {
}

QueryRequest toPb() {
QueryRequest query = new QueryRequest();
if (config.getConnectionProperties() != null) {
query.setConnectionProperties(
Lists.transform(config.getConnectionProperties(), ConnectionProperty.TO_PB_FUNCTION));
QueryRequest request = new QueryRequest();
if (connectionProperties != null) {
request.setConnectionProperties(
Lists.transform(connectionProperties, ConnectionProperty.TO_PB_FUNCTION));
}
if (config.getDefaultDataset() != null) {
query.setDefaultDataset(config.getDefaultDataset().toPb());
if (defaultDataset != null) {
request.setDefaultDataset(defaultDataset.toPb());
}
if (config.dryRun() != null) {
query.setDryRun(config.dryRun());
if (dryRun != null) {
request.setDryRun(dryRun);
}
if (config.getLabels() != null) {
query.setLabels(config.getLabels());
if (labels != null) {
request.setLabels(labels);
}
if (config.getMaximumBytesBilled() != null) {
query.setMaximumBytesBilled(config.getMaximumBytesBilled());
if (maximumBytesBilled != null) {
request.setMaximumBytesBilled(maximumBytesBilled);
}
query.setQuery(config.getQuery());
// TODO: add back when supported
// query.setRequestId(UUID.randomUUID().toString());
JobConfiguration jobConfiguration = config.toPb();
JobConfigurationQuery configurationQuery = jobConfiguration.getQuery();
if (configurationQuery.getQueryParameters() != null) {
query.setQueryParameters(configurationQuery.getQueryParameters());
request.setQuery(query);
request.setRequestId(REQUEST_ID);
if (queryParameters != null) {
request.setQueryParameters(queryParameters);
}
if (config.useLegacySql() != null) {
query.setUseLegacySql(config.useLegacySql());
if (useLegacySql != null) {
request.setUseLegacySql(useLegacySql);
}
if (config.useQueryCache() != null) {
query.setUseQueryCache(config.useQueryCache());
if (useQueryCache != null) {
request.setUseQueryCache(useQueryCache);
}
return query;
return request;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("connectionProperties", connectionProperties)
.add("defaultDataset", defaultDataset)
.add("dryRun", dryRun)
.add("labels", labels)
.add("maximumBytesBilled", maximumBytesBilled)
.add("query", query)
.add("requestId", REQUEST_ID)
.add("queryParameters", queryParameters)
.add("useQueryCache", useQueryCache)
.add("useLegacySql", useLegacySql)
.toString();
}

@Override
public int hashCode() {
return Objects.hashCode(
connectionProperties,
defaultDataset,
dryRun,
labels,
maximumBytesBilled,
query,
queryParameters,
REQUEST_ID,
useQueryCache,
useLegacySql);
}

@Override
public boolean equals(Object obj) {
return obj == this
|| obj != null
&& obj.getClass().equals(QueryRequestInfo.class)
&& java.util.Objects.equals(toPb(), ((QueryRequestInfo) obj).toPb());
}
}
@@ -36,6 +36,7 @@
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
@@ -212,6 +213,16 @@ public class BigQueryImplTest {
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
.setUseQueryCache(false)
.build();
private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_FOR_DMLQUERY =
QueryJobConfiguration.newBuilder("DML")
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
.setUseQueryCache(false)
.build();
private static final QueryJobConfiguration QUERY_JOB_CONFIGURATION_FOR_DDLQUERY =
QueryJobConfiguration.newBuilder("DDL")
.setDefaultDataset(DatasetId.of(PROJECT, DATASET))
.setUseQueryCache(false)
.build();
private static final JobInfo JOB_INFO =
JobInfo.newBuilder(QUERY_JOB_CONFIGURATION_FOR_QUERY)
.setJobId(JobId.of(PROJECT, JOB))
@@ -1815,12 +1826,10 @@ public void testQueryRequestCompleted() throws InterruptedException {

@Test
public void testFastQueryRequestCompleted() throws InterruptedException {
JobId queryJob = JobId.of(PROJECT, JOB);
com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setJobReference(queryJob.toPb())
.setKind("bigquery#queryResponse")
.setPageToken(null)
.setRows(ImmutableList.of(TABLE_ROW))
@@ -2109,6 +2118,113 @@ public void testQueryDryRun() throws Exception {
}
}

@Test
public void testFastQuerySQLShouldRetry() throws Exception {
com.google.api.services.bigquery.model.QueryResponse responsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setRows(ImmutableList.of(TABLE_ROW))
.setPageToken(null)
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb())
.setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenThrow(new BigQueryException(504, "Gateway Timeout"))
.thenReturn(responsePb);

bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
assertEquals(TABLE_SCHEMA, response.getSchema());
assertEquals(1, response.getTotalRows());
verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
}

@Test
public void testFastQueryDMLShouldRetry() throws Exception {
com.google.api.services.bigquery.model.QueryResponse responsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setRows(ImmutableList.of(TABLE_ROW))
.setPageToken(null)
.setTotalBytesProcessed(42L)
.setNumDmlAffectedRows(1L)
.setSchema(TABLE_SCHEMA.toPb())
.setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenThrow(new BigQueryException(504, "Gateway Timeout"))
.thenReturn(responsePb);

bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY);
assertEquals(TABLE_SCHEMA, response.getSchema());
assertEquals(1, response.getTotalRows());
verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
}

@Test
public void testFastQueryDDLShouldRetry() throws Exception {
com.google.api.services.bigquery.model.QueryResponse responsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setRows(ImmutableList.of(TABLE_ROW))
.setPageToken(null)
.setTotalBytesProcessed(42L)
.setSchema(TABLE_SCHEMA.toPb())
.setErrors(ImmutableList.of(new ErrorProto().setMessage("ErrorMessage")));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DDLQUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.fastQuery(PROJECT, requestPb))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenThrow(new BigQueryException(504, "Gateway Timeout"))
.thenReturn(responsePb);

bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_DDLQUERY);
assertEquals(TABLE_SCHEMA, response.getSchema());
assertEquals(0, response.getTotalRows());
verify(bigqueryRpcMock, times(5)).fastQuery(PROJECT, requestPb);
}

@Test
public void testCreateRoutine() {
RoutineInfo routineInfo = ROUTINE_INFO.setProjectId(OTHER_PROJECT);
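
The three retry tests above drive the retries with ServiceOptions.getDefaultRetrySettings() against a mocked RPC. For real usage, the same knob sits on BigQueryOptions; below is a small sketch with illustrative (not recommended) values, assuming the gax RetrySettings builder and org.threeten.bp.Duration used by this library at the time, and a placeholder project ID.

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.threeten.bp.Duration;

public class RetrySettingsSketch {
  public static void main(String[] args) {
    // Illustrative values; production defaults come from ServiceOptions.getDefaultRetrySettings().
    RetrySettings retrySettings =
        RetrySettings.newBuilder()
            .setMaxAttempts(5)
            .setInitialRetryDelay(Duration.ofMillis(500))
            .setRetryDelayMultiplier(2.0)
            .setMaxRetryDelay(Duration.ofSeconds(8))
            .setTotalTimeout(Duration.ofMinutes(2))
            .build();

    // Retryable fast-query failures (for example 500/502/503/504) are retried under these settings.
    BigQuery bigquery =
        BigQueryOptions.newBuilder()
            .setProjectId("my-project") // placeholder
            .setRetrySettings(retrySettings)
            .build()
            .getService();
  }
}
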