
fix: use anonymous dataset to create remote_function #205


Merged

merged 6 commits on Nov 28, 2023
Changes from 2 commits
7 changes: 3 additions & 4 deletions README.rst
@@ -267,10 +267,9 @@ definition. To view and manage connections, do the following:
3. In the Explorer pane, expand that project and then expand External connections.

BigQuery remote functions are created in the dataset you specify, or
-in a dataset with the name ``bigframes_temp_location``, where location is
-the location used by the BigQuery DataFrames session. For example,
-``bigframes_temp_us_central1``. To view and manage remote functions, do
-the following:
+in a special type of `hidden dataset <https://cloud.google.com/bigquery/docs/datasets#hidden_datasets>`__
+referred to as an anonymous dataset. To view and manage remote functions created
+in a user provided dataset, do the following:

1. Go to `BigQuery in the Google Cloud Console <https://console.cloud.google.com/bigquery>`__.
2. Select the project in which you created the remote function.
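As context for the README change, the snippet below sketches the two behaviors it describes: letting the remote function land in the session's anonymous dataset versus pinning it to a dataset you own. This is a minimal sketch, not code from this PR; it assumes the bigframes `remote_function` API of this era, a placeholder dataset name, and an already-configured BigQuery connection.

```python
# Sketch only: assumes bigframes is configured with a GCP project and a
# BigQuery connection that has the permissions remote functions need.
import bigframes.pandas as bpd

# No dataset given: with this PR, the routine is created in the session's
# hidden ("anonymous") dataset instead of a bigframes_temp_<location> one.
@bpd.remote_function([int], int, reuse=False)
def square(x):
    return x * x

# Explicit dataset: the routine is created where the console steps above
# can find it. "my-project.my_dataset" is a placeholder.
@bpd.remote_function([int], int, dataset="my-project.my_dataset", reuse=False)
def cube(x):
    return x * x * x
```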
6 changes: 2 additions & 4 deletions bigframes/remote_function.py
@@ -610,7 +610,7 @@ def get_routine_reference(
raise DatasetMissingError

dataset_ref = bigquery.DatasetReference(
-bigquery_client.project, session._session_dataset_id
+bigquery_client.project, session._anonymous_dataset.dataset_id
)
return dataset_ref.routine(routine_ref_str)

@@ -778,9 +778,7 @@ def remote_function(
dataset, default_project=bigquery_client.project
)
else:
-dataset_ref = bigquery.DatasetReference.from_string(
-session._session_dataset_id, default_project=bigquery_client.project
-)
+dataset_ref = session._anonymous_dataset

bq_location, cloud_function_region = get_remote_function_locations(
bigquery_client.location
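The change above narrows dataset resolution to two cases: parse a user-supplied dataset string, otherwise fall back to the session's anonymous `DatasetReference`. A standalone sketch of the same pattern, using only `google-cloud-bigquery` types; the function name and values are illustrative, not bigframes code:

```python
from typing import Optional

from google.cloud import bigquery


def resolve_dataset_ref(
    dataset: Optional[str],
    default_project: str,
    anonymous_dataset: bigquery.DatasetReference,
) -> bigquery.DatasetReference:
    # A "dataset" or "project.dataset" string is parsed into a reference;
    # otherwise fall back to the session-owned anonymous dataset.
    if dataset:
        return bigquery.DatasetReference.from_string(
            dataset, default_project=default_project
        )
    return anonymous_dataset


ref = resolve_dataset_ref(
    None, "my-project", bigquery.DatasetReference("my-project", "_hidden123")
)
routine_ref = ref.routine("square")  # as in get_routine_reference above
```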
14 changes: 0 additions & 14 deletions bigframes/session/__init__.py
@@ -197,13 +197,6 @@ def cloudfunctionsclient(self):
def resourcemanagerclient(self):
return self._clients_provider.resourcemanagerclient

-@property
-def _session_dataset_id(self):
-"""A dataset for storing temporary objects local to the session
-This is a workaround for remote functions that do not
-yet support session-temporary instances."""
-return self._session_dataset.dataset_id
-
@property
def _project(self):
return self.bqclient.project
@@ -228,13 +221,6 @@ def _create_bq_datasets(self):
query_destination.dataset_id,
)

-# Dataset for storing remote functions, which don't yet
-# support proper session temporary storage yet
-self._session_dataset = bigquery.Dataset(
-f"{self.bqclient.project}.bigframes_temp_{self._location.lower().replace('-', '_')}"
-)
-self._session_dataset.location = self._location
-
def close(self):
"""No-op. Temporary resources are deleted after 7 days."""

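With the `bigframes_temp_<location>` dataset gone, the session leans on the anonymous dataset that BigQuery already maintains for cached query results; the surviving `_create_bq_datasets` context above reads a dataset id off a query destination. The sketch below shows that discovery step in plain `google-cloud-bigquery` terms. It is an assumption about the approach, not a quote of the bigframes code:

```python
from google.cloud import bigquery

client = bigquery.Client()

# A query with no explicit destination writes its results to a temporary
# table inside the project's hidden, per-user anonymous dataset.
job = client.query("SELECT 1")
job.result()  # wait for completion so the destination is populated

destination = job.destination  # TableReference, e.g. project._a1b2c3.anonXYZ
anonymous_dataset = bigquery.DatasetReference(
    destination.project, destination.dataset_id
)
print(anonymous_dataset.dataset_id)  # hidden dataset ids start with "_"
```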
47 changes: 46 additions & 1 deletion tests/system/large/test_remote_function.py
@@ -22,7 +22,7 @@
import textwrap

from google.api_core.exceptions import NotFound, ResourceExhausted
-from google.cloud import functions_v2
+from google.cloud import bigquery, functions_v2
import pandas
import pytest
import test_utils.prefixer
@@ -1210,3 +1210,48 @@ def square(x):
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, square
)


+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_anonymous_dataset(session, scalars_dfs):
+    try:
+        # This usage of remote_function is expected to create the remote
+        # function in the bigframes session's anonymous dataset. Use reuse=False
+        # param to make sure parallel instances of the test don't step over each
+        # other due to the common anonymous dataset.
+        @session.remote_function([int], int, reuse=False)
+        def square(x):
+            return x * x
+
+        assert (
+            bigquery.Routine(square.bigframes_remote_function).dataset_id
+            == session._anonymous_dataset.dataset_id
+        )
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_int64_col = scalars_df["int64_col"]
+        bf_int64_col_filter = bf_int64_col.notnull()
+        bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
+        bf_result_col = bf_int64_col_filtered.apply(square)
+        bf_result = (
+            bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
+        )
+
+        pd_int64_col = scalars_pandas_df["int64_col"]
+        pd_int64_col_filter = pd_int64_col.notnull()
+        pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
+        pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
+        # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
+        # pd_int64_col_filtered.dtype is Int64Dtype()
+        # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
+        # For this test let's force the pandas dtype to be same as bigframes' dtype.
+        pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
+        pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
+
+        assert_pandas_df_equal(bf_result, pd_result)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square
+        )
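On the TODO in the new test: pandas' `Series.apply` builds its result with NumPy type inference, so a nullable `Int64` input (with no missing values) comes back as plain `int64`, which is why the test casts before comparing. A small repro, assuming any pandas version with nullable integer dtypes:

```python
import pandas

s = pandas.Series([1, 2, 3], dtype=pandas.Int64Dtype())
print(s.dtype)                         # Int64 (nullable extension dtype)
print(s.apply(lambda x: x * x).dtype)  # int64 (NumPy dtype inferred by apply)

# The test's workaround: cast the applied result back to the nullable dtype.
squared = s.apply(lambda x: x * x).astype(pandas.Int64Dtype())
print(squared.dtype)                   # Int64
```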
37 changes: 11 additions & 26 deletions tests/system/small/test_remote_function.py
@@ -62,13 +62,12 @@ def bq_cf_connection_location_project_mismatched() -> str:


@pytest.fixture(scope="module")
-def session_with_bq_connection_and_permanent_dataset(
+def session_with_bq_connection(
bq_cf_connection, dataset_id_permanent
) -> bigframes.Session:
session = bigframes.Session(
bigframes.BigQueryOptions(bq_connection=bq_cf_connection)
)
-session._session_dataset = bigquery.Dataset(dataset_id_permanent)
return session


@@ -277,13 +276,11 @@ def square(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_direct_session_param(
-session_with_bq_connection_and_permanent_dataset, scalars_dfs
-):
+def test_remote_function_direct_session_param(session_with_bq_connection, scalars_dfs):
@rf.remote_function(
[int],
int,
-session=session_with_bq_connection_and_permanent_dataset,
+session=session_with_bq_connection,
)
def square(x):
return x * x
@@ -313,17 +310,15 @@ def square(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_via_session_default(
-session_with_bq_connection_and_permanent_dataset, scalars_dfs
-):
+def test_remote_function_via_session_default(session_with_bq_connection, scalars_dfs):
# Session has bigquery connection initialized via context. Without an
# explicit dataset the default dataset from the session would be used.
# Without an explicit bigquery connection, the one present in Session set
# through the explicit BigQueryOptions would be used. Without an explicit `reuse`
# the default behavior of reuse=True will take effect. Please note that the
# udf is same as the one used in other tests in this file so the underlying
# cloud function would be common and quickly reused.
-@session_with_bq_connection_and_permanent_dataset.remote_function([int], int)
+@session_with_bq_connection.remote_function([int], int)
def square(x):
return x * x

@@ -391,15 +386,11 @@ def square(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_dataframe_applymap(
-session_with_bq_connection_and_permanent_dataset, scalars_dfs
-):
+def test_dataframe_applymap(session_with_bq_connection, scalars_dfs):
def add_one(x):
return x + 1

-remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
-[int], int
-)(add_one)
+remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)

scalars_df, scalars_pandas_df = scalars_dfs
int64_cols = ["int64_col", "int64_too"]
@@ -422,15 +413,11 @@ def add_one(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_dataframe_applymap_na_ignore(
-session_with_bq_connection_and_permanent_dataset, scalars_dfs
-):
+def test_dataframe_applymap_na_ignore(session_with_bq_connection, scalars_dfs):
def add_one(x):
return x + 1

-remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
-[int], int
-)(add_one)
+remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)

scalars_df, scalars_pandas_df = scalars_dfs
int64_cols = ["int64_col", "int64_too"]
@@ -451,13 +438,11 @@ def add_one(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_series_map(session_with_bq_connection_and_permanent_dataset, scalars_dfs):
+def test_series_map(session_with_bq_connection, scalars_dfs):
def add_one(x):
return x + 1

-remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
-[int], int
-)(add_one)
+remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)

scalars_df, scalars_pandas_df = scalars_dfs

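The renamed fixture captures the setup these tests now rely on: the BigQuery connection arrives through `BigQueryOptions`, and no dataset is attached to the session, so remote functions default to the anonymous dataset with `reuse=True`. A sketch of the same wiring outside pytest; the connection name is a placeholder and must already exist in the project:

```python
import bigframes

# Mirrors the fixture: connection via options, no session dataset attached.
session = bigframes.Session(
    bigframes.BigQueryOptions(bq_connection="bigframes-rf-conn")
)


# reuse=True is the default, so identical definitions share one cloud
# function; the routine itself lands in the session's anonymous dataset.
@session.remote_function([int], int)
def add_one(x):
    return x + 1
```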