Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -476,22 +476,29 @@ private BigQueryResult queryRpc(
}

// Query finished running and we can paginate all the results
if (results.getJobComplete() && results.getSchema() != null) {
// Results should be read using the high throughput read API if sufficiently large.
boolean resultsLargeEnoughForReadApi =
connectionSettings.getUseReadAPI()
&& results.getTotalRows() != null
&& results.getTotalRows().longValue() > connectionSettings.getMinResultSize();
if (results.getJobComplete() && results.getSchema() != null && !resultsLargeEnoughForReadApi) {
return processQueryResponseResults(results);
} else {
// Query is long-running (> 10s) and hasn't completed yet, or query completed but didn't
// return the schema, fallback to jobs.insert path. Some operations don't return the schema
// and can be optimized here, but this is left as future work.
Long totalRows = results.getTotalRows() == null ? null : results.getTotalRows().longValue();
Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size());
// Query is long-running (> 10s) and hasn't completed yet, query completed but didn't
// return the schema, or results are sufficiently large to use the high throughput read API,
// fallback to jobs.insert path. Some operations don't return the schema and can be optimized
// here, but this is left as future work.
JobId jobId = JobId.fromPb(results.getJobReference());
GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
Long totalRows =
firstPage.getTotalRows() == null ? null : firstPage.getTotalRows().longValue();
Long pageRows = firstPage.getRows() == null ? null : (long) (firstPage.getRows().size());
logger.log(
Level.WARNING,
"\n"
+ String.format(
"results.getJobComplete(): %s, isSchemaNull: %s , totalRows: %s, pageRows: %s",
results.getJobComplete(), results.getSchema() == null, totalRows, pageRows));
JobId jobId = JobId.fromPb(results.getJobReference());
GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
return getSubsequentQueryResultsWithJob(
totalRows, pageRows, jobId, firstPage, hasQueryParameters);
}
Expand Down Expand Up @@ -996,6 +1003,7 @@ BigQueryResult highThroughPutRead(
schema);

logger.log(Level.INFO, "\n Using BigQuery Read API");
stats.getQueryStatistics().setUseReadApi(true);
return new BigQueryResultImpl<BigQueryResultImpl.Row>(schema, totalRows, bufferRow, stats);

} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.StringEnumType;
import com.google.cloud.StringEnumValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
Expand Down Expand Up @@ -396,6 +397,7 @@ public static class QueryStatistics extends JobStatistics {
private final BiEngineStats biEngineStats;
private final Integer billingTier;
private final Boolean cacheHit;
private Boolean useReadApi;
private final String ddlOperationPerformed;
private final TableId ddlTargetTable;
private final RoutineId ddlTargetRoutine;
Expand Down Expand Up @@ -796,6 +798,7 @@ private QueryStatistics(Builder builder) {
this.biEngineStats = builder.biEngineStats;
this.billingTier = builder.billingTier;
this.cacheHit = builder.cacheHit;
this.useReadApi = false;
this.ddlOperationPerformed = builder.ddlOperationPerformed;
this.ddlTargetTable = builder.ddlTargetTable;
this.ddlTargetRoutine = builder.ddlTargetRoutine;
Expand Down Expand Up @@ -835,6 +838,18 @@ public Boolean getCacheHit() {
return cacheHit;
}

/** Returns whether the query result is read from the high throughput ReadAPI. */
@VisibleForTesting
public Boolean getUseReadApi() {
return useReadApi;
}

/** Sets internal state to reflect the use of the high throughput ReadAPI. */
@VisibleForTesting
public void setUseReadApi(Boolean useReadApi) {
this.useReadApi = useReadApi;
}

/** [BETA] For DDL queries, returns the operation applied to the DDL target table. */
public String getDdlOperationPerformed() {
return ddlOperationPerformed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3489,6 +3489,63 @@ public void testExecuteSelectDefaultConnectionSettings() throws SQLException {
String query = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
BigQueryResult bigQueryResult = connection.executeSelect(query);
assertEquals(42, bigQueryResult.getTotalRows());
assertFalse(bigQueryResult.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
}

@Test
public void testExecuteSelectWithReadApi() throws SQLException {
final int rowLimit = 5000;
final String QUERY =
"SELECT * FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017 LIMIT %s";
// Job timeout is somewhat arbitrary - just ensures that fast query is not used.
// min result size and page row count ratio ensure that the ReadAPI is used.
ConnectionSettings connectionSettingsReadAPIEnabledFastQueryDisabled =
ConnectionSettings.newBuilder()
.setUseReadAPI(true)
.setJobTimeoutMs(Long.MAX_VALUE)
.setMinResultSize(500)
.setTotalToPageRowCountRatio(1)
.build();

Connection connectionReadAPIEnabled =
bigquery.createConnection(connectionSettingsReadAPIEnabledFastQueryDisabled);

String selectQuery = String.format(QUERY, rowLimit);

BigQueryResult bigQueryResultSet = connectionReadAPIEnabled.executeSelect(selectQuery);
ResultSet rs = bigQueryResultSet.getResultSet();
// Paginate results to avoid an InterruptedException
while (rs.next()) {}

assertTrue(bigQueryResultSet.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
connectionReadAPIEnabled.close();
}

@Test
public void testExecuteSelectWithFastQueryReadApi() throws SQLException {
final int rowLimit = 5000;
final String QUERY =
"SELECT * FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017 LIMIT %s";
// min result size and page row count ratio ensure that the ReadAPI is used.
ConnectionSettings connectionSettingsReadAPIEnabledFastQueryDisabled =
ConnectionSettings.newBuilder()
.setUseReadAPI(true)
.setMinResultSize(500)
.setTotalToPageRowCountRatio(1)
.build();

Connection connectionReadAPIEnabled =
bigquery.createConnection(connectionSettingsReadAPIEnabledFastQueryDisabled);

String selectQuery = String.format(QUERY, rowLimit);

BigQueryResult bigQueryResultSet = connectionReadAPIEnabled.executeSelect(selectQuery);
ResultSet rs = bigQueryResultSet.getResultSet();
// Paginate results to avoid an InterruptedException
while (rs.next()) {}

assertTrue(bigQueryResultSet.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
connectionReadAPIEnabled.close();
}

@Test
Expand Down Expand Up @@ -3522,6 +3579,7 @@ public void testExecuteSelectWithCredentials() throws SQLException {
+ TABLE_ID_LARGE.getTable(); // Large query result is needed to use BigQueryReadClient.
BigQueryResult bigQueryResult = connectionGoodCredentials.executeSelect(query);
assertEquals(313348, bigQueryResult.getTotalRows());
assertTrue(bigQueryResult.getBigQueryResultStats().getQueryStatistics().getUseReadApi());

// Scenario 2.
// Create a new bigQuery object but explicitly an invalid credential.
Expand Down
Loading