Commit 6b10a82

Simplify BigQuery load jobs (feast-dev#1935)
* Simplify BigQuery load jobs
  Signed-off-by: Judah Rand <[email protected]>
* Ensure `pyarrow` supports `use_compliant_nested_type`
  Signed-off-by: Judah Rand <[email protected]>
1 parent df724a8 commit 6b10a82

File tree: 4 files changed, +9 −51 lines

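In short, the commit drops the hand-rolled pyarrow-to-Parquet upload helpers in favour of the BigQuery client's built-in DataFrame loader. A minimal sketch of the resulting call pattern (the client, DataFrame, and `project.dataset.table` reference are illustrative placeholders, not taken from the diff):

    import pandas as pd
    from google.cloud import bigquery

    client = bigquery.Client()
    # A DataFrame with a list-valued column, the case the old helpers existed for.
    df = pd.DataFrame({"driver_id": [1001, 1002], "trip_ids": [[1, 2], [3]]})

    # Recent client versions serialize the DataFrame to Parquet themselves,
    # so no manual pyarrow buffer or custom LoadJobConfig is needed here.
    job = client.load_table_from_dataframe(df, "project.dataset.table")
    job.result()  # wait for the load job to finish

With google-cloud-bigquery >= 2.28.1 and pyarrow >= 4.0.0 (the floors this commit sets), list columns like `trip_ids` should land as `ARRAY<INT64>` rather than a nested struct, which is what the deleted helpers previously ensured by hand.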

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 3 additions & 41 deletions
@@ -256,8 +256,8 @@ def to_bigquery(
         job_config = bigquery.QueryJobConfig(destination=path)

         if not job_config.dry_run and self.on_demand_feature_views:
-            job = _write_pyarrow_table_to_bq(
-                self.client, self.to_arrow(), job_config.destination
+            job = self.client.load_table_from_dataframe(
+                self.to_df(), job_config.destination
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")

@@ -366,7 +366,7 @@ def _upload_entity_df_and_get_entity_schema(
     elif isinstance(entity_df, pd.DataFrame):
         # Drop the index so that we dont have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
-        job = _write_df_to_bq(client, entity_df, table_name)
+        job = client.load_table_from_dataframe(entity_df, table_name)
         block_until_done(client, job)
         entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
     else:

@@ -400,44 +400,6 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
     return client


-def _write_df_to_bq(
-    client: bigquery.Client, df: pd.DataFrame, table_name: str
-) -> bigquery.LoadJob:
-    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
-    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-    # https://github.com/googleapis/python-bigquery/issues/19
-    writer = pyarrow.BufferOutputStream()
-    pyarrow.parquet.write_table(
-        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
-    )
-    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
-
-
-def _write_pyarrow_table_to_bq(
-    client: bigquery.Client, table: pyarrow.Table, table_name: str
-) -> bigquery.LoadJob:
-    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
-    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-    # https://github.com/googleapis/python-bigquery/issues/19
-    writer = pyarrow.BufferOutputStream()
-    pyarrow.parquet.write_table(table, writer, use_compliant_nested_type=True)
-    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
-
-
-def _write_pyarrow_buffer_to_bq(
-    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
-) -> bigquery.LoadJob:
-    reader = pyarrow.BufferReader(buf)
-
-    parquet_options = bigquery.format_options.ParquetOptions()
-    parquet_options.enable_list_inference = True
-    job_config = bigquery.LoadJobConfig()
-    job_config.source_format = bigquery.SourceFormat.PARQUET
-    job_config.parquet_options = parquet_options
-
-    return client.load_table_from_file(reader, table_name, job_config=job_config,)
-
-
 # TODO: Optimizations
 # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
 # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
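The deleted helpers existed to force the ARRAY<value_type> mapping via compliant nested Parquet types and list inference. As a hedged sketch only (not part of the diff), the same options can still be spelled out and handed to `load_table_from_dataframe` via `job_config`, mirroring what `_write_pyarrow_buffer_to_bq` used to configure; the table reference is a placeholder:

    import pandas as pd
    from google.cloud import bigquery

    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True  # Parquet lists -> ARRAY<...>

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.parquet_options = parquet_options

    client = bigquery.Client()
    df = pd.DataFrame({"ids": [[1, 2], [3]]})
    job = client.load_table_from_dataframe(
        df, "project.dataset.table", job_config=job_config
    )
    job.result()

The 2.28.1 floor set in setup.py suggests the client applies equivalent defaults on its own from that release, which is what makes the explicit helpers redundant.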

sdk/python/setup.py

Lines changed: 3 additions & 3 deletions
@@ -54,7 +54,7 @@
     "pandas>=1.0.0",
     "pandavro==1.5.*",
     "protobuf>=3.10",
-    "pyarrow>=2.0.0",
+    "pyarrow>=4.0.0",
     "pydantic>=1.0.0",
     "PyYAML>=5.4.*",
     "tabulate==0.8.*",

@@ -66,7 +66,7 @@
 ]

 GCP_REQUIRED = [
-    "google-cloud-bigquery>=2.14.0",
+    "google-cloud-bigquery>=2.28.1",
     "google-cloud-bigquery-storage >= 2.0.0",
     "google-cloud-datastore>=2.1.*",
     "google-cloud-storage>=1.34.*",

@@ -112,7 +112,7 @@
     "firebase-admin==4.5.2",
     "pre-commit",
     "assertpy==1.1",
-    "google-cloud-bigquery>=2.14.0",
+    "google-cloud-bigquery>=2.28.1",
     "google-cloud-bigquery-storage >= 2.0.0",
     "google-cloud-datastore>=2.1.*",
     "google-cloud-storage>=1.20.*",

sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py

Lines changed: 2 additions & 5 deletions
@@ -6,10 +6,7 @@

 from feast import BigQuerySource
 from feast.data_source import DataSource
-from feast.infra.offline_stores.bigquery import (
-    BigQueryOfflineStoreConfig,
-    _write_df_to_bq,
-)
+from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
 from tests.integration.feature_repos.universal.data_source_creator import (
     DataSourceCreator,
 )

@@ -69,7 +66,7 @@ def create_data_source(
             f"{self.gcp_project}.{self.project_name}.{destination_name}"
         )

-        job = _write_df_to_bq(self.client, df, destination_name)
+        job = self.client.load_table_from_dataframe(df, destination_name)
         job.result()

         self.tables.append(destination_name)

sdk/python/tests/utils/data_source_utils.py

Lines changed: 1 addition & 2 deletions
@@ -7,7 +7,6 @@

 from feast import BigQuerySource, FileSource
 from feast.data_format import ParquetFormat
-from feast.infra.offline_stores.bigquery import _write_df_to_bq


 @contextlib.contextmanager

@@ -39,7 +38,7 @@ def simple_bq_source_using_table_ref_arg(
     client.update_dataset(dataset, ["default_table_expiration_ms"])
     table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}"

-    job = _write_df_to_bq(client, df, table_ref)
+    job = client.load_table_from_dataframe(df, table_ref)
     job.result()

     return BigQuerySource(
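Both test helpers now follow the same short load-and-wait pattern. A self-contained sketch of that flow, with an invented project/dataset, columns, and teardown added purely for illustration:

    import random

    import pandas as pd
    from google.cloud import bigquery

    client = bigquery.Client()
    df = pd.DataFrame({"driver_id": [1001], "event_timestamp": [pd.Timestamp.utcnow()]})

    table_ref = f"my-project.my_dataset.table_{random.randrange(100, 999)}"  # hypothetical
    job = client.load_table_from_dataframe(df, table_ref)
    job.result()  # the tests wait synchronously before using the table

    # ... exercise the data source against the loaded table ...
    client.delete_table(table_ref, not_found_ok=True)  # cleanup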
