
fix: avoid 403 response too large to return error with read_gbq and large query results #77


Merged · 13 commits · Oct 3, 2023
support struct / array in read_gbq queries
tswast committed Oct 3, 2023
commit fb0a476ee519f9245251d270512b659598743b26
40 changes: 32 additions & 8 deletions bigframes/core/io.py
@@ -89,20 +89,44 @@ def create_snapshot_sql(
    )


# BigQuery REST API returns types in Legacy SQL format
# https://cloud.google.com/bigquery/docs/data-types but we use Standard SQL
# names
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
BQ_STANDARD_TYPES = {
    "INT": "INT64",
    "BOOLEAN": "BOOL",
    "INTEGER": "INT64",
    "FLOAT": "FLOAT64",
}


def bq_field_to_type_sql(field: bigquery.SchemaField):
    if field.mode == "REPEATED":
        nested_type = bq_field_to_type_sql(
            bigquery.SchemaField(
                field.name, field.field_type, mode="NULLABLE", fields=field.fields
            )
        )
        return f"ARRAY<{nested_type}>"

    if field.field_type == "RECORD":
        nested_fields_sql = ", ".join(
            bq_field_to_sql(child_field) for child_field in field.fields
        )
        return f"STRUCT<{nested_fields_sql}>"

    type_ = field.field_type
    return BQ_STANDARD_TYPES.get(type_, type_)


def bq_field_to_sql(field: bigquery.SchemaField):
    name = field.name
    type_ = bq_field_to_type_sql(field)
    return f"`{name}` {type_}"


def bq_schema_to_sql(schema: Iterable[bigquery.SchemaField]):
    field_strings = []
    for field in schema:
        name = field.name
        type_ = field.field_type
        type_ = BQ_STANDARD_TYPES.get(type_, type_)
        field_strings.append(f"`{name}` {type_}")
    return ", ".join(field_strings)
    return ", ".join(bq_field_to_sql(field) for field in schema)


def format_option(key: str, value: Union[bool, str]) -> str:
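To see what the new helpers produce, here is a small usage sketch (not part of the diff). It assumes the helpers above are importable from the bigframes.core.io module this file defines and that google-cloud-bigquery is installed; the schema itself is made up for illustration.

from google.cloud import bigquery

import bigframes.core.io as bigframes_io  # module extended in this commit

# Hypothetical schema: one scalar column plus a repeated RECORD column.
schema = [
    bigquery.SchemaField("event_id", "INTEGER"),
    bigquery.SchemaField(
        "tags",
        "RECORD",
        mode="REPEATED",
        fields=(
            bigquery.SchemaField("key", "STRING"),
            bigquery.SchemaField("value", "FLOAT"),
        ),
    ),
]

print(bigframes_io.bq_schema_to_sql(schema))
# `event_id` INT64, `tags` ARRAY<STRUCT<`key` STRING, `value` FLOAT64>>

The REPEATED mode is handled first and recursed with mode="NULLABLE", so a repeated RECORD comes out as ARRAY<STRUCT<...>> rather than STRUCT<ARRAY<...>>.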
32 changes: 20 additions & 12 deletions bigframes/session.py
@@ -467,8 +467,18 @@ def _query_to_destination(
        job_config = bigquery.QueryJobConfig()
        job_config.destination = temp_table

        _, query_job = self._start_query(query, job_config=job_config)
        return query_job.destination, query_job
        try:
            # Write to temp table to workaround BigQuery 10 GB query results
            # limit. See: internal issue 303057336.
            _, query_job = self._start_query(query, job_config=job_config)
            return query_job.destination, query_job
        except google.api_core.exceptions.BadRequest:
            # Some SELECT statements still aren't compatible with cluster
            # tables as the destination. For example, if the query has a
            # top-level ORDER BY, this conflicts with our ability to cluster
            # the table by the index column(s).
            _, query_job = self._start_query(query)
            return query_job.destination, query_job

    def read_gbq_query(
        self,
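The shape of that fallback, pulled out of the Session class as a standalone sketch: write results to an explicit destination table to dodge the "response too large" / 10 GB limit on query results, and retry without a destination when BigQuery rejects the combination (for example, a top-level ORDER BY against a clustered destination). The function name and the client/temp_table parameters below are illustrative, not bigframes API.

from google.api_core import exceptions as api_exceptions
from google.cloud import bigquery


def query_to_destination(client: bigquery.Client, query: str, temp_table):
    """Sketch of the try/except pattern above; not the actual Session method."""
    job_config = bigquery.QueryJobConfig(destination=temp_table)
    try:
        # An explicit destination table sidesteps the size limit on
        # anonymous query results.
        query_job = client.query(query, job_config=job_config)
        query_job.result()
        return query_job.destination, query_job
    except api_exceptions.BadRequest:
        # Some queries can't write to the pre-created (clustered) table,
        # e.g. those with a top-level ORDER BY; let BigQuery pick the
        # destination instead.
        query_job = client.query(query)
        query_job.result()
        return query_job.destination, query_job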
@@ -1229,23 +1239,20 @@ def _create_session_table_empty(
        schema: Iterable[bigquery.SchemaField],
        cluster_cols: List[str],
    ) -> bigquery.TableReference:
        # Can't set a table in _SESSION as destination via query job API, so we
        # run DDL, instead.
        table = self._create_session_table()
        schema_sql = bigframes_io.bq_schema_to_sql(schema)

        clusterable_cols = [
            col.name
            for col in schema
            if col.name in cluster_cols and _can_cluster_bq(col)
        ][:_MAX_CLUSTER_COLUMNS]

        # Can't set a table in _SESSION as destination via query job API, so we
        # run DDL, instead.
        table = self._create_session_table()
        cluster_cols_sql = ", ".join(f"`{cluster_col}`" for cluster_col in cluster_cols)

        # TODO(swast): Handle STRUCT (RECORD) / ARRAY (REPEATED) columns.
        schema_sql = bigframes_io.bq_schema_to_sql(schema)

        if clusterable_cols:
            cluster_cols_sql = ", ".join(
                f"`{cluster_col}`" for cluster_col in cluster_cols
                f"`{cluster_col}`" for cluster_col in clusterable_cols
            )
            cluster_sql = f"CLUSTER BY {cluster_cols_sql}"
        else:
@@ -1562,7 +1569,8 @@ def _can_cluster_bq(field: bigquery.SchemaField):
        "NUMERIC",
        "DECIMAL",
        "BIGNUMERIC",
        "BIGDECIMAL" "DATE",
        "BIGDECIMAL",
        "DATE",
        "DATETIME",
        "TIMESTAMP",
        "BOOL",
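The last hunk in this file also fixes a subtle bug in the clusterable-type list: a missing comma made Python concatenate the adjacent string literals, so the old entry "BIGDECIMAL" "DATE", produced the single element "BIGDECIMALDATE" and silently dropped both BIGDECIMAL and DATE from the types _can_cluster_bq accepts. A minimal illustration:

# Adjacent string literals merge into one element when the comma is missing.
old_entry = ("BIGDECIMAL" "DATE",)
new_entry = ("BIGDECIMAL", "DATE")

assert old_entry == ("BIGDECIMALDATE",)
assert new_entry == ("BIGDECIMAL", "DATE")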
1 change: 1 addition & 0 deletions tests/system/small/test_session.py
@@ -57,6 +57,7 @@ def test_read_gbq_tokyo(
        ),
        pytest.param(
            """SELECT
                t.int64_col + 1 as my_ints,
                t.float64_col * 2 AS my_floats,
                CONCAT(t.string_col, "_2") AS my_strings,
                t.int64_col > 0 AS my_bools,