@@ -94,8 +94,14 @@ def get_historical_features(
9494 expected_join_keys = _get_join_keys (project , feature_views , registry )
9595
9696 assert isinstance (config .offline_store , BigQueryOfflineStoreConfig )
97+ dataset_project = config .offline_store .project_id or client .project
98+
9799 table = _upload_entity_df_into_bigquery (
98- client , config .project , config .offline_store .dataset , entity_df
100+ client = client ,
101+ project = config .project ,
102+ dataset_name = config .offline_store .dataset ,
103+ dataset_project = dataset_project ,
104+ entity_df = entity_df ,
99105 )
100106
101107 entity_df_event_timestamp_col = _infer_event_timestamp_from_bigquery_query (
@@ -225,7 +231,8 @@ def _block_until_done():
225231
226232 today = date .today ().strftime ("%Y%m%d" )
227233 rand_id = str (uuid .uuid4 ())[:7 ]
228- path = f"{ self .client .project } .{ self .config .offline_store .dataset } .historical_{ today } _{ rand_id } "
234+ dataset_project = self .config .offline_store .project_id or self .client .project
235+ path = f"{ dataset_project } .{ self .config .offline_store .dataset } .historical_{ today } _{ rand_id } "
229236 job_config = bigquery .QueryJobConfig (destination = path , dry_run = dry_run )
230237 bq_job = self .client .query (self .query , job_config = job_config )
231238
@@ -261,12 +268,12 @@ class FeatureViewQueryContext:
261268
262269
263270def _get_table_id_for_new_entity (
264- client : Client , project : str , dataset_name : str
271+ client : Client , project : str , dataset_name : str , dataset_project : str
265272) -> str :
266273 """Gets the table_id for the new entity to be uploaded."""
267274
268275 # First create the BigQuery dataset if it doesn't exist
269- dataset = bigquery .Dataset (f"{ client . project } .{ dataset_name } " )
276+ dataset = bigquery .Dataset (f"{ dataset_project } .{ dataset_name } " )
270277 dataset .location = "US"
271278
272279 try :
@@ -275,18 +282,21 @@ def _get_table_id_for_new_entity(
275282 # Only create the dataset if it does not exist
276283 client .create_dataset (dataset , exists_ok = True )
277284
278- return f"{ client . project } .{ dataset_name } .entity_df_{ project } _{ int (time .time ())} "
285+ return f"{ dataset_project } .{ dataset_name } .entity_df_{ project } _{ int (time .time ())} "
279286
280287
281288def _upload_entity_df_into_bigquery (
282289 client : Client ,
283290 project : str ,
284291 dataset_name : str ,
292+ dataset_project : str ,
285293 entity_df : Union [pandas .DataFrame , str ],
286294) -> Table :
287295 """Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""
288296
289- table_id = _get_table_id_for_new_entity (client , project , dataset_name )
297+ table_id = _get_table_id_for_new_entity (
298+ client , project , dataset_name , dataset_project
299+ )
290300
291301 if type (entity_df ) is str :
292302 job = client .query (f"CREATE TABLE { table_id } AS ({ entity_df } )" )
0 commit comments