
Commit e7fa913

refactor: Add function to make all column ids in a tree unique and sequential (#1094)
1 parent f7e4354 commit e7fa913

13 files changed: +614 −128 lines changed
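The core of this change is a rewrite pass that walks the expression tree and replaces every column id with a fresh, sequential id, re-attaching the user-visible names only at the output boundary. Below is a minimal, self-contained sketch of that idea; the `Node` class, `serial_ids`, and this `remap_variables` are toy stand-ins, not the bigframes implementation (which lives in `bigframes.core.rewrite` and `bigframes.core.identifiers`).

# Toy sketch of id densification: every column id in the tree becomes a
# compact serial id; original names are only re-attached at the output.
import itertools
from dataclasses import dataclass, replace
from typing import Iterator, Tuple


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for a BigFrameNode: the ids it defines plus children."""
    column_ids: Tuple[str, ...]
    children: Tuple["Node", ...] = ()


def serial_ids(prefix: str = "id_") -> Iterator[str]:
    """Yield unique, sequential identifiers: id_0, id_1, ..."""
    return (f"{prefix}{n}" for n in itertools.count())


def remap_variables(node: Node, id_gen: Iterator[str]) -> Node:
    """Depth-first pass: children are remapped first, then this node's own ids."""
    new_children = tuple(remap_variables(child, id_gen) for child in node.children)
    new_ids = tuple(next(id_gen) for _ in node.column_ids)
    return replace(node, column_ids=new_ids, children=new_children)


tree = Node(("sales", "date"), children=(Node(("sales", "region")),))
dense = remap_variables(tree, serial_ids())
print(dense.children[0].column_ids)  # ('id_0', 'id_1')
print(dense.column_ids)              # ('id_2', 'id_3') -- unique and sequential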

bigframes/core/__init__.py

+7-1
@@ -268,7 +268,13 @@ def promote_offsets(self) -> Tuple[ArrayValue, str]:
     def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
         """Append together multiple ArrayValue objects."""
         return ArrayValue(
-            nodes.ConcatNode(children=tuple([self.node, *[val.node for val in other]]))
+            nodes.ConcatNode(
+                children=tuple([self.node, *[val.node for val in other]]),
+                output_ids=tuple(
+                    ids.ColumnId(bigframes.core.guid.generate_guid())
+                    for id in self.column_ids
+                ),
+            )
         )
 
     def compute_values(self, assignments: Sequence[ex.Expression]):
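With this change the concat result owns its column ids: one freshly generated id per column of the left operand, rather than reusing ids from the inputs. A hedged sketch of the pattern follows; `generate_guid` here is a stand-in counter, not `bigframes.core.guid.generate_guid` itself.

import itertools

_guid_counter = itertools.count()

def generate_guid(prefix: str = "bfuid_col_") -> str:
    # Stand-in for a process-unique id generator.
    return f"{prefix}{next(_guid_counter)}"

# Concat aligns inputs positionally, so only the column count of the first
# input matters when minting the result's output ids.
left_column_ids = ("col_a", "col_b", "col_c")
output_ids = tuple(generate_guid() for _ in left_column_ids)
print(output_ids)  # ('bfuid_col_0', 'bfuid_col_1', 'bfuid_col_2')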

bigframes/core/blocks.py

+1-1
@@ -3139,7 +3139,7 @@ def _pd_index_to_array_value(
     rows = []
     labels_as_tuples = utils.index_as_tuples(index)
     for row_offset in range(len(index)):
-        id_gen = bigframes.core.identifiers.standard_identifiers()
+        id_gen = bigframes.core.identifiers.standard_id_strings()
         row_label = labels_as_tuples[row_offset]
         row_label = (row_label,) if not isinstance(row_label, tuple) else row_label
         row = {}

bigframes/core/compile/api.py

+13-15
@@ -18,14 +18,15 @@
 import google.cloud.bigquery as bigquery
 
 import bigframes.core.compile.compiler as compiler
-import bigframes.core.rewrite as rewrites
 
 if TYPE_CHECKING:
     import bigframes.core.nodes
     import bigframes.core.ordering
     import bigframes.core.schema
 
-_STRICT_COMPILER = compiler.Compiler(strict=True)
+_STRICT_COMPILER = compiler.Compiler(
+    strict=True, enable_pruning=True, enable_densify_ids=True
+)
 
 
 class SQLCompiler:
@@ -34,7 +35,7 @@ def __init__(self, strict: bool = True):
 
     def compile_peek(self, node: bigframes.core.nodes.BigFrameNode, n_rows: int) -> str:
        """Compile node into sql that selects N arbitrary rows, may not execute deterministically."""
-        return self._compiler.compile_unordered_ir(node).peek_sql(n_rows)
+        return self._compiler.compile_peek_sql(node, n_rows)
 
     def compile_unordered(
         self,
@@ -44,9 +45,8 @@ def compile_unordered(
     ) -> str:
         """Compile node into sql where rows are unsorted, and no ordering information is preserved."""
         # TODO: Enable limit pullup, but only if not being used to write to clustered table.
-        return self._compiler.compile_unordered_ir(node).to_sql(
-            col_id_overrides=col_id_overrides
-        )
+        output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
+        return self._compiler.compile_sql(node, ordered=False, output_ids=output_ids)
 
     def compile_ordered(
         self,
@@ -56,10 +56,8 @@ def compile_ordered(
     ) -> str:
         """Compile node into sql where rows are sorted with ORDER BY."""
         # If we are ordering the query anyways, compiling the slice as a limit is probably a good idea.
-        new_node, limit = rewrites.pullup_limit_from_slice(node)
-        return self._compiler.compile_ordered_ir(new_node).to_sql(
-            col_id_overrides=col_id_overrides, ordered=True, limit=limit
-        )
+        output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
+        return self._compiler.compile_sql(node, ordered=True, output_ids=output_ids)
 
     def compile_raw(
         self,
@@ -68,13 +66,12 @@ def compile_raw(
         str, Sequence[bigquery.SchemaField], bigframes.core.ordering.RowOrdering
     ]:
         """Compile node into sql that exposes all columns, including hidden ordering-only columns."""
-        ir = self._compiler.compile_ordered_ir(node)
-        sql, schema = ir.raw_sql_and_schema()
-        return sql, schema, ir._ordering
+        return self._compiler.compile_raw(node)
 
 
 def test_only_try_evaluate(node: bigframes.core.nodes.BigFrameNode):
     """Use only for unit testing paths - not fully featured. Will throw exception if fails."""
+    node = _STRICT_COMPILER._preprocess(node)
     ibis = _STRICT_COMPILER.compile_ordered_ir(node)._to_ibis_expr(
         ordering_mode="unordered"
     )
@@ -85,9 +82,10 @@ def test_only_ibis_inferred_schema(node: bigframes.core.nodes.BigFrameNode):
     """Use only for testing paths to ensure ibis inferred schema does not diverge from bigframes inferred schema."""
     import bigframes.core.schema
 
+    node = _STRICT_COMPILER._preprocess(node)
     compiled = _STRICT_COMPILER.compile_unordered_ir(node)
     items = tuple(
-        bigframes.core.schema.SchemaItem(id, compiled.get_column_type(id))
-        for id in compiled.column_ids
+        bigframes.core.schema.SchemaItem(name, compiled.get_column_type(ibis_id))
+        for name, ibis_id in zip(node.schema.names, compiled.column_ids)
     )
     return bigframes.core.schema.ArraySchema(items)
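The compile entry points now receive the final output names positionally instead of threading a `col_id_overrides` mapping through the IR. The translation the wrapper performs is roughly the following sketch (assuming the schema names are the current external names; the helper name is hypothetical):

from typing import Mapping, Sequence

def overrides_to_output_ids(
    schema_names: Sequence[str], col_id_overrides: Mapping[str, str]
) -> list[str]:
    """Each output column keeps its schema name unless an override renames it."""
    return [col_id_overrides.get(name, name) for name in schema_names]

print(overrides_to_output_ids(["col_0", "col_1"], {"col_1": "total_sales"}))
# ['col_0', 'total_sales']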

bigframes/core/compile/compiled.py

+17-34
@@ -202,7 +202,12 @@ def _aggregate_base(
         )
         # Must have deterministic ordering, so order by the unique "by" column
         ordering = TotalOrdering(
-            tuple([OrderingExpression(column_id) for column_id in by_column_ids]),
+            tuple(
+                [
+                    OrderingExpression(ex.DerefOp(ref.id.local_normalized))
+                    for ref in by_column_ids
+                ]
+            ),
             total_ordering_columns=frozenset(
                 [ex.DerefOp(ref.id.local_normalized) for ref in by_column_ids]
             ),
@@ -266,31 +271,26 @@ def peek_sql(self, n: int):
     def to_sql(
         self,
         offset_column: typing.Optional[str] = None,
-        col_id_overrides: typing.Mapping[str, str] = {},
         ordered: bool = False,
     ) -> str:
         if offset_column or ordered:
             raise ValueError("Cannot produce sorted sql in partial ordering mode")
-        sql = ibis_bigquery.Backend().compile(
-            self._to_ibis_expr(
-                col_id_overrides=col_id_overrides,
-            )
-        )
+        sql = ibis_bigquery.Backend().compile(self._to_ibis_expr())
         return typing.cast(str, sql)
 
-    def row_count(self) -> OrderedIR:
+    def row_count(self, name: str) -> OrderedIR:
         original_table = self._to_ibis_expr()
         ibis_table = original_table.agg(
             [
-                original_table.count().name("count"),
+                original_table.count().name(name),
             ]
         )
         return OrderedIR(
             ibis_table,
-            (ibis_table["count"],),
+            (ibis_table[name],),
             ordering=TotalOrdering(
-                ordering_value_columns=(ascending_over("count"),),
-                total_ordering_columns=frozenset([ex.deref("count")]),
+                ordering_value_columns=(ascending_over(name),),
+                total_ordering_columns=frozenset([ex.deref(name)]),
             ),
         )
 
@@ -299,7 +299,6 @@ def _to_ibis_expr(
         *,
         expose_hidden_cols: bool = False,
         fraction: Optional[float] = None,
-        col_id_overrides: typing.Mapping[str, str] = {},
     ):
         """
         Creates an Ibis table expression representing the DataFrame.
@@ -320,8 +319,6 @@
                 If True, include the hidden ordering columns in the results.
                 Only compatible with `order_by` and `unordered`
                 ``ordering_mode``.
-            col_id_overrides:
-                overrides the column ids for the result
        Returns:
            An ibis expression representing the data help by the ArrayValue object.
        """
@@ -346,10 +343,6 @@
         if self._reduced_predicate is not None:
             table = table.filter(base_table[PREDICATE_COLUMN])
         table = table.drop(*columns_to_drop)
-        if col_id_overrides:
-            table = table.rename(
-                {value: key for key, value in col_id_overrides.items()}
-            )
         if fraction is not None:
             table = table.filter(ibis.random() < ibis.literal(fraction))
         return table
@@ -941,7 +934,6 @@ def _reproject_to_table(self) -> OrderedIR:
 
     def to_sql(
         self,
-        col_id_overrides: typing.Mapping[str, str] = {},
         ordered: bool = False,
         limit: Optional[int] = None,
     ) -> str:
@@ -951,17 +943,13 @@
             sql = ibis_bigquery.Backend().compile(
                 baked_ir._to_ibis_expr(
                     ordering_mode="unordered",
-                    col_id_overrides=col_id_overrides,
                     expose_hidden_cols=True,
                 )
             )
-            output_columns = [
-                col_id_overrides.get(col, col) for col in baked_ir.column_ids
-            ]
             sql = (
                 bigframes.core.compile.googlesql.Select()
                 .from_(sql)
-                .select(output_columns)
+                .select(self.column_ids)
                 .sql()
             )
 
@@ -979,24 +967,26 @@
             sql = ibis_bigquery.Backend().compile(
                 self._to_ibis_expr(
                     ordering_mode="unordered",
-                    col_id_overrides=col_id_overrides,
                     expose_hidden_cols=False,
                 )
             )
         return typing.cast(str, sql)
 
     def raw_sql_and_schema(
         self,
+        column_ids: typing.Sequence[str],
     ) -> typing.Tuple[str, typing.Sequence[google.cloud.bigquery.SchemaField]]:
         """Return sql with all hidden columns. Used to cache with ordering information.
 
         Also returns schema, as the extra ordering columns are determined compile-time.
         """
+        col_id_overrides = dict(zip(self.column_ids, column_ids))
         all_columns = (*self.column_ids, *self._hidden_ordering_column_names.keys())
         as_ibis = self._to_ibis_expr(
             ordering_mode="unordered",
             expose_hidden_cols=True,
-        ).select(all_columns)
+        )
+        as_ibis = as_ibis.select(all_columns).rename(col_id_overrides)
 
         # Ibis will produce non-nullable schema types, but bigframes should always be nullable
         fixed_ibis_schema = ibis_schema.Schema.from_tuples(
@@ -1013,7 +1003,6 @@ def _to_ibis_expr(
         *,
         expose_hidden_cols: bool = False,
         fraction: Optional[float] = None,
-        col_id_overrides: typing.Mapping[str, str] = {},
         ordering_mode: Literal["string_encoded", "unordered"],
         order_col_name: Optional[str] = ORDER_ID_COLUMN,
     ):
@@ -1043,8 +1032,6 @@
             order_col_name:
                 If the ordering mode outputs a single ordering or offsets
                 column, use this as the column name.
-            col_id_overrides:
-                overrides the column ids for the result
        Returns:
            An ibis expression representing the data help by the ArrayValue object.
        """
@@ -1086,10 +1073,6 @@
         if self._reduced_predicate is not None:
             table = table.filter(base_table[PREDICATE_COLUMN])
         table = table.drop(*columns_to_drop)
-        if col_id_overrides:
-            table = table.rename(
-                {value: key for key, value in col_id_overrides.items()}
-            )
         if fraction is not None:
             table = table.filter(ibis.random() < ibis.literal(fraction))
         return table
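`raw_sql_and_schema` now receives the desired external column names from the caller and builds the rename map itself, pairing internal ids with requested names positionally. The mapping construction reduces to the following sketch (illustrative values, not real bigframes ids):

internal_ids = ("id_0", "id_1")             # ids used inside the compiled IR
requested_ids = ("user_id", "purchase_ts")  # e.g. the names from node.schema.names

col_id_overrides = dict(zip(internal_ids, requested_ids))
print(col_id_overrides)  # {'id_0': 'user_id', 'id_1': 'purchase_ts'}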

bigframes/core/compile/compiler.py

+54-16
@@ -18,6 +18,7 @@
 import io
 import typing
 
+import google.cloud.bigquery
 import ibis
 import ibis.backends
 import ibis.backends.bigquery
@@ -32,6 +33,7 @@
 import bigframes.core.compile.scalar_op_compiler as compile_scalar
 import bigframes.core.compile.schema_translator
 import bigframes.core.compile.single_column
+import bigframes.core.expression as ex
 import bigframes.core.guid as guids
 import bigframes.core.identifiers as ids
 import bigframes.core.nodes as nodes
@@ -50,31 +52,66 @@ class Compiler:
     strict: bool = True
     scalar_op_compiler = compile_scalar.ScalarOpCompiler()
     enable_pruning: bool = False
+    enable_densify_ids: bool = False
+
+    def compile_sql(
+        self, node: nodes.BigFrameNode, ordered: bool, output_ids: typing.Sequence[str]
+    ) -> str:
+        node = self.set_output_names(node, output_ids)
+        if ordered:
+            node, limit = rewrites.pullup_limit_from_slice(node)
+            return self.compile_ordered_ir(self._preprocess(node)).to_sql(
+                ordered=True, limit=limit
+            )
+        else:
+            return self.compile_unordered_ir(self._preprocess(node)).to_sql()
+
+    def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str:
+        return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)
+
+    def compile_raw(
+        self,
+        node: bigframes.core.nodes.BigFrameNode,
+    ) -> typing.Tuple[
+        str, typing.Sequence[google.cloud.bigquery.SchemaField], bf_ordering.RowOrdering
+    ]:
+        ir = self.compile_ordered_ir(self._preprocess(node))
+        sql, schema = ir.raw_sql_and_schema(column_ids=node.schema.names)
+        return sql, schema, ir._ordering
 
     def _preprocess(self, node: nodes.BigFrameNode):
         if self.enable_pruning:
             used_fields = frozenset(field.id for field in node.fields)
             node = node.prune(used_fields)
         node = functools.cache(rewrites.replace_slice_ops)(node)
+        if self.enable_densify_ids:
+            original_names = [id.name for id in node.ids]
+            node, _ = rewrites.remap_variables(
+                node, id_generator=ids.anonymous_serial_ids()
+            )
+            node = self.set_output_names(node, original_names)
         return node
 
-    def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
-        ir = typing.cast(
-            compiled.OrderedIR, self.compile_node(self._preprocess(node), True)
+    def set_output_names(
+        self, node: bigframes.core.nodes.BigFrameNode, output_ids: typing.Sequence[str]
+    ):
+        # TODO: Create specialized output operators that will handle final names
+        return nodes.SelectionNode(
+            node,
+            tuple(
+                (ex.DerefOp(old_id), ids.ColumnId(out_id))
+                for old_id, out_id in zip(node.ids, output_ids)
+            ),
         )
+
+    def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
+        ir = typing.cast(compiled.OrderedIR, self.compile_node(node, True))
         if self.strict:
             assert ir.has_total_order
         return ir
 
     def compile_unordered_ir(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR:
-        return typing.cast(
-            compiled.UnorderedIR, self.compile_node(self._preprocess(node), False)
-        )
-
-    def compile_peak_sql(
-        self, node: nodes.BigFrameNode, n_rows: int
-    ) -> typing.Optional[str]:
-        return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows)
+        return typing.cast(compiled.UnorderedIR, self.compile_node(node, False))
 
     # TODO: Remove cache when schema no longer requires compilation to derive schema (and therefor only compiles for execution)
     @functools.lru_cache(maxsize=5000)
@@ -144,11 +181,11 @@ def compile_fromrange(self, node: nodes.FromRangeNode, ordered: bool = True):
 
         labels_array_table = ibis.range(
             joined_table[start_column], joined_table[end_column] + node.step, node.step
-        ).name("labels")
+        ).name(node.output_id.sql)
         labels = (
             typing.cast(ibis.expr.types.ArrayValue, labels_array_table)
             .as_table()
-            .unnest(["labels"])
+            .unnest([node.output_id.sql])
        )
        if ordered:
            return compiled.OrderedIR(
@@ -307,18 +344,19 @@ def compile_projection(self, node: nodes.ProjectionNode, ordered: bool = True):
 
     @_compile_node.register
     def compile_concat(self, node: nodes.ConcatNode, ordered: bool = True):
+        output_ids = [id.sql for id in node.output_ids]
         if ordered:
             compiled_ordered = [self.compile_ordered_ir(node) for node in node.children]
-            return concat_impl.concat_ordered(compiled_ordered)
+            return concat_impl.concat_ordered(compiled_ordered, output_ids)
         else:
             compiled_unordered = [
                 self.compile_unordered_ir(node) for node in node.children
             ]
-            return concat_impl.concat_unordered(compiled_unordered)
+            return concat_impl.concat_unordered(compiled_unordered, output_ids)
 
     @_compile_node.register
     def compile_rowcount(self, node: nodes.RowCountNode, ordered: bool = True):
-        result = self.compile_unordered_ir(node.child).row_count()
+        result = self.compile_unordered_ir(node.child).row_count(name=node.col_id.sql)
        return result if ordered else result.to_unordered()
 
    @_compile_node.register