Skip to content

feat: use ArrowDtype for STRUCT columns in to_pandas #85

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
implement explode
  • Loading branch information
tswast committed Sep 29, 2023
commit 05105a8de1bd9e1510fa62def8e16849a725c8d8
10 changes: 9 additions & 1 deletion bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,15 @@ def __init__(
columns=columns, # type:ignore
dtype=dtype, # type:ignore
)
if pd_dataframe.size < MAX_INLINE_DF_SIZE:
if (
pd_dataframe.size < MAX_INLINE_DF_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
dt.pyarrow_dtype
for dt in pd_dataframe.dtypes
if isinstance(dt, pandas.ArrowDtype)
)
):
self._block = blocks.block_from_local(
pd_dataframe, session or bigframes.pandas.get_global_session()
)
Expand Down
10 changes: 9 additions & 1 deletion bigframes/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,15 @@ def __init__(
if pd_series.name is None:
# to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename
pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1)
if pd_dataframe.size < MAX_INLINE_SERIES_SIZE:
if (
pd_dataframe.size < MAX_INLINE_SERIES_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
dt.pyarrow_dtype
for dt in pd_dataframe.dtypes
if isinstance(dt, pd.ArrowDtype)
)
):
self._block = blocks.block_from_local(
pd_dataframe, session or bigframes.pandas.get_global_session()
)
Expand Down
18 changes: 16 additions & 2 deletions bigframes/operations/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def _as_ibis(self, x: ibis_types.Value):
name = self._name_or_index
else:
name = struct_value.names[self._name_or_index]
return struct_value[name]
return struct_value[name].name(name)


class StructAccessor(
Expand All @@ -44,4 +44,18 @@ class StructAccessor(
__doc__ = vendoracessors.StructAccessor.__doc__

def field(self, name_or_index: str | int) -> bigframes.series.Series:
return self._apply_unary_op(StructField(name_or_index))
series = self._apply_unary_op(StructField(name_or_index))
if isinstance(name_or_index, str):
name = name_or_index
else:
struct_field = self._dtype.pyarrow_dtype[name_or_index]
name = struct_field.name
return series.rename(name)

def explode(self) -> bigframes.dataframe.DataFrame:
import bigframes.pandas

pa_type = self._dtype.pyarrow_dtype
return bigframes.pandas.concat(
[self.field(i) for i in range(pa_type.num_fields)], axis="columns"
)
5 changes: 5 additions & 0 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import bigframes.operations.base
import bigframes.operations.datetimes as dt
import bigframes.operations.strings as strings
import bigframes.operations.structs as structs
import third_party.bigframes_vendored.pandas.core.series as vendored_pandas_series

LevelType = typing.Union[str, int]
Expand Down Expand Up @@ -118,6 +119,10 @@ def query_job(self) -> Optional[bigquery.QueryJob]:
self._set_internal_query_job(self._compute_dry_run())
return self._query_job

@property
def struct(self) -> structs.StructAccessor:
return structs.StructAccessor(self._block)

def _set_internal_query_job(self, query_job: bigquery.QueryJob):
self._query_job = query_job

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,39 @@ def field(self, name_or_index: str | int):
Name: version, dtype: int64[pyarrow]
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def explode(self):
"""
Extract all child fields of a struct as a DataFrame.

Returns
-------
pandas.DataFrame
The data corresponding to all child fields.

See Also
--------
Series.struct.field : Return a single child field as a Series.

Examples
--------
>>> import bigframes.pandas as bpd
>>> import pyarrow as pa
>>> s = bpd.Series(
... [
... {"version": 1, "project": "pandas"},
... {"version": 2, "project": "pandas"},
... {"version": 1, "project": "numpy"},
... ],
... dtype=bpd.ArrowDtype(pa.struct(
... [("version", pa.int64()), ("project", pa.string())]
... ))
... )

>>> s.struct.explode()
version project
0 1 pandas
1 2 pandas
2 1 numpy
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)