
refactor: simplify ArrayValue public interface #82


Merged
merged 9 commits on Oct 5, 2023
245 changes: 90 additions & 155 deletions bigframes/core/__init__.py

Large diffs are not rendered by default.

116 changes: 86 additions & 30 deletions bigframes/core/blocks.py
@@ -152,7 +152,7 @@ def value_columns(self) -> Sequence[str]:
         """All value columns, mutually exclusive with index columns."""
         return [
             column
-            for column in self._expr.column_names
+            for column in self._expr.column_ids
             if column not in self.index_columns
         ]

@@ -444,9 +444,7 @@ def _compute_and_count(
         # TODO(swast): Allow for dry run and timeout.
         expr = self._apply_value_keys_to_expr(value_keys=value_keys)
 
-        results_iterator, query_job = expr.start_query(
-            max_results=max_results, expose_extra_columns=True
-        )
+        results_iterator, query_job = expr.start_query(max_results=max_results)
 
         table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
         fraction = (
@@ -483,12 +481,6 @@ def _compute_and_count(
             if self.index_columns:
                 df.set_index(list(self.index_columns), inplace=True)
                 df.index.names = self.index.names  # type: ignore
-
-            df.drop(
-                [col for col in df.columns if col not in self.value_columns],
-                axis=1,
-                inplace=True,
-            )
         elif (sampling_method == _UNIFORM) and (random_state is None):
             filtered_expr = self.expr._uniform_sampling(fraction)
             block = Block(
@@ -520,12 +512,6 @@ def _compute_and_count(
                 df.set_index(list(self.index_columns), inplace=True)
                 df.index.names = self.index.names  # type: ignore
-
-            df.drop(
-                [col for col in df.columns if col not in self.value_columns],
-                axis=1,
-                inplace=True,
-            )
 
         return df, total_rows, query_job
 
     def _split(
@@ -1087,7 +1073,7 @@ def _normalize_expression(
     ):
         """Normalizes expression by moving index columns to left."""
         value_columns = [
-            col_id for col_id in expr.column_names.keys() if col_id not in index_columns
+            col_id for col_id in expr.column_ids if col_id not in index_columns
         ]
         if (assert_value_size is not None) and (
             len(value_columns) != assert_value_size
@@ -1096,20 +1082,92 @@
         return expr.select_columns([*index_columns, *value_columns])
 
     def slice(
-        self: bigframes.core.blocks.Block,
+        self,
         start: typing.Optional[int] = None,
         stop: typing.Optional[int] = None,
         step: typing.Optional[int] = None,
     ) -> bigframes.core.blocks.Block:
-        sliced_expr = self.expr.slice(start=start, stop=stop, step=step)
-        # since this is slice, return a copy even if unchanged
-        block = Block(
-            sliced_expr,
-            index_columns=self.index_columns,
-            column_labels=self.column_labels,
-            index_labels=self._index_labels,
+        if step is None:
+            step = 1
+        if step == 0:
+            raise ValueError("slice step cannot be zero")
+        if step < 0:
+            reverse_start = (-start - 1) if start else 0
+            reverse_stop = (-stop - 1) if stop else None
+            reverse_step = -step
+            return self.reversed()._forward_slice(
+                reverse_start, reverse_stop, reverse_step
+            )
+        return self._forward_slice(start or 0, stop, step)
+
+    def _forward_slice(self, start: int = 0, stop=None, step: int = 1):
+        """Performs slice but only for positive step size."""
+        if step <= 0:
+            raise ValueError("forward_slice only supports positive step size")
+
+        use_postive_offsets = (
+            (start > 0)
+            or ((stop is not None) and (stop >= 0))
+            or ((step > 1) and (start >= 0))
         )
-        return block
+        use_negative_offsets = (
+            (start < 0) or (stop and (stop < 0)) or ((step > 1) and (start < 0))
+        )
+
+        block = self
+
+        # only generate offsets that are used
+        positive_offsets = None
+        negative_offsets = None
+
+        if use_postive_offsets:
+            block, positive_offsets = self.promote_offsets()
+        if use_negative_offsets:
+            block, negative_offsets = block.reversed().promote_offsets()
+            block = block.reversed()
+
+        conditions = []
+        if start != 0:
+            if start > 0:
+                op = ops.partial_right(ops.ge_op, start)
+                assert positive_offsets
+                block, start_cond = block.apply_unary_op(positive_offsets, op)
+            else:
+                op = ops.partial_right(ops.le_op, -start - 1)
+                assert negative_offsets
+                block, start_cond = block.apply_unary_op(negative_offsets, op)
+            conditions.append(start_cond)
+        if stop is not None:
+            if stop >= 0:
+                op = ops.partial_right(ops.lt_op, stop)
+                assert positive_offsets
+                block, stop_cond = block.apply_unary_op(positive_offsets, op)
+            else:
+                op = ops.partial_right(ops.gt_op, -stop - 1)
+                assert negative_offsets
+                block, stop_cond = block.apply_unary_op(negative_offsets, op)
+            conditions.append(stop_cond)
+
+        if step > 1:
+            op = ops.partial_right(ops.mod_op, step)
+            if start >= 0:
+                op = ops.partial_right(ops.sub_op, start)
+                assert positive_offsets
+                block, start_diff = block.apply_unary_op(positive_offsets, op)
+            else:
+                op = ops.partial_right(ops.sub_op, -start + 1)
+                assert negative_offsets
+                block, start_diff = block.apply_unary_op(negative_offsets, op)
+            modulo_op = ops.partial_right(ops.mod_op, step)
+            block, mod = block.apply_unary_op(start_diff, modulo_op)
+            is_zero_op = ops.partial_right(ops.eq_op, 0)
+            block, step_cond = block.apply_unary_op(mod, is_zero_op)
+            conditions.append(step_cond)
+
+        for cond in conditions:
+            block = block.filter(cond)
+
+        return block.select_columns(self.value_columns)
 
     # Using cache to optimize for Jupyter Notebook's behavior where both '__repr__'
     # and '__repr_html__' are called in a single display action, reducing redundant
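Note: the rewritten slice compiles Python slice semantics into predicates over row-offset columns (via promote_offsets) so it can execute as SQL. Below is a standalone sketch of the same arithmetic over a plain Python list; the helper names are invented for illustration, and the length-based step anchor stands in for the negative-offset arithmetic the deferred engine must use instead (a query cannot cheaply know its row count up front).

```python
from typing import List, Optional

def forward_slice(rows: List, start: int = 0, stop: Optional[int] = None, step: int = 1) -> List:
    # Mirrors Block._forward_slice: pos plays the "positive offsets" column,
    # neg the "negative offsets" column produced by promote_offsets().
    if step <= 0:
        raise ValueError("forward_slice only supports positive step size")
    n = len(rows)
    kept = []
    for pos, row in enumerate(rows):
        neg = n - 1 - pos  # offset counted from the end
        if start > 0 and not (pos >= start):        # ge_op predicate
            continue
        if start < 0 and not (neg <= -start - 1):   # le_op predicate
            continue
        if stop is not None:
            if stop >= 0 and not (pos < stop):      # lt_op predicate
                continue
            if stop < 0 and not (neg > -stop - 1):  # gt_op predicate
                continue
        if step > 1:
            # Keep rows whose distance from the first kept row is a multiple
            # of step; here we can use n directly, unlike the deferred engine.
            first = start if start >= 0 else max(n + start, 0)
            if (pos - first) % step != 0:
                continue
        kept.append(row)
    return kept

def sliced(rows: List, start=None, stop=None, step=None) -> List:
    # Mirrors Block.slice: a negative step becomes a forward slice over the
    # reversed sequence, as in reversed()._forward_slice(...).
    step = 1 if step is None else step
    if step == 0:
        raise ValueError("slice step cannot be zero")
    if step < 0:
        rev_start = (-start - 1) if start else 0
        rev_stop = (-stop - 1) if stop else None
        return forward_slice(list(reversed(rows)), rev_start, rev_stop, -step)
    return forward_slice(rows, start or 0, stop, step)

data = list(range(10))
assert sliced(data, 2, 8, 2) == data[2:8:2]        # [2, 4, 6]
assert sliced(data, -6, -1) == data[-6:-1]         # [4, 5, 6, 7, 8]
assert sliced(data, None, None, -3) == data[::-3]  # [9, 6, 3, 0]
```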
@@ -1396,7 +1454,7 @@ def concat(
         )
         result_block = Block(
             result_expr,
-            index_columns=list(result_expr.column_names.keys())[:index_nlevels],
+            index_columns=list(result_expr.column_ids)[:index_nlevels],
             column_labels=aligned_blocks[0].column_labels,
             index_labels=result_labels,
         )
@@ -1530,9 +1588,7 @@ def to_sql_query(
             # the BigQuery unicode column name feature?
             substitutions[old_id] = new_id
 
-        sql = array_value.to_sql(
-            ordering_mode="unordered", col_id_overrides=substitutions
-        )
+        sql = array_value.to_sql(col_id_overrides=substitutions)
         return (
             sql,
             new_ids[: len(idx_labels)],
4 changes: 0 additions & 4 deletions bigframes/core/groupby/__init__.py
@@ -426,10 +426,6 @@ def __init__(
         self._value_name = value_name
         self._dropna = dropna  # Applies to aggregations but not windowing
 
-    @property
-    def _value(self):
-        return self._block.expr.get_column(self._value_column)
-
     def all(self) -> series.Series:
         return self._aggregate(agg_ops.all_op)
4 changes: 1 addition & 3 deletions bigframes/core/indexes/index.py
@@ -398,9 +398,7 @@ def to_pandas(self) -> pandas.Index:
         """Executes deferred operations and downloads the results."""
         # Project down to only the index column. So the query can be cached to visualize other data.
         index_columns = list(self._block.index_columns)
-        expr = self._expr.projection(
-            [self._expr.get_any_column(col) for col in index_columns]
-        )
+        expr = self._expr.select_columns(index_columns)
         results, _ = expr.start_query()
         df = expr._session._rows_to_dataframe(results)
         df = df.set_index(index_columns)
16 changes: 8 additions & 8 deletions bigframes/core/joins/row_identity.py
@@ -38,11 +38,11 @@ def join_by_row_identity(
             f"Only how='outer','left','inner' currently supported. {constants.FEEDBACK_LINK}"
         )
 
-    if not left.table.equals(right.table):
+    if not left._table.equals(right._table):
        raise ValueError(
             "Cannot combine objects without an explicit join/merge key. "
-            f"Left based on: {left.table.compile()}, but "
-            f"right based on: {right.table.compile()}"
+            f"Left based on: {left._table.compile()}, but "
+            f"right based on: {right._table.compile()}"
         )
 
     left_predicates = left._predicates
@@ -63,11 +63,11 @@ def join_by_row_identity(
     left_mask = left_relative_predicates if how in ["right", "outer"] else None
     right_mask = right_relative_predicates if how in ["left", "outer"] else None
     joined_columns = [
-        _mask_value(left.get_column(key), left_mask).name(map_left_id(key))
-        for key in left.column_names.keys()
+        _mask_value(left._get_ibis_column(key), left_mask).name(map_left_id(key))
+        for key in left.column_ids
     ] + [
-        _mask_value(right.get_column(key), right_mask).name(map_right_id(key))
-        for key in right.column_names.keys()
+        _mask_value(right._get_ibis_column(key), right_mask).name(map_right_id(key))
+        for key in right.column_ids
     ]
 
     # If left isn't being masked, can just use left ordering
@@ -108,7 +108,7 @@ def join_by_row_identity(
 
     joined_expr = core.ArrayValue(
         left._session,
-        left.table,
+        left._table,
         columns=joined_columns,
         hidden_ordering_columns=hidden_ordering_columns,
         ordering=new_ordering,
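Note: join_by_row_identity combines two expressions derived from the same underlying table without a physical join: it keeps one row per base row and masks each side's values with that side's predicates. A rough plain-Python model of the outer case (rows, predicates, and column names here are invented for illustration):

```python
# Both "sides" share the same base rows, so instead of joining we mask values.
rows = [{"x": 10}, {"x": -3}, {"x": 7}]
left_pred = lambda r: r["x"] > 0   # left side's accumulated filters
right_pred = lambda r: r["x"] > 8  # right side's accumulated filters

combined = []
for r in rows:
    keep_l, keep_r = left_pred(r), right_pred(r)
    if keep_l or keep_r:  # an outer join keeps rows surviving either side
        combined.append(
            {
                "x_left": r["x"] if keep_l else None,   # _mask_value(..., left_mask)
                "x_right": r["x"] if keep_r else None,  # _mask_value(..., right_mask)
            }
        )

print(combined)
# [{'x_left': 10, 'x_right': 10}, {'x_left': 7, 'x_right': None}]
```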
22 changes: 12 additions & 10 deletions bigframes/core/joins/single_column.py
@@ -74,14 +74,14 @@ def join_by_column(
     if (
         allow_row_identity_join
         and how in bigframes.core.joins.row_identity.SUPPORTED_ROW_IDENTITY_HOW
-        and left.table.equals(right.table)
+        and left._table.equals(right._table)
         # Make sure we're joining on exactly the same column(s), at least with
         # regards to value its possible that they both have the same names but
         # were modified in different ways. Ignore differences in the names.
         and all(
-            left.get_any_column(lcol)
+            left._get_any_column(lcol)
             .name("index")
-            .equals(right.get_any_column(rcol).name("index"))
+            .equals(right._get_any_column(rcol).name("index"))
             for lcol, rcol in zip(left_column_ids, right_column_ids)
         )
     ):
@@ -90,14 +90,16 @@ def join_by_column(
             get_column_right,
         ) = bigframes.core.joins.row_identity.join_by_row_identity(left, right, how=how)
         left_join_keys = [
-            combined_expr.get_column(get_column_left(col)) for col in left_column_ids
+            combined_expr._get_ibis_column(get_column_left(col))
+            for col in left_column_ids
         ]
         right_join_keys = [
-            combined_expr.get_column(get_column_right(col)) for col in right_column_ids
+            combined_expr._get_ibis_column(get_column_right(col))
+            for col in right_column_ids
         ]
         join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how)
         join_key_ids = [col.get_name() for col in join_key_cols]
-        combined_expr = combined_expr.projection(
+        combined_expr = combined_expr._projection(
             [*join_key_cols, *combined_expr.columns]
         )
         if sort:
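Note: get_coalesced_join_cols builds one output key per join-key pair; for an outer join the merged key is non-null whichever side matched, i.e. a COALESCE. A hypothetical illustration:

```python
# Hypothetical sketch of a coalesced join key: COALESCE(left_key, right_key).
def coalesce(left, right):
    return left if left is not None else right

left_keys = [1, None, 3]   # key as seen from the left side (None = no match)
right_keys = [1, 2, None]  # key as seen from the right side
print([coalesce(l, r) for l, r in zip(left_keys, right_keys)])  # [1, 2, 3]
```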
@@ -119,13 +121,13 @@ def join_by_column(
         lmapping = {
             col_id: guid.generate_guid()
             for col_id in itertools.chain(
-                left.column_names, left._hidden_ordering_column_names
+                left.column_ids, left._hidden_ordering_column_names
             )
         }
         rmapping = {
             col_id: guid.generate_guid()
             for col_id in itertools.chain(
-                right.column_names, right._hidden_ordering_column_names
+                right.column_ids, right._hidden_ordering_column_names
             )
         }
@@ -136,12 +138,12 @@ def get_column_right(col_id):
             return rmapping[col_id]
 
         left_table = left._to_ibis_expr(
-            ordering_mode="unordered",
+            "unordered",
             expose_hidden_cols=True,
             col_id_overrides=lmapping,
         )
         right_table = right._to_ibis_expr(
-            ordering_mode="unordered",
+            "unordered",
             expose_hidden_cols=True,
             col_id_overrides=rmapping,
         )
6 changes: 2 additions & 4 deletions bigframes/dataframe.py
@@ -554,7 +554,7 @@ def _apply_series_binop(
             other._block.index, how=how
         )
 
-        series_column_id = other._value.get_name()
+        series_column_id = other._value_column
         series_col = get_column_right(series_column_id)
         block = joined_index._block
         for column_id, label in zip(
@@ -2382,13 +2382,11 @@ def _create_io_query(self, index: bool, ordering_id: Optional[str]) -> str:
 
         if ordering_id is not None:
             return array_value.to_sql(
-                ordering_mode="offset_col",
+                offset_column=ordering_id,
                 col_id_overrides=id_overrides,
-                order_col_name=ordering_id,
             )
         else:
             return array_value.to_sql(
-                ordering_mode="unordered",
                 col_id_overrides=id_overrides,
             )
 
6 changes: 0 additions & 6 deletions bigframes/operations/base.py
@@ -16,7 +16,6 @@
 
 import typing
 
-import ibis.expr.types as ibis_types
 import pandas as pd
 
 import bigframes.constants as constants
@@ -106,11 +105,6 @@ def __init__(
         if pd_series.name is None:
             self._block = self._block.with_column_labels([None])
 
-    @property
-    def _value(self) -> ibis_types.Value:
-        """Private property to get Ibis expression for the value column."""
-        return self._block.expr.get_column(self._value_column)
-
     @property
     def _value_column(self) -> str:
         return self._block.value_columns[0]
6 changes: 5 additions & 1 deletion bigframes/series.py
@@ -1150,7 +1150,11 @@ def _groupby_values(
     def apply(self, func) -> Series:
         # TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs
         # is actually a ternary op
-        return self._apply_unary_op(ops.RemoteFunctionOp(func))
+        # Reproject as workaround to applying filter too late. This forces the filter
+        # to be applied before passing data to remote function, protecting from bad
+        # inputs causing errors.
+        reprojected_series = Series(self._block._force_reproject())
+        return reprojected_series._apply_unary_op(ops.RemoteFunctionOp(func))
 
     def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series:
         return Series(self._get_block().add_prefix(prefix))
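Note: the comment in this hunk is the crux. In a deferred engine a pending filter and a later projection live in the same expression, so compilation could evaluate the remote function over pre-filter rows. A minimal plain-Python sketch of the hazard and the barrier (_force_reproject is the method from the diff; everything else here is invented):

```python
import math

rows = [-4.0, 0.25, 9.0]
predicate = lambda x: x > 0  # the user's filter, e.g. s[s > 0]

# Hazard: fused evaluation runs the function over every source row first.
try:
    projected = [math.sqrt(x) for x in rows]  # UDF sees the unfiltered -4.0
    fused = [v for v, x in zip(projected, rows) if predicate(x)]
except ValueError as exc:
    print(f"bad input reached the function: {exc}")

# Barrier: materialize the filter first (what the reprojection provides),
# then apply the function only to surviving rows.
filtered = [x for x in rows if predicate(x)]
print([math.sqrt(x) for x in filtered])  # [0.5, 3.0]
```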