-
Notifications
You must be signed in to change notification settings - Fork 50
feat: bigframes.streaming module for continuous queries #703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 10 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
e23cb8f
feat: bigframes.streaming module for continuous queries
milkshakeiii f7a9920
Merge branch 'main' into b333824051-streaming
milkshakeiii a15de6c
mypy fix
milkshakeiii b2c9ef7
mypy fix 2
milkshakeiii 8ae2378
mypy fix 3
milkshakeiii 3286ae6
ignore mypy, error is in bq library
milkshakeiii 90c7c92
Merge branch 'main' into b333824051-streaming
milkshakeiii 0163893
Merge branch 'main' into b333824051-streaming
milkshakeiii 6440758
address comments from meeting
milkshakeiii 5716330
fix mypy
milkshakeiii a694479
Merge branch 'main' into b333824051-streaming
milkshakeiii 9bd499b
don't use app profile
milkshakeiii 53ede95
address comments
milkshakeiii 9e89d4e
Merge branch 'main' into b333824051-streaming
milkshakeiii bb4ed64
check job_id
milkshakeiii 6d9b0d9
add bigtable setup script
milkshakeiii 635d938
further simplify string
milkshakeiii 8c50ebb
fix bugs
milkshakeiii 5218967
Merge branch 'main' into b333824051-streaming
milkshakeiii 15f42ed
add str() for consistent clarification
milkshakeiii 46063ad
Merge branch 'main' into b333824051-streaming
milkshakeiii File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
# Copyright 2023 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Module for bigquery continuous queries""" | ||
|
||
import json | ||
from typing import Optional | ||
|
||
from google.cloud import bigquery | ||
|
||
import bigframes | ||
|
||
|
||
def to_bigtable( | ||
query: str, | ||
instance: str, | ||
table: str, | ||
bq_client: Optional[bigquery.Client] = None, | ||
app_profile: Optional[str] = None, | ||
truncate: bool = False, | ||
overwrite: bool = False, | ||
auto_create_column_families: bool = False, | ||
bigtable_options: Optional[dict] = None, | ||
job_id: Optional[str] = None, | ||
job_id_prefix: Optional[str] = None, | ||
) -> bigquery.QueryJob: | ||
"""Launches a BigQuery continuous query and returns a | ||
QueryJob object for some management functionality. | ||
|
||
This method requires an existing bigtable preconfigured to | ||
accept the continuous query export statement. For instructions | ||
on export to bigtable, see | ||
https://cloud.google.com/bigquery/docs/export-to-bigtable. | ||
|
||
Args: | ||
query (str): | ||
The sql statement to execute as a continuous function. | ||
For example: "SELECT * FROM dataset.table" | ||
This will be wrapped in an EXPORT DATA statement to | ||
launch a continuous query writing to bigtable. | ||
instance (str): | ||
The name of the bigtable instance to export to. | ||
table (str): | ||
The name of the bigtable table to export to. | ||
bq_client (str, default None): | ||
The Client object to use for the query. This determines | ||
the project id and location of the query. If None, will | ||
default to the bigframes global session default client. | ||
app_profile (str, default None): | ||
The bigtable app profile to export to. If None, no app | ||
profile will be used. | ||
truncate (bool, default False): | ||
The export truncate option, see | ||
https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option | ||
overwrite (bool, default False): | ||
The export overwrite option, see | ||
https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option | ||
auto_create_column_families (bool, default False): | ||
The auto_create_column_families option, see | ||
https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option | ||
bigtable_options (dict, default None): | ||
The bigtable options dict, which will be converted to JSON | ||
using json.dumps, see | ||
https://cloud.google.com/bigquery/docs/reference/standard-sql/other-statements#bigtable_export_option | ||
If None, no bigtable_options parameter will be passed. | ||
job_id (str, default None): | ||
If specified, replace the default job id for the query, | ||
see job_id parameter of | ||
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query | ||
job_id_prefix (str, default None): | ||
If specified, a job id prefix for the query, see | ||
job_id_prefix parameter of | ||
https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query | ||
|
||
Returns: | ||
google.cloud.bigquery.QueryJob: | ||
See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob | ||
The ongoing query job can be managed using this object. | ||
For example, the job can be cancelled or its error status | ||
can be examined. | ||
""" | ||
# get default client if not passed | ||
if bq_client is None: | ||
bq_client = bigframes.get_global_session().bqclient | ||
|
||
# build export string from parameters | ||
app_profile_url_string = "" | ||
if app_profile is not None: | ||
app_profile_url_string = f"appProfiles/{app_profile}/" | ||
|
||
truncate_string = "FALSE" | ||
milkshakeiii marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if truncate: | ||
truncate_string = "TRUE" | ||
|
||
overwrite_string = "FALSE" | ||
if overwrite: | ||
overwrite_string = "TRUE" | ||
|
||
auto_create_column_families_string = "FALSE" | ||
if auto_create_column_families: | ||
auto_create_column_families_string = "TRUE" | ||
|
||
project = bq_client.project | ||
|
||
bigtable_options_parameter_string = "" | ||
if bigtable_options is not None: | ||
bigtable_options_parameter_string = ( | ||
'bigtable_options = """' + json.dumps(bigtable_options) + '""",\n' | ||
) | ||
|
||
sql = ( | ||
"EXPORT DATA\n" | ||
"OPTIONS (\n" | ||
"format = 'CLOUD_BIGTABLE',\n" | ||
f"{bigtable_options_parameter_string}" | ||
f"truncate = {truncate_string},\n" | ||
f"overwrite = {overwrite_string},\n" | ||
f"auto_create_column_families = {auto_create_column_families_string},\n" | ||
f'uri = "https://bigtable.googleapis.com/projects/{project}/instances/{instance}/{app_profile_url_string}tables/{table}"\n' | ||
")\n" | ||
"AS (\n" | ||
f"{query});" | ||
) | ||
|
||
# override continuous http parameter | ||
job_config = bigquery.job.QueryJobConfig() | ||
job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) | ||
|
||
# begin the query job | ||
query_job = bq_client.query( | ||
sql, | ||
job_config=job_config_filled, # type:ignore | ||
job_id=job_id, | ||
job_id_prefix=job_id_prefix, | ||
) | ||
# typing error is in bq client library (should accept abstract job_config, only takes concrete) | ||
|
||
# return the query job to the user for lifetime management | ||
return query_job |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
# Copyright 2023 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import time | ||
|
||
import bigframes.streaming | ||
|
||
|
||
def test_streaming_to_bigtable(): | ||
# launch a continuous query | ||
sql = """SELECT | ||
body_mass_g, island as rowkey | ||
FROM birds.penguins""" | ||
query_job = bigframes.streaming.to_bigtable( | ||
sql, | ||
milkshakeiii marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"streaming-testing-instance", | ||
"table-testing", | ||
app_profile="test-profile", | ||
truncate=True, | ||
overwrite=True, | ||
auto_create_column_families=True, | ||
bigtable_options={}, | ||
job_id="test_streaming", | ||
job_id_prefix="large_test", | ||
) | ||
|
||
try: | ||
# wait 100 seconds in order to ensure the query doesn't stop | ||
# (i.e. it is continuous) | ||
time.sleep(100) | ||
assert query_job.errors is None | ||
assert not query_job.done() | ||
finally: | ||
query_job.cancel() |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.