feat: Support to_csv/parquet/json to local files/objects #858

Merged (6 commits, Jul 30, 2024)

4 changes: 4 additions & 0 deletions bigframes/core/utils.py
@@ -23,6 +23,10 @@
UNNAMED_INDEX_ID = "bigframes_unnamed_index"


def is_gcs_path(value) -> typing_extensions.TypeGuard[str]:
return isinstance(value, str) and value.startswith("gs://")


def get_axis_number(axis: typing.Union[str, int]) -> typing.Literal[0, 1]:
if axis in {0, "index", "rows"}:
return 0
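
The new is_gcs_path helper is a typing_extensions.TypeGuard, so a truthy result narrows the argument to str for type checkers while the function itself tolerates None and file objects. A minimal sketch of the dispatch it enables in the to_* methods (the bucket and local path below are made up):

from bigframes.core import utils

utils.is_gcs_path("gs://my-bucket/out-*.csv")  # True: keep the BigQuery EXPORT DATA path
utils.is_gcs_path("/tmp/out.csv")              # False: fall back to pandas locally
utils.is_gcs_path(None)                        # False: pandas returns the serialized result
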
50 changes: 32 additions & 18 deletions bigframes/dataframe.py
@@ -2950,15 +2950,21 @@ def from_records(
)

def to_csv(
self, path_or_buf: str, sep=",", *, header: bool = True, index: bool = True
) -> None:
self,
path_or_buf=None,
sep=",",
*,
header: bool = True,
index: bool = True,
) -> Optional[str]:
# TODO(swast): Can we support partition columns argument?
# TODO(chelsealin): Support local file paths.
# TODO(swast): Some warning that wildcard is recommended for large
# query results? See:
# https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size
if not path_or_buf.startswith("gs://"):
raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS)
if not utils.is_gcs_path(path_or_buf):
pd_df = self.to_pandas()
return pd_df.to_csv(path_or_buf, sep=sep, header=header, index=index)
if "*" not in path_or_buf:
raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

@@ -2975,22 +2981,28 @@ def to_csv(
export_data_statement, api_name="dataframe-to_csv"
)
self._set_internal_query_job(query_job)
return None

def to_json(
self,
path_or_buf: str,
orient: Literal[
"split", "records", "index", "columns", "values", "table"
] = "columns",
path_or_buf=None,
orient: Optional[
Literal["split", "records", "index", "columns", "values", "table"]
] = None,
*,
lines: bool = False,
index: bool = True,
) -> None:
) -> Optional[str]:
# TODO(swast): Can we support partition columns argument?
# TODO(chelsealin): Support local file paths.
if not path_or_buf.startswith("gs://"):
raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS)

if not utils.is_gcs_path(path_or_buf):
pd_df = self.to_pandas()
return pd_df.to_json(
path_or_buf,
orient=orient,
lines=lines,
index=index,
default_handler=str,
)
if "*" not in path_or_buf:
raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

@@ -3019,6 +3031,7 @@ def to_json(
export_data_statement, api_name="dataframe-to_json"
)
self._set_internal_query_job(query_job)
return None

def to_gbq(
self,
@@ -3117,19 +3130,19 @@ def __array__(self, dtype=None) -> numpy.ndarray:

def to_parquet(
self,
path: str,
path=None,
*,
compression: Optional[Literal["snappy", "gzip"]] = "snappy",
index: bool = True,
) -> None:
) -> Optional[bytes]:
# TODO(swast): Can we support partition columns argument?
# TODO(chelsealin): Support local file paths.
# TODO(swast): Some warning that wildcard is recommended for large
# query results? See:
# https://cloud.google.com/bigquery/docs/exporting-data#limit_the_exported_file_size
if not path.startswith("gs://"):
raise NotImplementedError(ERROR_IO_ONLY_GS_PATHS)

if not utils.is_gcs_path(path):
pd_df = self.to_pandas()
return pd_df.to_parquet(path, compression=compression, index=index)
if "*" not in path:
raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

@@ -3153,6 +3166,7 @@ def to_parquet(
export_data_statement, api_name="dataframe-to_parquet"
)
self._set_internal_query_job(query_job)
return None

def to_dict(
self,
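
Taken together, the DataFrame changes make to_csv, to_json, and to_parquet mirror pandas whenever the destination is not a gs:// URI: with no destination the serialized result is returned (a str for CSV/JSON, bytes for Parquet), a local path or file object is written via to_pandas(), and gs:// paths keep the existing wildcard-based EXPORT DATA flow. A rough usage sketch, with made-up bucket and file names:

import bigframes.pandas as bpd

df = bpd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

csv_text = df.to_csv()                     # no destination: returns the CSV as a str
json_text = df.to_json(orient="records")   # no destination: returns the JSON as a str
parquet_bytes = df.to_parquet()            # no destination: returns the Parquet payload as bytes

df.to_csv("/tmp/example.csv")              # local path: written through pandas
df.to_csv("gs://my-bucket/example-*.csv")  # GCS path: still requires a wildcard
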
41 changes: 30 additions & 11 deletions bigframes/series.py
@@ -1652,9 +1652,22 @@ def to_frame(self, name: blocks.Label = None) -> bigframes.dataframe.DataFrame:
return bigframes.dataframe.DataFrame(block)

def to_csv(
self, path_or_buf: str, sep=",", *, header: bool = True, index: bool = True
) -> None:
return self.to_frame().to_csv(path_or_buf, sep=sep, header=header, index=index)
self,
path_or_buf=None,
sep=",",
*,
header: bool = True,
index: bool = True,
) -> Optional[str]:
if utils.is_gcs_path(path_or_buf):
return self.to_frame().to_csv(
path_or_buf, sep=sep, header=header, index=index
)
else:
pd_series = self.to_pandas()
return pd_series.to_csv(
path_or_buf=path_or_buf, sep=sep, header=header, index=index
)

def to_dict(self, into: type[dict] = dict) -> typing.Mapping:
return typing.cast(dict, self.to_pandas().to_dict(into)) # type: ignore
@@ -1664,17 +1677,23 @@ def to_excel(self, excel_writer, sheet_name="Sheet1", **kwargs) -> None:

def to_json(
self,
path_or_buf: str,
orient: typing.Literal[
"split", "records", "index", "columns", "values", "table"
] = "columns",
path_or_buf=None,
orient: Optional[
typing.Literal["split", "records", "index", "columns", "values", "table"]
] = None,
*,
lines: bool = False,
index: bool = True,
) -> None:
return self.to_frame().to_json(
path_or_buf=path_or_buf, orient=orient, lines=lines, index=index
)
) -> Optional[str]:
if utils.is_gcs_path(path_or_buf):
return self.to_frame().to_json(
path_or_buf=path_or_buf, orient=orient, lines=lines, index=index
)
else:
pd_series = self.to_pandas()
return pd_series.to_json(
path_or_buf=path_or_buf, orient=orient, lines=lines, index=index # type: ignore
)

def to_latex(
self, buf=None, columns=None, header=True, index=True, **kwargs
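
The Series methods get the same split: a gs:// destination is delegated to the DataFrame code path, anything else round-trips through Series.to_pandas(). Continuing the DataFrame sketch above (paths are again illustrative):

s = df["a"]

csv_text = s.to_csv()               # no destination: returns the CSV as a str
s.to_json("/tmp/series.json")       # local file, handled by pandas
s.to_csv("gs://my-bucket/s-*.csv")  # GCS export via DataFrame.to_csv
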
68 changes: 67 additions & 1 deletion tests/system/small/test_dataframe.py
@@ -4125,6 +4125,72 @@ def test_df_to_latex(scalars_df_index, scalars_pandas_df_index):
assert bf_result == pd_result


def test_df_to_json_local_str(scalars_df_index, scalars_pandas_df_index):
bf_result = scalars_df_index.to_json()
# default_handler for arrow types that have no default conversion
pd_result = scalars_pandas_df_index.to_json(default_handler=str)

assert bf_result == pd_result


@skip_legacy_pandas
def test_df_to_json_local_file(scalars_df_index, scalars_pandas_df_index):
with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
scalars_df_index.to_json(bf_result_file, orient="table")
# default_handler for arrow types that have no default conversion
scalars_pandas_df_index.to_json(
pd_result_file, orient="table", default_handler=str
)

bf_result = bf_result_file.read()
pd_result = pd_result_file.read()

assert bf_result == pd_result


def test_df_to_csv_local_str(scalars_df_index, scalars_pandas_df_index):
bf_result = scalars_df_index.to_csv()
# default_handler for arrow types that have no default conversion
pd_result = scalars_pandas_df_index.to_csv()

assert bf_result == pd_result


def test_df_to_csv_local_file(scalars_df_index, scalars_pandas_df_index):
with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
scalars_df_index.to_csv(bf_result_file)
scalars_pandas_df_index.to_csv(pd_result_file)

bf_result = bf_result_file.read()
pd_result = pd_result_file.read()

assert bf_result == pd_result


def test_df_to_parquet_local_bytes(scalars_df_index, scalars_pandas_df_index):
# GEOGRAPHY not supported in parquet export.
unsupported = ["geography_col"]

bf_result = scalars_df_index.drop(columns=unsupported).to_parquet()
# default_handler for arrow types that have no default conversion
pd_result = scalars_pandas_df_index.drop(columns=unsupported).to_parquet()

assert bf_result == pd_result


def test_df_to_parquet_local_file(scalars_df_index, scalars_pandas_df_index):
# GEOGRAPHY not supported in parquet export.
unsupported = ["geography_col"]
with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
scalars_df_index.drop(columns=unsupported).to_parquet(bf_result_file)
scalars_pandas_df_index.drop(columns=unsupported).to_parquet(pd_result_file)

bf_result = bf_result_file.read()
pd_result = pd_result_file.read()

assert bf_result == pd_result


def test_df_to_records(scalars_df_index, scalars_pandas_df_index):
unsupported = ["numeric_col"]
bf_result = scalars_df_index.drop(columns=unsupported).to_records()
@@ -4166,7 +4232,7 @@ def test_df_to_pickle(scalars_df_index, scalars_pandas_df_index):
scalars_df_index.to_pickle(bf_result_file)
scalars_pandas_df_index.to_pickle(pd_result_file)
bf_result = bf_result_file.read()
pd_result = bf_result_file.read()
pd_result = pd_result_file.read()

assert bf_result == pd_result

38 changes: 38 additions & 0 deletions tests/system/small/test_series.py
@@ -2753,6 +2753,44 @@ def test_to_latex(scalars_df_index, scalars_pandas_df_index):
assert bf_result == pd_result


def test_series_to_json_local_str(scalars_df_index, scalars_pandas_df_index):
bf_result = scalars_df_index.int64_col.to_json()
pd_result = scalars_pandas_df_index.int64_col.to_json()

assert bf_result == pd_result


@skip_legacy_pandas
def test_series_to_json_local_file(scalars_df_index, scalars_pandas_df_index):
with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
scalars_df_index.int64_col.to_json(bf_result_file)
scalars_pandas_df_index.int64_col.to_json(pd_result_file)

bf_result = bf_result_file.read()
pd_result = pd_result_file.read()

assert bf_result == pd_result


def test_series_to_csv_local_str(scalars_df_index, scalars_pandas_df_index):
bf_result = scalars_df_index.int64_col.to_csv()
# default_handler for arrow types that have no default conversion
pd_result = scalars_pandas_df_index.int64_col.to_csv()

assert bf_result == pd_result


def test_series_to_csv_local_file(scalars_df_index, scalars_pandas_df_index):
with tempfile.TemporaryFile() as bf_result_file, tempfile.TemporaryFile() as pd_result_file:
scalars_df_index.int64_col.to_csv(bf_result_file)
scalars_pandas_df_index.int64_col.to_csv(pd_result_file)

bf_result = bf_result_file.read()
pd_result = pd_result_file.read()

assert bf_result == pd_result


def test_to_dict(scalars_df_index, scalars_pandas_df_index):
bf_result = scalars_df_index["int64_too"].to_dict()

14 changes: 9 additions & 5 deletions third_party/bigframes_vendored/pandas/core/frame.py
@@ -476,11 +476,11 @@ def to_gbq(

def to_parquet(
self,
path: str,
path: Optional[str],
*,
compression: Optional[Literal["snappy", "gzip"]] = "snappy",
index: bool = True,
) -> None:
) -> Optional[bytes]:
"""Write a DataFrame to the binary Parquet format.

This function writes the dataframe as a `parquet file
@@ -496,9 +496,13 @@ def to_parquet(
>>> df.to_parquet(path=gcs_bucket)

Args:
path (str):
path (str, path object, file-like object, or None, default None):
String, path object (implementing ``os.PathLike[str]``), or file-like
object implementing a binary ``write()`` function. If None, the result is
returned as bytes. If a string or path, it will be used as Root Directory
path when writing a partitioned dataset.
Destination URI(s) of Cloud Storage files(s) to store the extracted dataframe
in format of ``gs://<bucket_name>/<object_name_or_glob>``.
should be formatted ``gs://<bucket_name>/<object_name_or_glob>``.
If the data size is more than 1GB, you must use a wildcard to export
the data into multiple files and the size of the files varies.

@@ -511,7 +515,7 @@
If ``False``, they will not be written to the file.

Returns:
None.
bytes if no path argument is provided else None
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

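
Because to_parquet() now returns bytes when no path is given (as the updated docstring above notes), the payload can be read back without touching disk. A sketch of one way to check that round trip, reusing df from the earlier sketch and assuming pandas plus a Parquet engine such as pyarrow are installed:

import io
import pandas as pd

payload = df.to_parquet()  # bytes, per the updated docstring
round_trip = pd.read_parquet(io.BytesIO(payload))
print(round_trip.head())
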