Skip to content

Commit 31e40da

Browse files
authored
Cancel BigQuery job if timeout hits (#1672)
* Cancel BigQuery job if timeout hits Signed-off-by: Matt Delacour <[email protected]> * Fix typo Signed-off-by: Matt Delacour <[email protected]>
1 parent d0ae38f commit 31e40da

File tree

2 files changed

+26
-6
lines changed

2 files changed

+26
-6
lines changed

sdk/python/feast/errors.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ def __init__(self, repo_obj_type: str, specific_issue: str):
126126
)
127127

128128

129+
class BigQueryJobCancelled(Exception):
    """Raised to report that a BigQuery job was cancelled.

    Attributes:
        job_id: The ID of the cancelled job, exactly as passed to the
            constructor, so handlers can inspect it without parsing the
            message text.
    """

    def __init__(self, job_id):
        # Keep the ID on the instance; the message alone is not a stable
        # interface for callers that need to know which job was affected.
        self.job_id = job_id
        super().__init__(f"The BigQuery job with ID '{job_id}' was cancelled")
132+
133+
129134
class RedshiftCredentialsError(Exception):
130135
def __init__(self):
131136
super().__init__("Redshift API failed due to incorrect credentials")

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from feast import errors
1515
from feast.data_source import BigQuerySource, DataSource
16-
from feast.errors import FeastProviderLoginError
16+
from feast.errors import BigQueryJobCancelled, FeastProviderLoginError
1717
from feast.feature_view import FeatureView
1818
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
1919
from feast.infra.provider import (
@@ -249,10 +249,6 @@ def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[st
249249
Returns the destination table name or returns None if job_config.dry_run is True.
250250
"""
251251

252-
@retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
253-
def _block_until_done():
254-
return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]
255-
256252
if not job_config:
257253
today = date.today().strftime("%Y%m%d")
258254
rand_id = str(uuid.uuid4())[:7]
@@ -261,7 +257,7 @@ def _block_until_done():
261257

262258
bq_job = self.client.query(self.query, job_config=job_config)
263259

264-
_block_until_done()
260+
block_until_done(client=self.client, bq_job=bq_job)
265261

266262
if bq_job.exception():
267263
raise bq_job.exception()
@@ -279,6 +275,25 @@ def to_arrow(self) -> pyarrow.Table:
279275
return self.client.query(self.query).to_arrow()
280276

281277

278+
def block_until_done(client, bq_job, timeout=1800, retry_cadence=10):
    """Wait for a BigQuery job to finish, cancelling it if it takes too long.

    The job state is polled through ``client.get_job`` every
    ``retry_cadence`` seconds for at most ``timeout`` seconds. A job whose
    state is ``"PENDING"`` or ``"RUNNING"`` is considered unfinished.

    Args:
        client: Object exposing ``get_job(job_id)`` and ``cancel_job(job_id)``.
        bq_job: Job handle exposing ``job_id`` and ``exception()``.
        timeout: Maximum number of seconds to keep polling.
        retry_cadence: Number of seconds to wait between two polls.

    Raises:
        BigQueryJobCancelled: The job was still pending or running when the
            timeout elapsed; it is cancelled before this is raised.
        Exception: Whatever ``bq_job.exception()`` returns, if it is truthy.
    """

    class _JobStillRunning(Exception):
        """Private signal telling the retry wrapper to poll again."""

    def _is_running(job_id):
        return client.get_job(job_id).state in ["PENDING", "RUNNING"]

    # With no explicit ``retry=`` predicate, tenacity only retries when the
    # wrapped call raises. Returning a boolean (as before) made this return
    # after a single poll, so "still running" is signalled by raising.
    # Errors raised by ``get_job`` itself are retried under the same policy
    # and, because of ``reraise=True``, propagate unchanged after the timeout.
    @retry(
        wait=wait_fixed(retry_cadence),
        stop=stop_after_delay(timeout),
        reraise=True,
    )
    def _wait_until_done(job_id):
        if _is_running(job_id):
            raise _JobStillRunning(job_id)

    job_id = bq_job.job_id
    try:
        _wait_until_done(job_id)
    except _JobStillRunning:
        # Timed out while the job was still pending/running: stop it so it
        # does not keep going after the caller has given up on it.
        client.cancel_job(job_id)
        raise BigQueryJobCancelled(job_id=job_id) from None

    # Ask for the job error once and re-raise that same object.
    job_error = bq_job.exception()
    if job_error:
        raise job_error
295+
296+
282297
@dataclass(frozen=True)
283298
class FeatureViewQueryContext:
284299
"""Context object used to template a BigQuery point-in-time SQL query"""

0 commit comments

Comments
 (0)