diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
index 1f08bf4eb..728a1fdae 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -476,22 +476,29 @@ private BigQueryResult queryRpc(
     }
 
     // Query finished running and we can paginate all the results
-    if (results.getJobComplete() && results.getSchema() != null) {
+    // Results should be read using the high throughput read API if sufficiently large.
+    boolean resultsLargeEnoughForReadApi =
+        connectionSettings.getUseReadAPI()
+            && results.getTotalRows() != null
+            && results.getTotalRows().longValue() > connectionSettings.getMinResultSize();
+    if (results.getJobComplete() && results.getSchema() != null && !resultsLargeEnoughForReadApi) {
       return processQueryResponseResults(results);
     } else {
-      // Query is long-running (> 10s) and hasn't completed yet, or query completed but didn't
-      // return the schema, fallback to jobs.insert path. Some operations don't return the schema
-      // and can be optimized here, but this is left as future work.
-      Long totalRows = results.getTotalRows() == null ? null : results.getTotalRows().longValue();
-      Long pageRows = results.getRows() == null ? null : (long) (results.getRows().size());
+      // Query is long-running (> 10s) and hasn't completed yet, query completed but didn't
+      // return the schema, or results are sufficiently large to use the high throughput read API,
+      // fallback to jobs.insert path. Some operations don't return the schema and can be optimized
+      // here, but this is left as future work.
+      JobId jobId = JobId.fromPb(results.getJobReference());
+      GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
+      Long totalRows =
+          firstPage.getTotalRows() == null ? null : firstPage.getTotalRows().longValue();
+      Long pageRows = firstPage.getRows() == null ? null : (long) (firstPage.getRows().size());
       logger.log(
           Level.WARNING,
           "\n"
               + String.format(
                   "results.getJobComplete(): %s, isSchemaNull: %s , totalRows: %s, pageRows: %s",
                   results.getJobComplete(), results.getSchema() == null, totalRows, pageRows));
-      JobId jobId = JobId.fromPb(results.getJobReference());
-      GetQueryResultsResponse firstPage = getQueryResultsFirstPage(jobId);
       return getSubsequentQueryResultsWithJob(
           totalRows, pageRows, jobId, firstPage, hasQueryParameters);
     }
@@ -996,6 +1003,7 @@ BigQueryResult highThroughPutRead(
               schema);
 
       logger.log(Level.INFO, "\n Using BigQuery Read API");
+      stats.getQueryStatistics().setUseReadApi(true);
       return new BigQueryResultImpl(schema, totalRows, bufferRow, stats);
 
     } catch (IOException e) {
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobStatistics.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobStatistics.java
index efbfda022..407e25a8f 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobStatistics.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/JobStatistics.java
@@ -27,6 +27,7 @@
 import com.google.auto.value.AutoValue;
 import com.google.cloud.StringEnumType;
 import com.google.cloud.StringEnumValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
@@ -396,6 +397,7 @@ public static class QueryStatistics extends JobStatistics {
     private final BiEngineStats biEngineStats;
     private final Integer billingTier;
     private final Boolean cacheHit;
+    private Boolean useReadApi;
     private final String ddlOperationPerformed;
     private final TableId ddlTargetTable;
     private final RoutineId ddlTargetRoutine;
@@ -796,6 +798,7 @@ private QueryStatistics(Builder builder) {
       this.biEngineStats = builder.biEngineStats;
       this.billingTier = builder.billingTier;
       this.cacheHit = builder.cacheHit;
+      this.useReadApi = false;
       this.ddlOperationPerformed = builder.ddlOperationPerformed;
       this.ddlTargetTable = builder.ddlTargetTable;
       this.ddlTargetRoutine = builder.ddlTargetRoutine;
@@ -835,6 +838,18 @@ public Boolean getCacheHit() {
       return cacheHit;
     }
 
+    /** Returns whether the query result is read from the high throughput ReadAPI. */
+    @VisibleForTesting
+    public Boolean getUseReadApi() {
+      return useReadApi;
+    }
+
+    /** Sets internal state to reflect the use of the high throughput ReadAPI. */
+    @VisibleForTesting
+    public void setUseReadApi(Boolean useReadApi) {
+      this.useReadApi = useReadApi;
+    }
+
     /** [BETA] For DDL queries, returns the operation applied to the DDL target table. */
     public String getDdlOperationPerformed() {
       return ddlOperationPerformed;
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
index bf8d51314..95201361f 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
@@ -3489,6 +3489,63 @@ public void testExecuteSelectDefaultConnectionSettings() throws SQLException {
     String query = "SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus;";
     BigQueryResult bigQueryResult = connection.executeSelect(query);
     assertEquals(42, bigQueryResult.getTotalRows());
+    assertFalse(bigQueryResult.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
+  }
+
+  @Test
+  public void testExecuteSelectWithReadApi() throws SQLException {
+    final int rowLimit = 5000;
+    final String QUERY =
+        "SELECT * FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017 LIMIT %s";
+    // Job timeout is somewhat arbitrary - just ensures that fast query is not used.
+    // min result size and page row count ratio ensure that the ReadAPI is used.
+    ConnectionSettings connectionSettingsReadAPIEnabledFastQueryDisabled =
+        ConnectionSettings.newBuilder()
+            .setUseReadAPI(true)
+            .setJobTimeoutMs(Long.MAX_VALUE)
+            .setMinResultSize(500)
+            .setTotalToPageRowCountRatio(1)
+            .build();
+
+    Connection connectionReadAPIEnabled =
+        bigquery.createConnection(connectionSettingsReadAPIEnabledFastQueryDisabled);
+
+    String selectQuery = String.format(QUERY, rowLimit);
+
+    BigQueryResult bigQueryResultSet = connectionReadAPIEnabled.executeSelect(selectQuery);
+    ResultSet rs = bigQueryResultSet.getResultSet();
+    // Paginate results to avoid an InterruptedException
+    while (rs.next()) {}
+
+    assertTrue(bigQueryResultSet.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
+    connectionReadAPIEnabled.close();
+  }
+
+  @Test
+  public void testExecuteSelectWithFastQueryReadApi() throws SQLException {
+    final int rowLimit = 5000;
+    final String QUERY =
+        "SELECT * FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017 LIMIT %s";
+    // min result size and page row count ratio ensure that the ReadAPI is used.
+    ConnectionSettings connectionSettingsReadAPIEnabledFastQueryDisabled =
+        ConnectionSettings.newBuilder()
+            .setUseReadAPI(true)
+            .setMinResultSize(500)
+            .setTotalToPageRowCountRatio(1)
+            .build();
+
+    Connection connectionReadAPIEnabled =
+        bigquery.createConnection(connectionSettingsReadAPIEnabledFastQueryDisabled);
+
+    String selectQuery = String.format(QUERY, rowLimit);
+
+    BigQueryResult bigQueryResultSet = connectionReadAPIEnabled.executeSelect(selectQuery);
+    ResultSet rs = bigQueryResultSet.getResultSet();
+    // Paginate results to avoid an InterruptedException
+    while (rs.next()) {}
+
+    assertTrue(bigQueryResultSet.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
+    connectionReadAPIEnabled.close();
   }
 
   @Test
@@ -3522,6 +3579,7 @@ public void testExecuteSelectWithCredentials() throws SQLException {
             + TABLE_ID_LARGE.getTable(); // Large query result is needed to use BigQueryReadClient.
     BigQueryResult bigQueryResult = connectionGoodCredentials.executeSelect(query);
     assertEquals(313348, bigQueryResult.getTotalRows());
+    assertTrue(bigQueryResult.getBigQueryResultStats().getQueryStatistics().getUseReadApi());
 
     // Scenario 2.
     // Create a new bigQuery object but explicitly an invalid credential.