Skip to content

Commit 866ba9e

Browse files
chore: support timedeltas for read_pandas() (#1349)
* chore: support timedeltas for read_pandas() * fix format * fix mypy error * centralize timedelta to microsecs replacement logic * fix format * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * remove redundant imports * polish todo comment * update timedelta to microsecond conversion algo * update python doc --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 3c4abf2 commit 866ba9e

File tree

9 files changed

+123
-6
lines changed

9 files changed

+123
-6
lines changed

bigframes/core/compile/compiler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import google.cloud.bigquery
2525
import pandas as pd
2626

27+
from bigframes.core import utils
2728
import bigframes.core.compile.compiled as compiled
2829
import bigframes.core.compile.concat as concat_impl
2930
import bigframes.core.compile.explode
@@ -173,6 +174,10 @@ def compile_readlocal(self, node: nodes.ReadLocalNode):
173174
io.BytesIO(node.feather_bytes),
174175
columns=[item.source_id for item in node.scan_list.items],
175176
)
177+
178+
# Convert timedeltas to microseconds for compatibility with BigQuery
179+
_ = utils.replace_timedeltas_with_micros(array_as_pd)
180+
176181
offsets = node.offsets_col.sql if node.offsets_col else None
177182
return compiled.UnorderedIR.from_pandas(
178183
array_as_pd, node.scan_list, offsets=offsets

bigframes/core/local_data.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
5959
if pa.types.is_time64(type):
6060
# This is potentially lossy, but BigFrames doesn't support ns
6161
return pa.time64("us")
62+
if pa.types.is_duration(type):
63+
# This is potentially lossy, but BigFrames doesn't support ns
64+
return pa.duration("us")
6265
if pa.types.is_decimal128(type):
6366
return pa.decimal128(38, 9)
6467
if pa.types.is_decimal256(type):

bigframes/core/schema.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,13 @@ class ArraySchema:
3838
items: typing.Tuple[SchemaItem, ...]
3939

4040
@classmethod
41-
def from_bq_table(cls, table: google.cloud.bigquery.Table):
41+
def from_bq_table(
42+
cls,
43+
table: google.cloud.bigquery.Table,
44+
column_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {},
45+
):
4246
items = tuple(
43-
SchemaItem(name, dtype)
47+
SchemaItem(name, column_type_overrides.get(name, dtype))
4448
for name, dtype in bigframes.dtypes.bf_type_from_type_kind(
4549
table.schema
4650
).items()

bigframes/core/utils.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
2121
import pandas as pd
22+
import pandas.api.types as pdtypes
2223
import typing_extensions
2324

2425
import bigframes.exceptions as bfe
@@ -184,3 +185,29 @@ def wrapper(*args, **kwargs):
184185
return wrapper
185186

186187
return decorator
188+
189+
190+
def timedelta_to_micros(td: pd.Timedelta) -> int:
    """Convert a Timedelta to whole microseconds, discarding the sub-microsecond part.

    ``Timedelta.value`` is the total duration in nanoseconds; floor-dividing
    by 1000 yields microseconds (flooring toward negative infinity for
    negative durations).
    """
    return td.value // 1000


def replace_timedeltas_with_micros(dataframe: pd.DataFrame) -> List[str]:
    """Convert, in place, every timedelta64 column (and index) to integer microseconds.

    The nanosecond remainder of each value is dropped.

    Returns:
        The labels of the columns (and index, if converted) that were updated.
    """
    converted_labels = []

    for label in dataframe.columns:
        if pdtypes.is_timedelta64_dtype(dataframe[label].dtype):
            dataframe[label] = dataframe[label].apply(timedelta_to_micros)
            converted_labels.append(label)

    if pdtypes.is_timedelta64_dtype(dataframe.index.dtype):
        dataframe.index = dataframe.index.map(timedelta_to_micros)
        # NOTE(review): an unnamed index contributes None here, which callers
        # use as a dict key for type overrides — confirm that is intended.
        converted_labels.append(dataframe.index.name)

    return converted_labels

bigframes/dtypes.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,11 +409,16 @@ def dtype_for_etype(etype: ExpressionType) -> Dtype:
409409
def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
410410
if arrow_dtype in _ARROW_TO_BIGFRAMES:
411411
return _ARROW_TO_BIGFRAMES[arrow_dtype]
412+
412413
if pa.types.is_list(arrow_dtype):
413414
return pd.ArrowDtype(arrow_dtype)
415+
414416
if pa.types.is_struct(arrow_dtype):
415417
return pd.ArrowDtype(arrow_dtype)
416418

419+
if pa.types.is_duration(arrow_dtype):
420+
return pd.ArrowDtype(arrow_dtype)
421+
417422
# BigFrames doesn't distinguish between string and large_string because the
418423
# largest string (2 GB) is already larger than the largest BigQuery row.
419424
if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype):

bigframes/session/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
pandas.ArrowDtype(pa.timestamp("us", tz="UTC")),
121121
pandas.ArrowDtype(pa.decimal128(38, 9)),
122122
pandas.ArrowDtype(pa.decimal256(76, 38)),
123+
pandas.ArrowDtype(pa.duration("us")),
123124
)
124125

125126

bigframes/session/_io/pandas.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from __future__ import annotations
1515

1616
import dataclasses
17-
from typing import Collection, Union
17+
from typing import Collection, List, Union
1818

1919
import bigframes_vendored.constants as constants
2020
import db_dtypes # type: ignore
@@ -38,6 +38,7 @@ class DataFrameAndLabels:
3838
column_labels: Collection
3939
index_labels: Collection
4040
ordering_col: str
41+
timedelta_cols: List[str]
4142

4243

4344
def _arrow_to_pandas_arrowdtype(
@@ -163,9 +164,12 @@ def pandas_to_bq_compatible(pandas_dataframe: pandas.DataFrame) -> DataFrameAndL
163164
pandas_dataframe_copy.columns = pandas.Index(new_col_ids)
164165
pandas_dataframe_copy[ordering_col] = np.arange(pandas_dataframe_copy.shape[0])
165166

167+
timedelta_cols = utils.replace_timedeltas_with_micros(pandas_dataframe_copy)
168+
166169
return DataFrameAndLabels(
167170
df=pandas_dataframe_copy,
168171
column_labels=col_labels,
169172
index_labels=idx_labels,
170173
ordering_col=ordering_col,
174+
timedelta_cols=timedelta_cols,
171175
)

bigframes/session/loader.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,16 @@ def read_pandas_load_job(
176176
self._start_generic_job(load_job)
177177

178178
destination_table = self._bqclient.get_table(load_table_destination)
179+
col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
180+
col: bigframes.dtypes.TIMEDETLA_DTYPE
181+
for col in df_and_labels.timedelta_cols
182+
}
179183
array_value = core.ArrayValue.from_table(
180184
table=destination_table,
181-
# TODO: Generate this directly from original pandas df.
182-
schema=schemata.ArraySchema.from_bq_table(destination_table),
185+
# TODO (b/394156190): Generate this directly from original pandas df.
186+
schema=schemata.ArraySchema.from_bq_table(
187+
destination_table, col_type_overrides
188+
),
183189
session=self._session,
184190
offsets_col=ordering_col,
185191
).drop_columns([ordering_col])
@@ -229,10 +235,16 @@ def read_pandas_streaming(
229235
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
230236
)
231237

238+
col_type_overrides: typing.Dict[str, bigframes.dtypes.Dtype] = {
239+
col: bigframes.dtypes.TIMEDETLA_DTYPE
240+
for col in df_and_labels.timedelta_cols
241+
}
232242
array_value = (
233243
core.ArrayValue.from_table(
234244
table=destination_table,
235-
schema=schemata.ArraySchema.from_bq_table(destination_table),
245+
schema=schemata.ArraySchema.from_bq_table(
246+
destination_table, col_type_overrides
247+
),
236248
session=self._session,
237249
# Don't set the offsets column because we want to group by it.
238250
)

tests/system/small/test_session.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,62 @@ def test_read_pandas_tokyo(
691691
assert len(expected) == result.total_rows
692692

693693

694+
@pytest.mark.parametrize(
    "write_engine",
    ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
)
def test_read_pandas_timedelta_dataframes(session, write_engine):
    """Round-trip a timedelta64 DataFrame through read_pandas for each write engine."""
    expected_df = pd.DataFrame({"my_col": pd.to_timedelta([1, 2, 3], unit="d")})

    bf_df = session.read_pandas(expected_df, write_engine=write_engine)
    actual_result = bf_df.to_pandas().astype("timedelta64[ns]")

    # The streaming engine does not preserve the original index values.
    if write_engine == "bigquery_streaming":
        expected_df.index = pd.Index([pd.NA] * 3, dtype="Int64")
    pd.testing.assert_frame_equal(actual_result, expected_df, check_index_type=False)
710+
711+
712+
@pytest.mark.parametrize(
    "write_engine",
    ["default", "bigquery_inline", "bigquery_load", "bigquery_streaming"],
)
def test_read_pandas_timedelta_series(session, write_engine):
    """Round-trip a timedelta64 Series through read_pandas for each write engine."""
    expected_series = pd.Series(pd.to_timedelta([1, 2, 3], unit="d"))

    bf_series = session.read_pandas(expected_series, write_engine=write_engine)
    actual_result = bf_series.to_pandas().astype("timedelta64[ns]")

    # The streaming engine does not preserve the original index values.
    if write_engine == "bigquery_streaming":
        expected_series.index = pd.Index([pd.NA] * 3, dtype="Int64")
    pd.testing.assert_series_equal(
        actual_result, expected_series, check_index_type=False
    )
730+
731+
732+
@pytest.mark.parametrize(
    "write_engine",
    ["default", "bigquery_inline", "bigquery_load"],
)
def test_read_pandas_timedelta_index(session, write_engine):
    """Round-trip a TimedeltaIndex through read_pandas for each write engine."""
    # pd.to_timedelta on a list returns a TimedeltaIndex.
    expected_index = pd.to_timedelta([1, 2, 3], unit="d")

    bf_index = session.read_pandas(expected_index, write_engine=write_engine)
    actual_result = bf_index.to_pandas().astype("timedelta64[ns]")

    pd.testing.assert_index_equal(actual_result, expected_index)
748+
749+
694750
@utils.skip_legacy_pandas
695751
@pytest.mark.parametrize(
696752
("write_engine",),

0 commit comments

Comments
 (0)