fix: avoid 403 response too large to return error with read_gbq and large query results #77

Merged: 13 commits, Oct 3, 2023

4 changes: 2 additions & 2 deletions bigframes/core/__init__.py
@@ -1198,8 +1198,8 @@ def cached(self, cluster_cols: typing.Sequence[str]) -> ArrayValue:
destination = self._session._ibis_to_session_table(
ibis_expr, cluster_cols=cluster_cols, api_name="cache"
)
table_expression = self._session.ibis_client.sql(
f"SELECT * FROM `_SESSION`.`{destination.table_id}`"
table_expression = self._session.ibis_client.table(
f"{destination.project}.{destination.dataset_id}.{destination.table_id}"
)
new_columns = [table_expression[column] for column in self.column_names]
new_hidden_columns = [
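For context, the change above swaps a raw SQL string over the _SESSION alias for a fully-qualified table reference. A minimal sketch of the two styles, assuming ibis_client is the session's ibis BigQuery connection and using placeholder project, dataset, and table names:

# Before: build the cached expression from raw SQL against the session-scoped alias.
cached = ibis_client.sql("SELECT * FROM `_SESSION`.`bqdf_cached_table`")

# After: reference the materialized table by its fully-qualified name, so the
# expression no longer depends on the `_SESSION` alias and the same table can
# later be reused as a query destination.
cached = ibis_client.table("my-project.my_dataset.bqdf_cached_table")
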
45 changes: 44 additions & 1 deletion bigframes/core/io.py
@@ -16,7 +16,8 @@

import datetime
import textwrap
from typing import Dict, Union
import types
from typing import Dict, Iterable, Union

import google.cloud.bigquery as bigquery

@@ -89,6 +90,48 @@ def create_snapshot_sql(
)


# BigQuery REST API returns types in Legacy SQL format
# https://cloud.google.com/bigquery/docs/data-types but we use Standard SQL
# names
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
BQ_STANDARD_TYPES = types.MappingProxyType(
{
"BOOLEAN": "BOOL",
"INTEGER": "INT64",
"FLOAT": "FLOAT64",
}
)


def bq_field_to_type_sql(field: bigquery.SchemaField):
if field.mode == "REPEATED":
nested_type = bq_field_to_type_sql(
bigquery.SchemaField(
field.name, field.field_type, mode="NULLABLE", fields=field.fields
)
)
return f"ARRAY<{nested_type}>"

if field.field_type == "RECORD":
nested_fields_sql = ", ".join(
bq_field_to_sql(child_field) for child_field in field.fields
)
return f"STRUCT<{nested_fields_sql}>"

type_ = field.field_type
return BQ_STANDARD_TYPES.get(type_, type_)


def bq_field_to_sql(field: bigquery.SchemaField):
name = field.name
type_ = bq_field_to_type_sql(field)
return f"`{name}` {type_}"


def bq_schema_to_sql(schema: Iterable[bigquery.SchemaField]):
return ", ".join(bq_field_to_sql(field) for field in schema)


def format_option(key: str, value: Union[bool, str]) -> str:
if isinstance(value, bool):
return f"{key}=true" if value else f"{key}=false"
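The helpers added above translate a schema expressed in the REST API's Legacy SQL type names into a Standard SQL column list for use in DDL. A small illustrative sketch of the expected output; the field names are placeholders, and the module path bigframes.core.io matches the unit tests at the end of this diff:

import google.cloud.bigquery as bigquery

import bigframes.core.io

# Schema as returned by the BigQuery REST API, using Legacy SQL type names.
schema = [
    bigquery.SchemaField("rowindex", "INTEGER"),
    bigquery.SchemaField("score", "FLOAT"),
    bigquery.SchemaField("tags", "STRING", mode="REPEATED"),
]

# Produces a column list suitable for a CREATE TABLE statement, with names
# escaped and types translated to Standard SQL:
#   `rowindex` INT64, `score` FLOAT64, `tags` ARRAY<STRING>
print(bigframes.core.io.bq_schema_to_sql(schema))
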
155 changes: 86 additions & 69 deletions bigframes/session.py
@@ -449,13 +449,6 @@ def _query_to_destination(
index_cols: List[str],
api_name: str,
) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
# If there are no index columns, then there's no reason to cache to a
# (clustered) session table, as we'll just have to query it again to
# create a default index & ordering.
if not index_cols:
_, query_job = self._start_query(query)
return query_job.destination, query_job

# If a dry_run indicates this is not a query type job, then don't
# bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
dry_run_config = bigquery.QueryJobConfig()
@@ -465,15 +458,24 @@
_, query_job = self._start_query(query)
return query_job.destination, query_job

# Make sure we cluster by the index column(s) so that subsequent
# operations are as speedy as they can be.
# Create a table to workaround BigQuery 10 GB query results limit. See:
# internal issue 303057336.
# Since we have a `statement_type == 'SELECT'`, schema should be populated.
schema = typing.cast(Iterable[bigquery.SchemaField], dry_run_job.schema)
temp_table = self._create_session_table_empty(api_name, schema, index_cols)

job_config = bigquery.QueryJobConfig()
job_config.destination = temp_table

try:
ibis_expr = self.ibis_client.sql(query)
return self._ibis_to_session_table(ibis_expr, index_cols, api_name), None
# Write to temp table to workaround BigQuery 10 GB query results
# limit. See: internal issue 303057336.
_, query_job = self._start_query(query, job_config=job_config)
return query_job.destination, query_job
except google.api_core.exceptions.BadRequest:
# Some SELECT statements still aren't compatible with CREATE TEMP
# TABLE ... AS SELECT ... statements. For example, if the query has
# a top-level ORDER BY, this conflicts with our ability to cluster
# Some SELECT statements still aren't compatible with cluster
# tables as the destination. For example, if the query has a
# top-level ORDER BY, this conflicts with our ability to cluster
# the table by the index column(s).
_, query_job = self._start_query(query)
return query_job.destination, query_job
@@ -1231,6 +1233,54 @@ def _create_session_table(self) -> bigquery.TableReference:
)
return dataset.table(table_name)

def _create_session_table_empty(
self,
api_name: str,
schema: Iterable[bigquery.SchemaField],
cluster_cols: List[str],
) -> bigquery.TableReference:
# Can't set a table in _SESSION as destination via query job API, so we
# run DDL, instead.
table = self._create_session_table()
schema_sql = bigframes_io.bq_schema_to_sql(schema)

clusterable_cols = [
col.name
for col in schema
if col.name in cluster_cols and _can_cluster_bq(col)
][:_MAX_CLUSTER_COLUMNS]

if clusterable_cols:
cluster_cols_sql = ", ".join(
f"`{cluster_col}`" for cluster_col in clusterable_cols
)
cluster_sql = f"CLUSTER BY {cluster_cols_sql}"
else:
cluster_sql = ""

ddl_text = f"""
CREATE TEMP TABLE
`_SESSION`.`{table.table_id}`
({schema_sql})
{cluster_sql}
"""

job_config = bigquery.QueryJobConfig()

# Include a label so that Dataplex Lineage can identify temporary
# tables that BigQuery DataFrames creates. Googlers: See internal issue
# 296779699. We're labeling the job instead of the table because
# otherwise we get `BadRequest: 400 OPTIONS on temporary tables are not
# supported`.
job_config.labels = {"source": "bigquery-dataframes-temp"}
job_config.labels["bigframes-api"] = api_name

_, query_job = self._start_query(ddl_text, job_config=job_config)

# Use fully-qualified name instead of `_SESSION` name so that the
# created table can be used as the destination table.
return query_job.destination

def _create_sequential_ordering(
self,
table: ibis_types.Table,
@@ -1249,7 +1299,9 @@ def _create_sequential_ordering(
cluster_cols=list(index_cols) + [default_ordering_name],
api_name=api_name,
)
table = self.ibis_client.sql(f"SELECT * FROM `{table_ref.table_id}`")
table = self.ibis_client.table(
f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
)
ordering_reference = core.OrderingColumnReference(default_ordering_name)
ordering = core.ExpressionOrdering(
ordering_value_columns=[ordering_reference],
@@ -1264,55 +1316,13 @@ def _ibis_to_session_table(
cluster_cols: Iterable[str],
api_name: str,
) -> bigquery.TableReference:
clusterable_cols = [
col for col in cluster_cols if _can_cluster(table[col].type())
][:_MAX_CLUSTER_COLUMNS]
return self._query_to_session_table(
destination, _ = self._query_to_destination(
self.ibis_client.compile(table),
cluster_cols=clusterable_cols,
index_cols=list(cluster_cols),
api_name=api_name,
)

def _query_to_session_table(
self,
query_text: str,
cluster_cols: Iterable[str],
api_name: str,
) -> bigquery.TableReference:
if len(list(cluster_cols)) > _MAX_CLUSTER_COLUMNS:
raise ValueError(
f"Too many cluster columns: {list(cluster_cols)}, max {_MAX_CLUSTER_COLUMNS} allowed."
)
# Can't set a table in _SESSION as destination via query job API, so we
# run DDL, instead.
table = self._create_session_table()
cluster_cols_sql = ", ".join(f"`{cluster_col}`" for cluster_col in cluster_cols)

# TODO(swast): This might not support multi-statement SQL queries (scripts).
ddl_text = f"""
CREATE TEMP TABLE `_SESSION`.`{table.table_id}`
CLUSTER BY {cluster_cols_sql}
AS {query_text}
"""

job_config = bigquery.QueryJobConfig()

# Include a label so that Dataplex Lineage can identify temporary
# tables that BigQuery DataFrames creates. Googlers: See internal issue
# 296779699. We're labeling the job instead of the table because
# otherwise we get `BadRequest: 400 OPTIONS on temporary tables are not
# supported`.
job_config.labels = {"source": "bigquery-dataframes-temp"}
job_config.labels["bigframes-api"] = api_name

try:
self._start_query(
ddl_text, job_config=job_config
) # Wait for the job to complete
except google.api_core.exceptions.Conflict:
# Allow query retry to succeed.
pass
return table
# There should always be a destination table for this query type.
return typing.cast(bigquery.TableReference, destination)

def remote_function(
self,
@@ -1494,14 +1504,21 @@ def connect(context: Optional[bigquery_options.BigQueryOptions] = None) -> Sessi
return Session(context)


def _can_cluster(ibis_type: ibis_dtypes.DataType):
def _can_cluster_bq(field: bigquery.SchemaField):
# https://cloud.google.com/bigquery/docs/clustered-tables
# Notably, float is excluded
return (
ibis_type.is_integer()
or ibis_type.is_string()
or ibis_type.is_decimal()
or ibis_type.is_date()
or ibis_type.is_timestamp()
or ibis_type.is_boolean()
type_ = field.field_type
return type_ in (
"INTEGER",
"INT64",
"STRING",
"NUMERIC",
"DECIMAL",
"BIGNUMERIC",
"BIGDECIMAL",
"DATE",
"DATETIME",
"TIMESTAMP",
"BOOL",
"BOOLEAN",
)
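
Taken together, the session changes replace the CREATE TEMP TABLE ... AS SELECT approach with a create-empty-clustered-table-then-write pattern, keeping a plain query as the fallback. A minimal standalone sketch of that control flow, assuming a bigquery.Client named client and a pre-created clustered table reference temp_table (job labels and bigframes-specific plumbing omitted):

import google.api_core.exceptions
import google.cloud.bigquery as bigquery


def run_to_destination(client: bigquery.Client, sql: str, temp_table: bigquery.TableReference):
    # Dry run first: only SELECT-type statements can write to a destination table.
    dry_run_job = client.query(sql, job_config=bigquery.QueryJobConfig(dry_run=True))
    if dry_run_job.statement_type != "SELECT":
        query_job = client.query(sql)
        query_job.result()
        return query_job.destination

    # Write results into the pre-created, clustered temp table so large results
    # are not subject to the 10 GB "response too large to return" limit.
    job_config = bigquery.QueryJobConfig(destination=temp_table)
    try:
        query_job = client.query(sql, job_config=job_config)
        query_job.result()
        return query_job.destination
    except google.api_core.exceptions.BadRequest:
        # Some SELECTs (for example, with a top-level ORDER BY) cannot write to a
        # clustered destination; fall back to the anonymous results table.
        query_job = client.query(sql)
        query_job.result()
        return query_job.destination
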
3 changes: 2 additions & 1 deletion tests/system/small/ml/test_core.py
@@ -23,6 +23,7 @@

import bigframes
from bigframes.ml import core
import tests.system.utils


def test_model_eval(
@@ -224,7 +225,7 @@ def test_pca_model_principal_component_info(penguins_bqml_pca_model: core.BqmlMo
"cumulative_explained_variance_ratio": [0.469357, 0.651283, 0.812383],
},
)
pd.testing.assert_frame_equal(
tests.system.utils.assert_pandas_df_equal_ignore_ordering(
result,
expected,
check_exact=False,
5 changes: 3 additions & 2 deletions tests/system/small/ml/test_decomposition.py
@@ -15,6 +15,7 @@
import pandas as pd

from bigframes.ml import decomposition
import tests.system.utils


def test_pca_predict(penguins_pca_model, new_penguins_df):
@@ -129,7 +130,7 @@ def test_pca_explained_variance_(penguins_pca_model: decomposition.PCA):
"explained_variance": [3.278657, 1.270829, 1.125354],
},
)
pd.testing.assert_frame_equal(
tests.system.utils.assert_pandas_df_equal_ignore_ordering(
result,
expected,
check_exact=False,
@@ -148,7 +149,7 @@ def test_pca_explained_variance_ratio_(penguins_pca_model: decomposition.PCA):
"explained_variance_ratio": [0.469357, 0.181926, 0.1611],
},
)
pd.testing.assert_frame_equal(
tests.system.utils.assert_pandas_df_equal_ignore_ordering(
result,
expected,
check_exact=False,
1 change: 1 addition & 0 deletions tests/system/small/test_session.py
@@ -57,6 +57,7 @@ def test_read_gbq_tokyo(
),
pytest.param(
"""SELECT
t.int64_col + 1 as my_ints,
t.float64_col * 2 AS my_floats,
CONCAT(t.string_col, "_2") AS my_strings,
t.int64_col > 0 AS my_bools,
55 changes: 55 additions & 0 deletions tests/unit/core/test_io.py
@@ -13,8 +13,10 @@
# limitations under the License.

import datetime
from typing import Iterable

import google.cloud.bigquery as bigquery
import pytest

import bigframes.core.io

@@ -47,3 +49,56 @@ def test_create_snapshot_sql_doesnt_timetravel_session_datasets():

# Don't need the project ID for _SESSION tables.
assert "my-test-project" not in sql


@pytest.mark.parametrize(
("schema", "expected"),
(
(
[bigquery.SchemaField("My Column", "INTEGER")],
"`My Column` INT64",
),
(
[
bigquery.SchemaField("My Column", "INTEGER"),
bigquery.SchemaField("Float Column", "FLOAT"),
bigquery.SchemaField("Bool Column", "BOOLEAN"),
],
"`My Column` INT64, `Float Column` FLOAT64, `Bool Column` BOOL",
),
(
[
bigquery.SchemaField("My Column", "INTEGER", mode="REPEATED"),
bigquery.SchemaField("Float Column", "FLOAT", mode="REPEATED"),
bigquery.SchemaField("Bool Column", "BOOLEAN", mode="REPEATED"),
],
"`My Column` ARRAY<INT64>, `Float Column` ARRAY<FLOAT64>, `Bool Column` ARRAY<BOOL>",
),
(
[
bigquery.SchemaField(
"My Column",
"RECORD",
mode="REPEATED",
fields=(
bigquery.SchemaField("Float Column", "FLOAT", mode="REPEATED"),
bigquery.SchemaField("Bool Column", "BOOLEAN", mode="REPEATED"),
bigquery.SchemaField(
"Nested Column",
"RECORD",
fields=(bigquery.SchemaField("Int Column", "INTEGER"),),
),
),
),
],
(
"`My Column` ARRAY<STRUCT<"
+ "`Float Column` ARRAY<FLOAT64>,"
+ " `Bool Column` ARRAY<BOOL>,"
+ " `Nested Column` STRUCT<`Int Column` INT64>>>"
),
),
),
)
def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str):
# Convert the parametrized schema and compare against the expected SQL text.
sql = bigframes.core.io.bq_schema_to_sql(schema)
assert sql == expected