22 changes: 22 additions & 0 deletions bigframes/session/_io/bigquery/read_gbq_query.py
@@ -32,6 +32,28 @@
import bigframes.session


def should_return_query_results(query_job: bigquery.QueryJob) -> bool:
    """Returns True if query_job ran the kind of query we expect results from.

    If the query was DDL or DML, the caller should return job metadata
    instead. See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
    for possible statement types. Note that a destination table does exist
    for some DDL operations, such as CREATE VIEW, but we don't want to
    read from it. See internal issue b/444282709.
    """

    if query_job.statement_type == "SELECT":
        return True

    if query_job.statement_type == "SCRIPT":
        # Try to determine if the last statement is a SELECT. Alternatively, we
        # could do a jobs.list request using query_job as the parent job and
        # try to determine the statement type of the last child job.
        return query_job.destination != query_job.ddl_target_table

    return False


def create_dataframe_from_query_job_stats(
query_job: Optional[bigquery.QueryJob], *, session: bigframes.session.Session
) -> dataframe.DataFrame:
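
To make the helper's behavior concrete, here is a small self-contained sketch. FakeQueryJob is a hypothetical stub standing in for the handful of bigquery.QueryJob fields the new function reads; the classification logic is copied from the diff above.

from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeQueryJob:
    # Hypothetical stub: mirrors only the QueryJob fields the helper reads.
    statement_type: Optional[str]
    destination: Optional[str] = None
    ddl_target_table: Optional[str] = None


def should_return_query_results(query_job) -> bool:
    # Same logic as the new helper in read_gbq_query.py above.
    if query_job.statement_type == "SELECT":
        return True
    if query_job.statement_type == "SCRIPT":
        # A script whose final statement was a SELECT writes to a
        # destination distinct from any DDL target table.
        return query_job.destination != query_job.ddl_target_table
    return False


assert should_return_query_results(FakeQueryJob("SELECT"))
assert not should_return_query_results(FakeQueryJob("CREATE_VIEW"))
# Script ending in a SELECT: anonymous results table, no DDL target.
assert should_return_query_results(
    FakeQueryJob("SCRIPT", destination="proj.ds._anon_results")
)
# Script that only ran DDL/DML: destination equals the DDL target (or both
# are None), so there is nothing to read.
assert not should_return_query_results(FakeQueryJob("SCRIPT"))
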
47 changes: 31 additions & 16 deletions bigframes/session/loader.py
Expand Up @@ -42,6 +42,7 @@
from google.cloud import bigquery_storage_v1
import google.cloud.bigquery
import google.cloud.bigquery as bigquery
import google.cloud.bigquery.table
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
import pandas
import pyarrow as pa
@@ -1004,7 +1005,7 @@ def read_gbq_query(
configuration=configuration,
)
query_job_for_metrics = query_job
rows = None
rows: Optional[google.cloud.bigquery.table.RowIterator] = None
else:
job_config = typing.cast(
bigquery.QueryJobConfig,
@@ -1037,44 +1038,58 @@
query_job=query_job_for_metrics, row_iterator=rows
)

# It's possible that there's no job and corresponding destination table.
# In this case, we must create a local node.
# It's possible that there's no job and therefore no corresponding
# destination table. In this case, we must create a local node.
#
# TODO(b/420984164): Tune the threshold for which we download to
# local node. Likely there are a wide range of sizes in which it
# makes sense to download the results beyond the first page, even if
# there is a job and destination table available.
if (
rows is not None
and destination is None
and (
query_job_for_metrics is None
or query_job_for_metrics.statement_type == "SELECT"
)
):
if query_job_for_metrics is None and rows is not None:
return bf_read_gbq_query.create_dataframe_from_row_iterator(
rows,
session=self._session,
index_col=index_col,
columns=columns,
)

# If there was no destination table and we've made it this far, that
# means the query must have been DDL or DML. Return some job metadata,
# instead.
if not destination:
# We already checked rows, so if there's no destination table, then
# there are no results to return.
if destination is None:
return bf_read_gbq_query.create_dataframe_from_query_job_stats(
query_job_for_metrics,
session=self._session,
)

# If the query was DDL or DML, return some job metadata instead. See
# https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
# for possible statement types. Note that a destination table does exist
# for some DDL operations, such as CREATE VIEW, but we don't want to
# read from it. See internal issue b/444282709.
if (
query_job_for_metrics is not None
and not bf_read_gbq_query.should_return_query_results(query_job_for_metrics)
):
return bf_read_gbq_query.create_dataframe_from_query_job_stats(
query_job_for_metrics,
session=self._session,
)

# Speed up counts by reading them from the result metadata.
if rows is not None:
n_rows = rows.total_rows
elif query_job_for_metrics is not None:
n_rows = query_job_for_metrics.result().total_rows
else:
n_rows = None

return self.read_gbq_table(
f"{destination.project}.{destination.dataset_id}.{destination.table_id}",
index_col=index_col,
columns=columns,
use_cache=configuration["query"]["useQueryCache"],
force_total_order=force_total_order,
n_rows=query_job.result().total_rows,
n_rows=n_rows,
# max_results and filters are omitted because they are already
# handled by to_query(), above.
)
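
Taken together, the loader changes reorder read_gbq_query's fallthrough into four cases. The following is a simplified, runnable sketch of that cascade, not the real bigframes API: the helper stubs and the dict-based rows/job arguments are hypothetical stand-ins that merely tag which path was taken.

from typing import Optional


# Hypothetical stand-ins for the real bigframes helpers.
def create_dataframe_from_row_iterator(rows):
    return ("local_data", rows)


def create_dataframe_from_query_job_stats(job):
    return ("job_stats", job)


def read_gbq_table(destination, n_rows=None):
    return ("table_read", destination, n_rows)


def should_return_query_results(job) -> bool:
    if job["statement_type"] == "SELECT":
        return True
    if job["statement_type"] == "SCRIPT":
        return job.get("destination") != job.get("ddl_target_table")
    return False


def dispatch(rows, destination, job):
    # 1. Jobless query: rows came back without any job metadata.
    if job is None and rows is not None:
        return create_dataframe_from_row_iterator(rows)
    # 2. No destination table: nothing to read; surface job stats.
    if destination is None:
        return create_dataframe_from_query_job_stats(job)
    # 3. DDL/DML (CREATE VIEW has a destination we must not read).
    if job is not None and not should_return_query_results(job):
        return create_dataframe_from_query_job_stats(job)
    # 4. Plain SELECT with a destination: read the table, reusing the
    #    row count from result metadata when available.
    n_rows = rows["total_rows"] if rows is not None else None
    return read_gbq_table(destination, n_rows=n_rows)


assert dispatch(rows={"total_rows": 3}, destination=None, job=None)[0] == "local_data"
assert dispatch(rows=None, destination="p.d.t", job={"statement_type": "CREATE_VIEW"})[0] == "job_stats"
assert dispatch(rows={"total_rows": 3}, destination="p.d.t", job={"statement_type": "SELECT"})[0] == "table_read"
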
69 changes: 57 additions & 12 deletions tests/system/small/test_session.py
@@ -430,18 +430,63 @@ def test_read_gbq_w_max_results(
assert bf_result.shape[0] == max_results


def test_read_gbq_w_script_no_select(session, dataset_id: str):
ddl = f"""
CREATE TABLE `{dataset_id}.test_read_gbq_w_ddl` (
`col_a` INT64,
`col_b` STRING
);

INSERT INTO `{dataset_id}.test_read_gbq_w_ddl`
VALUES (123, 'hello world');
"""
df = session.read_gbq(ddl).to_pandas()
assert df["statement_type"][0] == "SCRIPT"
@pytest.mark.parametrize(
("sql_template", "expected_statement_type"),
(
pytest.param(
"""
CREATE OR REPLACE TABLE `{dataset_id}.test_read_gbq_w_ddl` (
`col_a` INT64,
`col_b` STRING
);
""",
"CREATE_TABLE",
id="ddl-create-table",
),
pytest.param(
# From https://cloud.google.com/bigquery/docs/boosted-tree-classifier-tutorial
"""
CREATE OR REPLACE VIEW `{dataset_id}.test_read_gbq_w_create_view`
AS
SELECT
age,
workclass,
marital_status,
education_num,
occupation,
hours_per_week,
income_bracket,
CASE
WHEN MOD(functional_weight, 10) < 8 THEN 'training'
WHEN MOD(functional_weight, 10) = 8 THEN 'evaluation'
WHEN MOD(functional_weight, 10) = 9 THEN 'prediction'
END AS dataframe
FROM
`bigquery-public-data.ml_datasets.census_adult_income`;
""",
"CREATE_VIEW",
id="ddl-create-view",
),
pytest.param(
"""
CREATE OR REPLACE TABLE `{dataset_id}.test_read_gbq_w_dml` (
`col_a` INT64,
`col_b` STRING
);

INSERT INTO `{dataset_id}.test_read_gbq_w_dml`
VALUES (123, 'hello world');
""",
"SCRIPT",
id="dml",
),
),
)
def test_read_gbq_w_script_no_select(
session, dataset_id: str, sql_template: str, expected_statement_type: str
):
df = session.read_gbq(sql_template.format(dataset_id=dataset_id)).to_pandas()
assert df["statement_type"][0] == expected_statement_type
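
As a usage-level illustration of what these parametrized cases assert — assuming an authenticated BigQuery DataFrames session and an existing dataset (the project and dataset names below are placeholders):

import bigframes.pandas as bpd

# DDL returns a one-row frame of job metadata, not table contents.
df = bpd.read_gbq(
    "CREATE OR REPLACE TABLE `my-project.my_dataset.tmp_example` (x INT64)"
).to_pandas()
print(df["statement_type"][0])  # expected: "CREATE_TABLE"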


def test_read_gbq_twice_with_same_timestamp(session, penguins_table_id):