Skip to content

feat: bigframes.streaming module for continuous queries #703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions bigframes/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Module for bigquery continuous queries"""

import json
from typing import Optional

from google.cloud import bigquery

import bigframes


def to_bigtable(
    query: str,
    instance: str,
    table: str,
    bq_client: Optional[bigquery.Client] = None,
    app_profile: Optional[str] = None,
    truncate: bool = False,
    overwrite: bool = False,
    auto_create_column_families: bool = False,
    bigtable_options: Optional[dict] = None,
    job_id: Optional[str] = None,
    job_id_prefix: Optional[str] = None,
) -> bigquery.QueryJob:
    """Launches a BigQuery continuous query and returns a
    QueryJob object for some management functionality.

    This method requires an existing bigtable preconfigured to
    accept the continuous query export statement. For instructions
    on export to bigtable, see
    https://cloud.google.com/bigquery/docs/export-to-bigtable.

    Args:
        query (str):
            The sql statement to execute as a continuous function.
            For example: "SELECT * FROM dataset.table"
            This will be wrapped in an EXPORT DATA statement to
            launch a continuous query writing to bigtable.
        instance (str):
            The name of the bigtable instance to export to.
        table (str):
            The name of the bigtable table to export to.
        bq_client (google.cloud.bigquery.Client, default None):
            The Client object to use for the query. This determines
            the project id and location of the query. If None, will
            default to the bigframes global session default client.
        app_profile (str, default None):
            The bigtable app profile to export to. If None, no app
            profile will be used.
        truncate (bool, default False):
            The export truncate option, see
            https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option
        overwrite (bool, default False):
            The export overwrite option, see
            https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option
        auto_create_column_families (bool, default False):
            The auto_create_column_families option, see
            https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option
        bigtable_options (dict, default None):
            The bigtable options dict, which will be converted to JSON
            using json.dumps, see
            https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option
            If None, no bigtable_options parameter will be passed.
        job_id (str, default None):
            If specified, replace the default job id for the query,
            see job_id parameter of
            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
        job_id_prefix (str, default None):
            If specified, a job id prefix for the query, see
            job_id_prefix parameter of
            https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query

    Returns:
        google.cloud.bigquery.QueryJob:
            See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
            The ongoing query job can be managed using this object.
            For example, the job can be cancelled or its error status
            can be examined.
    """
    # get default client if not passed
    if bq_client is None:
        bq_client = bigframes.get_global_session().bqclient

    # build export string from parameters
    project = bq_client.project

    app_profile_url_string = ""
    if app_profile is not None:
        app_profile_url_string = f"appProfiles/{app_profile}/"

    bigtable_options_parameter_string = ""
    if bigtable_options is not None:
        bigtable_options_parameter_string = (
            'bigtable_options = """' + json.dumps(bigtable_options) + '""",\n'
        )

    # NOTE(review): `query`, `instance`, `table` and `app_profile` are
    # interpolated directly into the statement; callers must not pass
    # untrusted input here (BigQuery has no parameter binding for DDL/EXPORT).
    sql = (
        "EXPORT DATA\n"
        "OPTIONS (\n"
        "format = 'CLOUD_BIGTABLE',\n"
        f"{bigtable_options_parameter_string}"
        f"truncate = {str(truncate)},\n"
        f"overwrite = {str(overwrite)},\n"
        f"auto_create_column_families = {str(auto_create_column_families)},\n"
        f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n'
        ")\n"
        "AS (\n"
        f"{query});"
    )

    # override continuous http parameter; from_api_repr is a classmethod,
    # so call it on the class directly instead of a throwaway instance
    job_config = bigquery.QueryJobConfig.from_api_repr({"query": {"continuous": True}})

    # begin the query job
    query_job = bq_client.query(
        sql,
        job_config=job_config,  # type:ignore
        # typing error above is in bq client library
        # (should accept abstract job_config, only takes concrete)
        job_id=job_id,
        job_id_prefix=job_id_prefix,
    )

    # return the query job to the user for lifetime management
    return query_job
76 changes: 76 additions & 0 deletions scripts/create_bigtable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This script creates the Bigtable resources required for
# bigframes.streaming testing if they don't already exist.

import os
import pathlib
import sys

import google.cloud.bigtable as bigtable

# Root of the repository: two directory levels above this script.
# NOTE(review): REPO_ROOT appears unused in this script — confirm before removing.
REPO_ROOT = pathlib.Path(__file__).parent.parent

# Target GCP project comes from the environment; fail fast below if unset.
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")

if not PROJECT_ID:
    print(
        "Please set GOOGLE_CLOUD_PROJECT environment variable before running.",
        file=sys.stderr,
    )
    sys.exit(1)


def create_instance(client):
    """Return the streaming-testing Bigtable instance, creating it if absent.

    Creation blocks (up to 480 s) until the long-running operation finishes.
    """
    name = "streaming-testing-instance"
    instance = bigtable.instance.Instance(
        name,
        client,
    )
    # A single-node cluster is sufficient for test traffic.
    cluster = instance.cluster(
        "streaming-testing-instance-c1",
        location_id="us-west1-a",
        serve_nodes=1,
    )
    if instance.exists():
        return instance
    instance.create(clusters=[cluster]).result(timeout=480)
    print(f"Created instance {name}")
    return instance


def create_table(instance):
    """Create the streaming-testing table in *instance* if it does not exist."""
    table_id = "table-testing"
    handle = bigtable.table.Table(
        table_id,
        instance,
    )
    if handle.exists():
        return
    handle.create()
    print(f"Created table {table_id}")


def main():
    """Provision the Bigtable instance and table used by streaming tests."""
    # Admin privileges are required to create instances and tables.
    admin_client = bigtable.Client(project=PROJECT_ID, admin=True)
    create_table(create_instance(admin_client))


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"gcsfs >=2023.3.0",
"geopandas >=0.12.2",
"google-auth >=2.15.0,<3.0dev",
"google-cloud-bigtable >=2.24.0",
"google-cloud-bigquery[bqstorage,pandas] >=3.16.0",
"google-cloud-functions >=1.12.0",
"google-cloud-bigquery-connection >=1.12.0",
Expand Down
1 change: 1 addition & 0 deletions testing/constraints-3.9.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ fsspec==2023.3.0
gcsfs==2023.3.0
geopandas==0.12.2
google-auth==2.15.0
google-cloud-bigtable==2.24.0
google-cloud-bigquery==3.16.0
google-cloud-functions==1.12.0
google-cloud-bigquery-connection==1.12.0
Expand Down
48 changes: 48 additions & 0 deletions tests/system/large/test_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

import bigframes.streaming


def test_streaming_to_bigtable():
    """Start a continuous export to Bigtable and verify it keeps running."""
    prefix = "test_streaming_"
    query = """SELECT
    body_mass_g, island as rowkey
    FROM birds.penguins"""
    job = bigframes.streaming.to_bigtable(
        query,
        "streaming-testing-instance",
        "table-testing",
        app_profile=None,
        truncate=True,
        overwrite=True,
        auto_create_column_families=True,
        bigtable_options={},
        job_id=None,
        job_id_prefix=prefix,
    )

    try:
        # Give the job ample time to fail if it is going to; a truly
        # continuous query should still be running after the wait.
        time.sleep(100)
        assert job.error_result is None
        assert job.errors is None
        assert job.running()
        assert str(job.job_id).startswith(prefix)
    finally:
        # Continuous queries never finish on their own — always cancel.
        job.cancel()