feat: support gcf max instance count in remote_function #657

Merged 4 commits on May 3, 2024

32 changes: 27 additions & 5 deletions bigframes/functions/remote_function.py
@@ -342,7 +342,12 @@ def generate_cloud_function_code(self, def_, dir, package_requirements=None):
         return entry_point

     def create_cloud_function(
-        self, def_, cf_name, package_requirements=None, cloud_function_timeout=600
+        self,
+        def_,
+        cf_name,
+        package_requirements=None,
+        timeout_seconds=600,
+        max_instance_count=None,
     ):
         """Create a cloud function from the given user defined function."""

@@ -411,14 +416,16 @@ def create_cloud_function(
         )
         function.service_config = functions_v2.ServiceConfig()
         function.service_config.available_memory = "1024M"
-        if cloud_function_timeout is not None:
-            if cloud_function_timeout > 1200:
+        if timeout_seconds is not None:
+            if timeout_seconds > 1200:
                 raise ValueError(
                     "BigQuery remote function can wait only up to 20 minutes"
                     ", see for more details "
                     "https://cloud.google.com/bigquery/quotas#remote_function_limits."
                 )
-            function.service_config.timeout_seconds = cloud_function_timeout
+            function.service_config.timeout_seconds = timeout_seconds
+        if max_instance_count is not None:
+            function.service_config.max_instance_count = max_instance_count
         function.service_config.service_account_email = (
             self._cloud_function_service_account
         )
@@ -466,6 +473,7 @@ def provision_bq_remote_function(
         package_requirements,
         max_batching_rows,
         cloud_function_timeout,
+        cloud_function_max_instance_count,
     ):
         """Provision a BigQuery remote function."""
         # If reuse of any existing function with the same name (indicated by the
@@ -487,7 +495,11 @@ def provision_bq_remote_function(
         # Create the cloud function if it does not exist
         if not cf_endpoint:
             cf_endpoint = self.create_cloud_function(
-                def_, cloud_function_name, package_requirements, cloud_function_timeout
+                def_,
+                cloud_function_name,
+                package_requirements,
+                cloud_function_timeout,
+                cloud_function_max_instance_count,
             )
         else:
             logger.info(f"Cloud function {cloud_function_name} already exists.")
@@ -642,6 +654,7 @@ def remote_function(
     cloud_function_docker_repository: Optional[str] = None,
     max_batching_rows: Optional[int] = 1000,
     cloud_function_timeout: Optional[int] = 600,
+    cloud_function_max_instances: Optional[int] = None,
 ):
     """Decorator to turn a user defined function into a BigQuery remote function.

@@ -778,6 +791,14 @@ def remote_function(
             https://cloud.google.com/bigquery/quotas#remote_function_limits.
             By default BigQuery DataFrames uses a 10 minute timeout. `None`
             can be passed to let the cloud functions default timeout take effect.
+        cloud_function_max_instances (int, Optional):
+            The maximum instance count for the cloud function created. This
+            can be used to control how many cloud function instances can be
+            active at any given point of time. A lower setting can help
+            control billing spikes, while a higher setting can help support
+            processing of larger-scale data. When not specified, the cloud
+            function's default setting applies. For more details see
+            https://cloud.google.com/functions/docs/configuring/max-instances
     """
     if isinstance(input_types, type):
         input_types = [input_types]
@@ -906,6 +927,7 @@ def wrapper(f):
             packages,
             max_batching_rows,
             cloud_function_timeout,
+            cloud_function_max_instances,
         )

         # TODO: Move ibis logic to compiler step
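
For context, the plumbing above ultimately sets fields on the Cloud Functions v2 `ServiceConfig`. A minimal sketch of that mapping, assuming the `google-cloud-functions` client, with placeholder project, function, and service account names (illustrative only, not part of this diff):

```python
# Illustrative sketch of the ServiceConfig fields touched above; the function
# name, memory, timeout, instance cap, and service account are placeholders.
from google.cloud import functions_v2

function = functions_v2.Function(
    name="projects/my-project/locations/us-central1/functions/my-remote-fn"
)
function.service_config = functions_v2.ServiceConfig()
function.service_config.available_memory = "1024M"
function.service_config.timeout_seconds = 600  # BigQuery remote functions allow at most 1200
function.service_config.max_instance_count = 50  # the new knob exposed by this PR
function.service_config.service_account_email = "my-sa@my-project.iam.gserviceaccount.com"
```
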
2 changes: 2 additions & 0 deletions bigframes/pandas/__init__.py
@@ -652,6 +652,7 @@ def remote_function(
     cloud_function_docker_repository: Optional[str] = None,
     max_batching_rows: Optional[int] = 1000,
     cloud_function_timeout: Optional[int] = 600,
+    cloud_function_max_instances: Optional[int] = None,
 ):
     return global_session.with_default_session(
         bigframes.session.Session.remote_function,
@@ -667,6 +668,7 @@ def remote_function(
         cloud_function_docker_repository=cloud_function_docker_repository,
         max_batching_rows=max_batching_rows,
         cloud_function_timeout=cloud_function_timeout,
+        cloud_function_max_instances=cloud_function_max_instances,
     )


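
As a usage illustration (not part of this change set), the new keyword flows through the `bigframes.pandas` entry point shown above. A minimal sketch, assuming a configured GCP project and BigQuery connection, with an arbitrary instance cap of 50:

```python
# Hypothetical usage sketch: assumes bigframes is already configured with a
# GCP project and BigQuery connection; the cap of 50 instances is arbitrary.
import bigframes.pandas as bpd

@bpd.remote_function(
    [float],
    float,
    reuse=False,
    cloud_function_timeout=600,        # seconds, capped at 1200 by BigQuery
    cloud_function_max_instances=50,   # new in this PR: caps concurrent GCF instances
)
def fahrenheit_to_celsius(x):
    return (x - 32) * 5.0 / 9.0

# df["temp_f"].apply(fahrenheit_to_celsius) would then run via the capped cloud function.
```
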
10 changes: 10 additions & 0 deletions bigframes/session/__init__.py
@@ -1458,6 +1458,7 @@ def remote_function(
         cloud_function_docker_repository: Optional[str] = None,
         max_batching_rows: Optional[int] = 1000,
         cloud_function_timeout: Optional[int] = 600,
+        cloud_function_max_instances: Optional[int] = None,
     ):
         """Decorator to turn a user defined function into a BigQuery remote function. Check out
         the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
@@ -1572,6 +1573,14 @@ def remote_function(
                 https://cloud.google.com/bigquery/quotas#remote_function_limits.
                 By default BigQuery DataFrames uses a 10 minute timeout. `None`
                 can be passed to let the cloud functions default timeout take effect.
+            cloud_function_max_instances (int, Optional):
+                The maximum instance count for the cloud function created. This
+                can be used to control how many cloud function instances can be
+                active at any given point of time. A lower setting can help
+                control billing spikes, while a higher setting can help support
+                processing of larger-scale data. When not specified, the cloud
+                function's default setting applies. For more details see
+                https://cloud.google.com/functions/docs/configuring/max-instances
         Returns:
             callable: A remote function object pointing to the cloud assets created
             in the background to support the remote execution. The cloud assets can be
@@ -1595,6 +1604,7 @@ def remote_function(
             cloud_function_docker_repository=cloud_function_docker_repository,
             max_batching_rows=max_batching_rows,
             cloud_function_timeout=cloud_function_timeout,
+            cloud_function_max_instances=cloud_function_max_instances,
         )

     def read_gbq_function(
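
The same parameter is exposed on `Session.remote_function`. A non-decorator sketch, assuming an existing `bigframes.session.Session` object named `session` and a DataFrame `df` (both hypothetical here):

```python
# Sketch assuming an existing bigframes Session; mirrors the call style used
# by the system test below, with an illustrative cap of 1000 instances.
def square(x):
    return x * x

square_remote = session.remote_function(
    [int],
    int,
    reuse=False,
    cloud_function_max_instances=1000,  # forwarded to the GCF max_instance_count
)(square)

# result = df["int64_col"].apply(square_remote).to_pandas()
```
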
40 changes: 40 additions & 0 deletions tests/system/large/test_remote_function.py
@@ -1414,3 +1414,43 @@ def test_remote_function_gcf_timeout_max_supported_exceeded(session):
         @session.remote_function([int], int, reuse=False, cloud_function_timeout=1201)
         def square(x):
             return x * x
+
+
+@pytest.mark.parametrize(
+    ("max_instances_args", "expected_max_instances"),
+    [
+        pytest.param({}, 100, id="no-set"),
+        pytest.param({"cloud_function_max_instances": None}, 100, id="set-None"),
+        pytest.param({"cloud_function_max_instances": 1000}, 1000, id="set-explicit"),
+    ],
+)
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_max_instances(
+    session, scalars_dfs, max_instances_args, expected_max_instances
+):
+    try:
+
+        def square(x):
+            return x * x
+
+        square_remote = session.remote_function(
+            [int], int, reuse=False, **max_instances_args
+        )(square)
+
+        # Assert that the GCF is created with the intended max instance count
+        gcf = session.cloudfunctionsclient.get_function(
+            name=square_remote.bigframes_cloud_function
+        )
+        assert gcf.service_config.max_instance_count == expected_max_instances
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas()
+        pd_result = scalars_pandas_df["int64_too"].apply(square)
+
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square_remote
+        )
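
As the test asserts, the applied setting can be read back from the deployed function. A small verification sketch, reusing the `session` and `square_remote` objects from the sketch above:

```python
# Verification sketch: `bigframes_cloud_function` holds the fully qualified
# GCF resource name, as used by the test above.
gcf = session.cloudfunctionsclient.get_function(
    name=square_remote.bigframes_cloud_function
)
print(gcf.service_config.max_instance_count)  # 1000 when set explicitly; 100 is the GCF default per the test
```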