Skip to content

feat: sql fast path impl #509

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f6e93cc
feat: sql fast path impl
stephaniewang526 Jun 26, 2020
d8ab960
add logic for DML and DDL queries
stephaniewang526 Jul 7, 2020
780b836
update ITs to check table content correctness, update fastquery logic
stephaniewang526 Jul 10, 2020
7126437
add test for bogus query
stephaniewang526 Jul 13, 2020
08d6c7e
add check for idempotent requestId
stephaniewang526 Jul 13, 2020
31a55ce
update QueryRequestInfo and error handling logic
stephaniewang526 Jul 15, 2020
bcecbb0
add mock test for query JobException
stephaniewang526 Jul 16, 2020
81937fc
update mock test
stephaniewang526 Jul 16, 2020
b62b569
fix unit tests, nit update
stephaniewang526 Jul 16, 2020
7225101
update exception handling from JobException to BigQueryException
stephaniewang526 Jul 17, 2020
79bc75f
update based on comments
stephaniewang526 Aug 7, 2020
0fcb5b6
nit
stephaniewang526 Aug 7, 2020
2495fbb
update based on comments
stephaniewang526 Aug 7, 2020
7c2ae39
add maxResult support
stephaniewang526 Aug 21, 2020
293f3e6
Merge branch 'master' into sql-client
stephaniewang526 Aug 27, 2020
187c86e
Merge remote-tracking branch 'upstream/master' into sql-client
Sep 3, 2020
2862ad8
update code
Sep 3, 2020
d8f1229
add test coverage
stephaniewang526 Sep 3, 2020
f7d73c4
Merge remote-tracking branch 'origin/sql-client' into sql-client
stephaniewang526 Sep 3, 2020
fd9dcae
lint fix
stephaniewang526 Sep 3, 2020
0cdf672
feat: add more code cov
Sep 11, 2020
27d1a63
set method back
stephaniewang526 Sep 11, 2020
23c9008
Merge branch 'master' into sql-client
stephaniewang526 Sep 11, 2020
48397ad
feat: code cove
Sep 15, 2020
e161cf9
add codecov
stephaniewang526 Sep 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
add codecov
  • Loading branch information
stephaniewang526 committed Sep 16, 2020
commit e161cf91c22567fc28b35be30d1c8acea1ebd7b3
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,17 @@ public Page<FieldValueList> getNextPage() {
}
}

private class QueryPageFetcher implements NextPageFetcher<FieldValueList> {
private class QueryPageFetcher extends Thread implements NextPageFetcher<FieldValueList> {

private static final long serialVersionUID = -8501991114794410114L;
private final Map<BigQueryRpc.Option, ?> requestOptions;
private final BigQueryOptions serviceOptions;
private final Job job;
private final boolean jobStatus;
private Job job;
private final TableId table;
private final Schema schema;

QueryPageFetcher(
JobId jobId,
boolean jobStatus,
Schema schema,
BigQueryOptions serviceOptions,
String cursor,
Expand All @@ -221,19 +219,19 @@ private class QueryPageFetcher implements NextPageFetcher<FieldValueList> {
PageImpl.nextRequestOptions(BigQueryRpc.Option.PAGE_TOKEN, cursor, optionMap);
this.serviceOptions = serviceOptions;
this.job = getJob(jobId);
this.jobStatus = jobStatus;
this.table = ((QueryJobConfiguration) job.getConfiguration()).getDestinationTable();
this.schema = schema;
}

@Override
public Page<FieldValueList> getNextPage() {
try {
if (!jobStatus) {
job.waitFor();
while (!JobStatus.State.DONE.equals(job.getStatus().getState())) {
try {
sleep(5000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex.getMessage());
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
job = job.reload();
}
return listTableData(table, schema, serviceOptions, requestOptions).x();
}
Expand Down Expand Up @@ -1277,15 +1275,13 @@ public com.google.api.services.bigquery.model.QueryResponse call() {

if (results.getPageToken() != null) {
JobId jobId = JobId.fromPb(results.getJobReference());
boolean jobStatus = results.getJobComplete();
String cursor = results.getPageToken();
return new TableResult(
schema,
numRows,
new PageImpl<>(
// fetch next pages of results
new QueryPageFetcher(
jobId, jobStatus, schema, getOptions(), cursor, optionMap(options)),
new QueryPageFetcher(jobId, schema, getOptions(), cursor, optionMap(options)),
cursor,
// cache first page of result
transformTableData(results.getRows(), schema)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1879,6 +1879,17 @@ public void testFastQueryMultiplePages() throws InterruptedException {
.setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"));
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
when(bigqueryRpcMock.listTableData(
PROJECT,
DATASET,
TABLE,
BigQueryImpl.optionMap(BigQuery.TableDataListOption.pageToken(CURSOR))))
.thenReturn(
new TableDataList()
.setPageToken(CURSOR)
.setRows(ImmutableList.of(TABLE_ROW))
.setTotalRows(1L));

com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
Expand All @@ -1898,7 +1909,14 @@ public void testFastQueryMultiplePages() throws InterruptedException {
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
assertTrue(result.hasNextPage());
assertNotNull(result.getNextPageToken());
assertNotNull(result.getNextPage());
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.listTableData(
PROJECT,
DATASET,
TABLE,
BigQueryImpl.optionMap(BigQuery.TableDataListOption.pageToken(CURSOR)));
verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
}

Expand Down