Skip to content

Commit a7d7197

Browse files
authored
feat: add streaming.StreamingDataFrame class (#864)
* feat: add StreamingDataFrame support * use setattr for properties * fix bug * read session from DF * fix docs and tests * fix test * add preview warning * resolve comments * move to streaming.read_gbq_table, add logger * fix unit test * fix doc test * update notebook * add back preview warning
1 parent cbf2d42 commit a7d7197

File tree

9 files changed

+1134
-274
lines changed

9 files changed

+1134
-274
lines changed

bigframes/core/blocks.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2354,7 +2354,7 @@ def is_monotonic_decreasing(
23542354
return self._is_monotonic(column_id, increasing=False)
23552355

23562356
def to_sql_query(
2357-
self, include_index: bool
2357+
self, include_index: bool, enable_cache: bool = True
23582358
) -> typing.Tuple[str, list[str], list[Label]]:
23592359
"""
23602360
Compiles this DataFrame's expression tree to SQL, optionally
@@ -2388,7 +2388,9 @@ def to_sql_query(
23882388
# the BigQuery unicode column name feature?
23892389
substitutions[old_id] = new_id
23902390

2391-
sql = self.session._to_sql(array_value, col_id_overrides=substitutions)
2391+
sql = self.session._to_sql(
2392+
array_value, col_id_overrides=substitutions, enable_cache=enable_cache
2393+
)
23922394
return (
23932395
sql,
23942396
new_ids[: len(idx_labels)],

bigframes/dataframe.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ def guarded_meth(df: DataFrame, *args, **kwargs):
105105
@log_adapter.class_logger
106106
class DataFrame(vendored_pandas_frame.DataFrame):
107107
__doc__ = vendored_pandas_frame.DataFrame.__doc__
108+
# internal flag to disable cache at all
109+
_disable_cache_override: bool = False
108110

109111
def __init__(
110112
self,
@@ -367,7 +369,7 @@ def astype(
367369
return self._apply_unary_op(ops.AsTypeOp(to_type=dtype))
368370

369371
def _to_sql_query(
370-
self, include_index: bool
372+
self, include_index: bool, enable_cache: bool = True
371373
) -> Tuple[str, list[str], list[blocks.Label]]:
372374
"""Compiles this DataFrame's expression tree to SQL, optionally
373375
including index columns.
@@ -381,7 +383,7 @@ def _to_sql_query(
381383
If include_index is set to False, index_column_id_list and index_column_label_list
382384
return empty lists.
383385
"""
384-
return self._block.to_sql_query(include_index)
386+
return self._block.to_sql_query(include_index, enable_cache=enable_cache)
385387

386388
@property
387389
def sql(self) -> str:
@@ -3628,6 +3630,8 @@ def _cached(self, *, force: bool = False) -> DataFrame:
36283630
No-op if the dataframe represents a trivial transformation of an existing materialization.
36293631
Force=True is used for BQML integration where need to copy data rather than use snapshot.
36303632
"""
3633+
if self._disable_cache_override:
3634+
return self
36313635
self._block.cached(force=force)
36323636
return self
36333637

bigframes/session/__init__.py

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import bigframes.core.indexes
108108
import bigframes.dataframe as dataframe
109109
import bigframes.series
110+
import bigframes.streaming.dataframe as streaming_dataframe
110111

111112
_BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection"
112113

@@ -749,6 +750,38 @@ def read_gbq_table(
749750
filters=filters,
750751
)
751752

753+
def read_gbq_table_streaming(
754+
self, table: str
755+
) -> streaming_dataframe.StreamingDataFrame:
756+
"""Turn a BigQuery table into a StreamingDataFrame.
757+
758+
Note: The bigframes.streaming module is a preview feature, and subject to change.
759+
760+
**Examples:**
761+
762+
>>> import bigframes.streaming as bst
763+
>>> import bigframes.pandas as bpd
764+
>>> bpd.options.display.progress_bar = None
765+
766+
>>> sdf = bst.read_gbq_table("bigquery-public-data.ml_datasets.penguins")
767+
"""
768+
warnings.warn(
769+
"The bigframes.streaming module is a preview feature, and subject to change.",
770+
stacklevel=1,
771+
category=bigframes.exceptions.PreviewWarning,
772+
)
773+
774+
import bigframes.streaming.dataframe as streaming_dataframe
775+
776+
df = self._read_gbq_table(
777+
table,
778+
api_name="read_gbq_table_streaming",
779+
enable_snapshot=False,
780+
index_col=bigframes.enums.DefaultIndexKind.NULL,
781+
)
782+
783+
return streaming_dataframe.StreamingDataFrame._from_table_df(df)
784+
752785
def _read_gbq_table(
753786
self,
754787
query: str,
@@ -759,6 +792,7 @@ def _read_gbq_table(
759792
api_name: str,
760793
use_cache: bool = True,
761794
filters: third_party_pandas_gbq.FiltersType = (),
795+
enable_snapshot: bool = True,
762796
) -> dataframe.DataFrame:
763797
import bigframes.dataframe as dataframe
764798

@@ -877,7 +911,7 @@ def _read_gbq_table(
877911
else (*columns, *[col for col in index_cols if col not in columns])
878912
)
879913

880-
supports_snapshot = bf_read_gbq_table.validate_table(
914+
enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
881915
self.bqclient, table_ref, all_columns, time_travel_timestamp, filter_str
882916
)
883917

@@ -905,7 +939,7 @@ def _read_gbq_table(
905939
table,
906940
schema=schema,
907941
predicate=filter_str,
908-
at_time=time_travel_timestamp if supports_snapshot else None,
942+
at_time=time_travel_timestamp if enable_snapshot else None,
909943
primary_key=index_cols if is_index_unique else (),
910944
session=self,
911945
)
@@ -2056,17 +2090,20 @@ def _to_sql(
20562090
offset_column: typing.Optional[str] = None,
20572091
col_id_overrides: typing.Mapping[str, str] = {},
20582092
ordered: bool = False,
2093+
enable_cache: bool = True,
20592094
) -> str:
20602095
if offset_column:
20612096
array_value = array_value.promote_offsets(offset_column)
2062-
node_w_cached = self._with_cached_executions(array_value.node)
2097+
node = (
2098+
self._with_cached_executions(array_value.node)
2099+
if enable_cache
2100+
else array_value.node
2101+
)
20632102
if ordered:
20642103
return self._compiler.compile_ordered(
2065-
node_w_cached, col_id_overrides=col_id_overrides
2104+
node, col_id_overrides=col_id_overrides
20662105
)
2067-
return self._compiler.compile_unordered(
2068-
node_w_cached, col_id_overrides=col_id_overrides
2069-
)
2106+
return self._compiler.compile_unordered(node, col_id_overrides=col_id_overrides)
20702107

20712108
def _get_table_size(self, destination_table):
20722109
table = self.bqclient.get_table(destination_table)

0 commit comments

Comments
 (0)