feat: add groupby cumcount #1798

Merged: 5 commits, Jun 11, 2025
22 changes: 18 additions & 4 deletions bigframes/core/array_value.py
@@ -403,8 +403,23 @@ def project_window_op(
             never_skip_nulls: will disable null skipping for operators that would otherwise do so
             skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
         """
+        return self.project_window_expr(
+            ex.UnaryAggregation(op, ex.deref(column_name)),
+            window_spec,
+            never_skip_nulls,
+            skip_reproject_unsafe,
+        )
+
+    def project_window_expr(
+        self,
+        expression: ex.Aggregation,
+        window: WindowSpec,
+        never_skip_nulls=False,
+        skip_reproject_unsafe: bool = False,
+    ):
         # TODO: Support non-deterministic windowing
-        if window_spec.is_row_bounded or not op.order_independent:
+        if window.is_row_bounded or not expression.op.order_independent:
             if self.node.order_ambiguous and not self.session._strictly_ordered:
                 if not self.session._allows_ambiguity:
                     raise ValueError(
@@ -415,14 +430,13 @@ def project_window_op(
                         "Window ordering may be ambiguous, this can cause unstable results."
                     )
                 warnings.warn(msg, category=bfe.AmbiguousWindowWarning)

         output_name = self._gen_namespaced_uid()
         return (
             ArrayValue(
                 nodes.WindowOpNode(
                     child=self.node,
-                    expression=ex.UnaryAggregation(op, ex.deref(column_name)),
-                    window_spec=window_spec,
+                    expression=expression,
+                    window_spec=window,
                     output_name=ids.ColumnId(output_name),
                     never_skip_nulls=never_skip_nulls,
                     skip_reproject_unsafe=skip_reproject_unsafe,
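This refactor keeps `project_window_op` as the unary-only entry point and routes it through the new `project_window_expr`, which accepts any `ex.Aggregation`; that generality is what later lets cumcount pass a nullary aggregation. A minimal, self-contained sketch of the delegation pattern, using toy stand-ins rather than the real bigframes classes:

```python
from dataclasses import dataclass


@dataclass
class Aggregation:
    """Toy stand-in for ex.Aggregation (illustrative only)."""

    op: str


@dataclass
class UnaryAggregation(Aggregation):
    """Toy stand-in for ex.UnaryAggregation: an op applied to one column."""

    column: str


class ToyArrayValue:
    def project_window_op(self, column: str, op: str) -> str:
        # Old unary-only entry point, now a thin wrapper.
        return self.project_window_expr(UnaryAggregation(op, column))

    def project_window_expr(self, expression: Aggregation) -> str:
        # Generalized path: handles unary and nullary aggregations alike.
        return f"window[{expression}]"


print(ToyArrayValue().project_window_op("x", "sum"))             # via the wrapper
print(ToyArrayValue().project_window_expr(Aggregation("size")))  # nullary case
```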
32 changes: 25 additions & 7 deletions bigframes/core/blocks.py
@@ -1012,16 +1012,34 @@ def apply_window_op(
         skip_null_groups: bool = False,
         skip_reproject_unsafe: bool = False,
         never_skip_nulls: bool = False,
     ) -> typing.Tuple[Block, str]:
+        agg_expr = ex.UnaryAggregation(op, ex.deref(column))
+        return self.apply_analytic(
+            agg_expr,
+            window_spec,
+            result_label,
+            skip_reproject_unsafe=skip_reproject_unsafe,
+            never_skip_nulls=never_skip_nulls,
+            skip_null_groups=skip_null_groups,
+        )
+
+    def apply_analytic(
+        self,
+        agg_expr: ex.Aggregation,
+        window: windows.WindowSpec,
+        result_label: Label,
+        *,
+        skip_reproject_unsafe: bool = False,
+        never_skip_nulls: bool = False,
+        skip_null_groups: bool = False,
+    ) -> typing.Tuple[Block, str]:
         block = self
         if skip_null_groups:
-            for key in window_spec.grouping_keys:
-                block, not_null_id = block.apply_unary_op(key.id.name, ops.notnull_op)
-                block = block.filter_by_id(not_null_id).drop_columns([not_null_id])
-        expr, result_id = block._expr.project_window_op(
-            column,
-            op,
-            window_spec,
+            for key in window.grouping_keys:
+                block = block.filter(ops.notnull_op.as_expr(key.id.name))
+        expr, result_id = block._expr.project_window_expr(
+            agg_expr,
+            window,
             skip_reproject_unsafe=skip_reproject_unsafe,
             never_skip_nulls=never_skip_nulls,
         )
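The `skip_null_groups` handling is also simpler now: rather than materializing a not-null indicator column, filtering on it, and dropping it, the block is filtered directly with a predicate over each grouping key. Semantically the pre-filter behaves like this pandas sketch (hypothetical data, not the bigframes code path itself):

```python
import pandas as pd

df = pd.DataFrame({"key": ["a", None, "a", "b"], "val": [1, 2, 3, 4]})

# skip_null_groups: drop rows whose grouping key is null *before* the
# windowed aggregation runs, so they never receive a result at all.
filtered = df[df["key"].notna()]
print(filtered.groupby("key")["val"].cumsum())
```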
29 changes: 26 additions & 3 deletions bigframes/core/groupby/dataframe_group_by.py
@@ -275,6 +275,27 @@ def count(self) -> df.DataFrame:
     def nunique(self) -> df.DataFrame:
         return self._aggregate_all(agg_ops.nunique_op)

+    @validations.requires_ordering()
+    def cumcount(self, ascending: bool = True) -> series.Series:
+        window_spec = (
+            window_specs.cumulative_rows(grouping_keys=tuple(self._by_col_ids))
+            if ascending
+            else window_specs.inverse_cumulative_rows(
+                grouping_keys=tuple(self._by_col_ids)
+            )
+        )
+        block, result_id = self._block.apply_analytic(
+            ex.NullaryAggregation(agg_ops.size_op),
+            window=window_spec,
+            result_label=None,
+        )
+        result = series.Series(block.select_column(result_id)) - 1
+        if self._dropna and (len(self._by_col_ids) == 1):
+            result = result.mask(
+                series.Series(block.select_column(self._by_col_ids[0])).isna()
+            )
+        return result
+
     @validations.requires_ordering()
     def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
         if not numeric_only:
@@ -546,10 +567,12 @@ def _apply_window_op(
         )
         columns, _ = self._aggregated_columns(numeric_only=numeric_only)
         block, result_ids = self._block.multi_apply_window_op(
-            columns, op, window_spec=window_spec
+            columns,
+            op,
+            window_spec=window_spec,
         )
-        block = block.select_columns(result_ids)
-        return df.DataFrame(block)
+        result = df.DataFrame(block.select_columns(result_ids))
+        return result

     def _resolve_label(self, label: blocks.Label) -> str:
         """Resolve label to column id."""
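The new `cumcount` is phrased as a cumulative per-group row count: a nullary `size_op` evaluated over a cumulative-rows window (inverted when `ascending=False`), minus one, with rows whose single grouping key is null masked out when `dropna` is in effect. The behavior it targets is the pandas one, shown here on hypothetical data:

```python
import pandas as pd

df = pd.DataFrame({"key": ["a", "a", "b", "a", None], "val": range(5)})
g = df.groupby("key", dropna=True)

# Rows are numbered 0..len(group)-1 within each group; with dropna=True,
# the null-key row comes back as NA instead of getting a number.
print(g.cumcount())

# ascending=False counts from the end of each group instead, which is
# what the inverse cumulative window computes.
print(g.cumcount(ascending=False))
```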
46 changes: 37 additions & 9 deletions tests/system/small/test_groupby.py
@@ -383,14 +383,14 @@ def test_dataframe_groupby_multi_sum(


 @pytest.mark.parametrize(
-    ("operator"),
+    ("operator", "dropna"),
     [
-        (lambda x: x.cumsum(numeric_only=True)),
-        (lambda x: x.cummax(numeric_only=True)),
-        (lambda x: x.cummin(numeric_only=True)),
+        (lambda x: x.cumsum(numeric_only=True), True),
+        (lambda x: x.cummax(numeric_only=True), True),
+        (lambda x: x.cummin(numeric_only=True), False),
         # Pre-pandas 2.2 doesn't always produce float.
-        (lambda x: x.cumprod().astype("Float64")),
-        (lambda x: x.shift(periods=2)),
+        (lambda x: x.cumprod().astype("Float64"), False),
+        (lambda x: x.shift(periods=2), True),
     ],
     ids=[
         "cumsum",
@@ -401,16 +401,44 @@ def test_dataframe_groupby_analytic(
     ],
 )
 def test_dataframe_groupby_analytic(
-    scalars_df_index, scalars_pandas_df_index, operator
+    scalars_df_index,
+    scalars_pandas_df_index,
+    operator,
+    dropna,
 ):
     col_names = ["float64_col", "int64_col", "bool_col", "string_col"]
-    bf_result = operator(scalars_df_index[col_names].groupby("string_col"))
-    pd_result = operator(scalars_pandas_df_index[col_names].groupby("string_col"))
+    bf_result = operator(
+        scalars_df_index[col_names].groupby("string_col", dropna=dropna)
+    )
+    pd_result = operator(
+        scalars_pandas_df_index[col_names].groupby("string_col", dropna=dropna)
+    )
     bf_result_computed = bf_result.to_pandas()

     pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False)


+@pytest.mark.parametrize(
+    ("ascending", "dropna"),
+    [
+        (True, True),
(Review thread on this line)

Contributor: Why leave out (False, True) and (True, False)?

Author: Ah, just that they're orthogonal; I mostly just wanted to test both modes for each flag, but there isn't really any interaction between them.

Contributor: I see; might be worth adding a note (if not adding the combinations).
+        (False, False),
+    ],
+)
+def test_dataframe_groupby_cumcount(
+    scalars_df_index, scalars_pandas_df_index, ascending, dropna
+):
+    bf_result = scalars_df_index.groupby("string_col", dropna=dropna).cumcount(
+        ascending
+    )
+    pd_result = scalars_pandas_df_index.groupby("string_col", dropna=dropna).cumcount(
+        ascending
+    )
+    bf_result_computed = bf_result.to_pandas()
+
+    pd.testing.assert_series_equal(pd_result, bf_result_computed, check_dtype=False)
+
+
 def test_dataframe_groupby_size_as_index_false(
     scalars_df_index, scalars_pandas_df_index
 ):
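Following the reviewer's note, the two remaining flag combinations could be covered by generating the full cross-product instead of hand-picking orthogonal pairs. A hypothetical variant of the parametrization (test name and body are placeholders, not part of the PR):

```python
import itertools

import pytest


@pytest.mark.parametrize(
    ("ascending", "dropna"),
    list(itertools.product([True, False], repeat=2)),
)
def test_dataframe_groupby_cumcount_all_combos(ascending, dropna):
    ...  # same body as test_dataframe_groupby_cumcount above
```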
@@ -718,7 +718,6 @@ def max(
     def cumcount(self, ascending: bool = True):
         """
         Number each item in each group from 0 to the length of that group - 1.
-        (DataFrameGroupBy functionality is not yet available.)

         **Examples:**
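With the "(DataFrameGroupBy functionality is not yet available.)" caveat deleted from the vendored docstring, the method is documented as available on DataFrame groupbys. A quick usage sketch (assumes a configured BigQuery session, since bigframes executes against BigQuery):

```python
import bigframes.pandas as bpd

df = bpd.DataFrame({"a": ["x", "x", "y", "x"], "b": [1, 2, 3, 4]})

# Numbers rows within each "a" group: 0, 1, 0, 2.
print(df.groupby("a").cumcount().to_pandas())
```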