diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 58b8515418..830fa74f0b 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -2096,7 +2096,7 @@ def _get_rows_as_json_values(self) -> Block:
             )
             column_names.append(serialized_column_name)
 
-        column_names_csv = sql.csv(column_names, quoted=True)
+        column_names_csv = sql.csv(map(sql.simple_literal, column_names))
 
         # index columns count
         index_columns_count = len(self.index_columns)
@@ -2108,22 +2108,22 @@ def _get_rows_as_json_values(self) -> Block:
 
         # types of the columns to serialize for the row
         column_types = list(self.index.dtypes) + list(self.dtypes)
-        column_types_csv = sql.csv([str(typ) for typ in column_types], quoted=True)
+        column_types_csv = sql.csv(
+            [sql.simple_literal(str(typ)) for typ in column_types]
+        )
 
         # row dtype to use for deserializing the row as pandas series
         pandas_row_dtype = bigframes.dtypes.lcd_type(*column_types)
         if pandas_row_dtype is None:
             pandas_row_dtype = "object"
-        pandas_row_dtype = sql.quote(str(pandas_row_dtype))
+        pandas_row_dtype = sql.simple_literal(str(pandas_row_dtype))
 
         # create a json column representing row through SQL manipulation
         row_json_column_name = guid.generate_guid()
         select_columns = (
             [ordering_column_name] + list(self.index_columns) + [row_json_column_name]
         )
-        select_columns_csv = sql.csv(
-            [sql.column_reference(col) for col in select_columns]
-        )
+        select_columns_csv = sql.csv([sql.identifier(col) for col in select_columns])
         json_sql = f"""\
 With T0 AS (
 {textwrap.indent(expr_sql, "    ")}
 ),
@@ -2136,7 +2136,7 @@ def _get_rows_as_json_values(self) -> Block:
                 "values", [{column_references_csv}],
                 "indexlength", {index_columns_count},
                 "dtype", {pandas_row_dtype}
-            ) AS {row_json_column_name} FROM T0
+            ) AS {sql.identifier(row_json_column_name)} FROM T0
 )
 SELECT {select_columns_csv} FROM T1
 """
diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py
index cc1d6baaa1..b57e0c4d35 100644
--- a/bigframes/core/compile/compiled.py
+++ b/bigframes/core/compile/compiled.py
@@ -16,9 +16,8 @@
 import abc
 import functools
 import itertools
-import textwrap
 import typing
-from typing import Collection, Iterable, Literal, Optional, Sequence
+from typing import Collection, Literal, Optional, Sequence
 
 import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops
 import ibis
@@ -40,6 +39,7 @@
     OrderingExpression,
 )
 import bigframes.core.schema as schemata
+import bigframes.core.sql
 from bigframes.core.window_spec import RangeWindowBounds, RowsWindowBounds, WindowSpec
 import bigframes.dtypes
 import bigframes.operations.aggregations as agg_ops
@@ -821,15 +821,13 @@ def to_sql(
             )
         )
         output_columns = [
-            col_id_overrides.get(col) if (col in col_id_overrides) else col
-            for col in baked_ir.column_ids
+            col_id_overrides.get(col, col) for col in baked_ir.column_ids
         ]
-        selection = ", ".join(map(lambda col_id: f"`{col_id}`", output_columns))
+        sql = bigframes.core.sql.select_from(output_columns, sql)
 
-        sql = textwrap.dedent(f"SELECT {selection}\n" "FROM (\n" f"{sql}\n" ")\n")
         # Single row frames may not have any ordering columns
         if len(baked_ir._ordering.all_ordering_columns) > 0:
-            order_by_clause = baked_ir._ordering_clause(
+            order_by_clause = bigframes.core.sql.ordering_clause(
                 baked_ir._ordering.all_ordering_columns
             )
             sql += f"{order_by_clause}\n"
@@ -843,22 +841,6 @@ def to_sql(
         )
         return typing.cast(str, sql)
 
-    def _ordering_clause(self, ordering: Iterable[OrderingExpression]) -> str:
-        parts = []
-        for col_ref in ordering:
-            asc_desc = "ASC" if col_ref.direction.is_ascending else "DESC"
-            null_clause = "NULLS LAST" if col_ref.na_last else "NULLS FIRST"
-            ordering_expr = col_ref.scalar_expression
-            # We don't know how to compile scalar expressions in isolation
-            if ordering_expr.is_const:
-                # Probably shouldn't have constants in ordering definition, but best to ignore if somehow they end up here.
-                continue
-            if not isinstance(ordering_expr, ex.UnboundVariableExpression):
-                raise ValueError("Expected direct column reference.")
-            part = f"`{ordering_expr.id}` {asc_desc} {null_clause}"
-            parts.append(part)
-        return f"ORDER BY {' ,'.join(parts)}"
-
     def _to_ibis_expr(
         self,
         *,
diff --git a/bigframes/core/sql.py b/bigframes/core/sql.py
index 31ee5f9064..3ad06610b6 100644
--- a/bigframes/core/sql.py
+++ b/bigframes/core/sql.py
@@ -11,49 +11,152 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from __future__ import annotations
 
 """
 Utility functions for SQL construction.
 """
 
-from typing import Iterable
+import datetime
+import math
+import textwrap
+from typing import Iterable, TYPE_CHECKING
 
+# Literals and identifiers matching this pattern can be unquoted
+unquoted = r"^[A-Za-z_][A-Za-z_0-9]*$"
 
-def quote(value: str):
-    """Return quoted input string."""
-    # Let's use repr which also escapes any special characters
-    #
-    # >>> for val in [
-    # ...     "123",
-    # ...     "str with no special chars",
-    # ...     "str with special chars.,'\"/\\"
-    # ... ]:
-    # ...     print(f"{val} -> {repr(val)}")
-    # ...
-    # 123 -> '123'
-    # str with no special chars -> 'str with no special chars'
-    # str with special chars.,'"/\ -> 'str with special chars.,\'"/\\'
+if TYPE_CHECKING:
+    import google.cloud.bigquery as bigquery
 
-    return repr(value)
+    import bigframes.core.ordering
 
 
-def column_reference(column_name: str):
+### Writing SQL Values (literals, column references, table references, etc.)
+def simple_literal(value: str | int | bool | float):
+    """Return a string representing the input as a SQL literal."""
+    # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#literals
+    if isinstance(value, str):
+        # Single quoting seems to work nicer with ibis than double quoting
+        return f"'{escape_special_characters(value)}'"
+    elif isinstance(value, (bool, int)):
+        return str(value)
+    elif isinstance(value, float):
+        # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#floating_point_literals
+        if math.isnan(value):
+            return 'CAST("nan" as FLOAT)'
+        if value == math.inf:
+            return 'CAST("+inf" as FLOAT)'
+        if value == -math.inf:
+            return 'CAST("-inf" as FLOAT)'
+        return str(value)
+    else:
+        raise ValueError(f"Cannot produce literal for {value}")
+
+
+def multi_literal(*values: str):
+    literal_strings = [simple_literal(i) for i in values]
+    return "(" + ", ".join(literal_strings) + ")"
+
+
+def identifier(id: str) -> str:
     """Return a string representing column reference in a SQL."""
+    # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#identifiers
+    # Just always escape, otherwise need to check against every reserved sql keyword
+    return f"`{escape_special_characters(id)}`"
+
+
+def escape_special_characters(value: str):
+    """Escapes all special characters"""
+    # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#string_and_bytes_literals
+    trans_table = str.maketrans(
+        {
+            "\a": r"\a",
+            "\b": r"\b",
+            "\f": r"\f",
+            "\n": r"\n",
+            "\r": r"\r",
+            "\t": r"\t",
+            "\v": r"\v",
+            "\\": r"\\",
+            "?": r"\?",
+            '"': r"\"",
+            "'": r"\'",
+            "`": r"\`",
+        }
+    )
+    return value.translate(trans_table)
+
+
+def cast_as_string(column_name: str) -> str:
+    """Return a string representing string casting of a column."""
 
-    return f"`{column_name}`"
+    return f"CAST({identifier(column_name)} AS STRING)"
 
 
-def cast_as_string(column_name: str):
-    """Return a string representing string casting of a column."""
+def csv(values: Iterable[str]) -> str:
+    """Return a string of comma separated values."""
+    return ", ".join(values)
 
-    return f"CAST({column_reference(column_name)} AS STRING)"
 
+def table_reference(table_ref: bigquery.TableReference) -> str:
+    return f"`{escape_special_characters(table_ref.project)}`.`{escape_special_characters(table_ref.dataset_id)}`.`{escape_special_characters(table_ref.table_id)}`"
 
-def csv(values: Iterable[str], quoted=False):
-    """Return a string of comma separated values."""
-    if quoted:
-        values = [quote(val) for val in values]
 
+def infix_op(opname: str, left_arg: str, right_arg: str):
+    # Maybe should add parentheses??
+ return f"{left_arg} {opname} {right_arg}" - return ", ".join(values) + +### Writing SELECT expressions +def select_from(columns: Iterable[str], subquery: str, distinct: bool = False): + selection = ", ".join(map(identifier, columns)) + distinct_clause = "DISTINCT " if distinct else "" + + return textwrap.dedent( + f"SELECT {distinct_clause}{selection}\nFROM (\n" f"{subquery}\n" ")\n" + ) + + +def select_table(table_ref: bigquery.TableReference): + return textwrap.dedent(f"SELECT * FROM {table_reference(table_ref)}") + + +def is_distinct_sql(columns: Iterable[str], table_sql: str) -> str: + is_unique_sql = f"""WITH full_table AS ( + {select_from(columns, table_sql)} + ), + distinct_table AS ( + {select_from(columns, table_sql, distinct=True)} + ) + + SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, + (SELECT COUNT(*) FROM distinct_table) AS `distinct_count` + """ + return is_unique_sql + + +def ordering_clause( + ordering: Iterable[bigframes.core.ordering.OrderingExpression], +) -> str: + import bigframes.core.expression + + parts = [] + for col_ref in ordering: + asc_desc = "ASC" if col_ref.direction.is_ascending else "DESC" + null_clause = "NULLS LAST" if col_ref.na_last else "NULLS FIRST" + ordering_expr = col_ref.scalar_expression + # We don't know how to compile scalar expressions in isolation + if ordering_expr.is_const: + # Probably shouldn't have constants in ordering definition, but best to ignore if somehow they end up here. + continue + assert isinstance( + ordering_expr, bigframes.core.expression.UnboundVariableExpression + ) + part = f"`{ordering_expr.id}` {asc_desc} {null_clause}" + parts.append(part) + return f"ORDER BY {' ,'.join(parts)}" + + +def snapshot_clause(time_travel_timestamp: datetime.datetime): + return f"FOR SYSTEM_TIME AS OF TIMESTAMP({repr(time_travel_timestamp.isoformat())})" diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index ed1bd39ada..95ab16fecf 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -31,6 +31,7 @@ import bigframes from bigframes.core import log_adapter +import bigframes.core.sql import bigframes.formatting_helpers as formatting_helpers IO_ORDERING_ID = "bqdf_row_nums" @@ -353,7 +354,7 @@ def to_query( else: select_clause = "SELECT *" - where_clause = "" + filter_string = "" if filters: valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = { "in": "IN", @@ -373,12 +374,11 @@ def to_query( ): filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters]) - or_expressions = [] for group in filters: if not isinstance(group, Iterable): group = [group] - and_expressions = [] + and_expression = "" for filter_item in group: if not isinstance(filter_item, tuple) or (len(filter_item) != 3): raise ValueError( @@ -397,17 +397,29 @@ def to_query( operator_str = valid_operators[operator] + column_ref = bigframes.core.sql.identifier(column) if operator_str in ["IN", "NOT IN"]: - value_list = ", ".join([repr(v) for v in value]) - expression = f"`{column}` {operator_str} ({value_list})" + value_literal = bigframes.core.sql.multi_literal(*value) else: - expression = f"`{column}` {operator_str} {repr(value)}" - and_expressions.append(expression) - - or_expressions.append(" AND ".join(and_expressions)) + value_literal = bigframes.core.sql.simple_literal(value) + expression = bigframes.core.sql.infix_op( + operator_str, column_ref, value_literal + ) + if and_expression: + and_expression = bigframes.core.sql.infix_op( + 
"AND", and_expression, expression + ) + else: + and_expression = expression - if or_expressions: - where_clause = " WHERE " + " OR ".join(or_expressions) + if filter_string: + filter_string = bigframes.core.sql.infix_op( + "OR", filter_string, and_expression + ) + else: + filter_string = and_expression - full_query = f"{select_clause} FROM {sub_query} AS sub{where_clause}" - return full_query + if filter_string: + return f"{select_clause} FROM {sub_query} AS sub WHERE {filter_string}" + else: + return f"{select_clause} FROM {sub_query} AS sub" diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 0f6a3dadd2..370ee546d7 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -40,6 +40,7 @@ import bigframes.core.compile import bigframes.core.guid as guid import bigframes.core.ordering as order +import bigframes.core.sql import bigframes.dtypes import bigframes.session._io.bigquery.read_gbq_table import bigframes.session.clients @@ -131,14 +132,14 @@ def _create_time_travel_sql( """Query a table via 'time travel' for consistent reads.""" # If we have an anonymous query results table, it can't be modified and # there isn't any BigQuery time travel. + selection = bigframes.core.sql.select_table(table_ref) if table_ref.dataset_id.startswith("_"): - return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`" + return selection return textwrap.dedent( f""" - SELECT * - FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}` - FOR SYSTEM_TIME AS OF TIMESTAMP({repr(time_travel_timestamp.isoformat())}) + {selection} + {bigframes.core.sql.snapshot_clause(time_travel_timestamp)} """ ) @@ -149,9 +150,8 @@ def get_ibis_time_travel_table( time_travel_timestamp: datetime.datetime, ) -> ibis_types.Table: try: - return ibis_client.sql( - _create_time_travel_sql(table_ref, time_travel_timestamp) - ) + sql = _create_time_travel_sql(table_ref, time_travel_timestamp) + return ibis_client.sql(sql) except google.api_core.exceptions.Forbidden as ex: # Ibis does a dry run to get the types of the columns from the SQL. 
if "Drive credentials" in ex.message: @@ -166,25 +166,14 @@ def _check_index_uniqueness( index_cols: List[str], api_name: str, ) -> bool: - distinct_table = table.select(*index_cols).distinct() - is_unique_sql = f"""WITH full_table AS ( - {ibis_client.compile(table)} - ), - distinct_table AS ( - {ibis_client.compile(distinct_table)} - ) - - SELECT (SELECT COUNT(*) FROM full_table) AS `total_count`, - (SELECT COUNT(*) FROM distinct_table) AS `distinct_count` - """ + table_sql = ibis_client.compile(table) + is_unique_sql = bigframes.core.sql.is_distinct_sql(index_cols, table_sql) job_config = bigquery.QueryJobConfig() job_config.labels["bigframes-api"] = api_name results = bqclient.query_and_wait(is_unique_sql, job_config=job_config) row = next(iter(results)) - total_count = row["total_count"] - distinct_count = row["distinct_count"] - return total_count == distinct_count + return row["total_count"] == row["distinct_count"] def _get_primary_keys( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 0fa1d90e8b..4aa62c0f6d 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -221,7 +221,7 @@ def stringify(x): ) -# @pytest.mark.flaky(retries=2, delay=120) +@pytest.mark.flaky(retries=2, delay=120) def test_remote_function_binop(session, scalars_dfs, dataset_id, bq_cf_connection): try: diff --git a/tests/unit/session/test_io_bigquery.py b/tests/unit/session/test_io_bigquery.py index 5a3470e883..57f9e00363 100644 --- a/tests/unit/session/test_io_bigquery.py +++ b/tests/unit/session/test_io_bigquery.py @@ -249,7 +249,7 @@ def test_bq_schema_to_sql(schema: Iterable[bigquery.SchemaField], expected: str) rowindex, string_col, FROM `test_table` AS t - ) AS sub WHERE `rowindex` < 4 AND `string_col` = 'Hello, World!'""", + ) AS sub WHERE `rowindex` < 4 AND `string_col` = \'Hello, World!\'""", id="subquery-all_params-filter_and_operation", ), pytest.param(