Skip to content

Commit 29f2895

Browse files
chore: Check configs and data sources in all offline store methods (feast-dev#3107)
Check configs and data sources in all offline store methods Signed-off-by: Felix Wang <[email protected]> Signed-off-by: Felix Wang <[email protected]>
1 parent 2b493e0 commit 29f2895

File tree

9 files changed

+53
-78
lines changed

9 files changed

+53
-78
lines changed

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ def pull_latest_from_table_or_query(
106106
start_date: datetime,
107107
end_date: datetime,
108108
) -> RetrievalJob:
109+
assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
109110
assert isinstance(data_source, BigQuerySource)
110111
from_expression = data_source.get_table_query_string()
111112

@@ -156,6 +157,7 @@ def pull_all_from_table_or_query(
156157
start_date: datetime,
157158
end_date: datetime,
158159
) -> RetrievalJob:
160+
assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
159161
assert isinstance(data_source, BigQuerySource)
160162
from_expression = data_source.get_table_query_string()
161163

@@ -191,6 +193,8 @@ def get_historical_features(
191193
) -> RetrievalJob:
192194
# TODO: Add entity_df validation in order to fail before interacting with BigQuery
193195
assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
196+
for fv in feature_views:
197+
assert isinstance(fv.batch_source, BigQuerySource)
194198

195199
client = _get_bigquery_client(
196200
project=config.offline_store.project_id,
@@ -333,18 +337,8 @@ def offline_write_batch(
333337
table: pyarrow.Table,
334338
progress: Optional[Callable[[int], Any]],
335339
):
336-
if not feature_view.batch_source:
337-
raise ValueError(
338-
"feature view does not have a batch source to persist offline data"
339-
)
340-
if not isinstance(config.offline_store, BigQueryOfflineStoreConfig):
341-
raise ValueError(
342-
f"offline store config is of type {type(config.offline_store)} when bigquery type required"
343-
)
344-
if not isinstance(feature_view.batch_source, BigQuerySource):
345-
raise ValueError(
346-
f"feature view batch source is {type(feature_view.batch_source)} not bigquery source"
347-
)
340+
assert isinstance(config.offline_store, BigQueryOfflineStoreConfig)
341+
assert isinstance(feature_view.batch_source, BigQuerySource)
348342

349343
pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source(
350344
config, feature_view.batch_source

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ def pull_latest_from_table_or_query(
7777
start_date: datetime,
7878
end_date: datetime,
7979
) -> RetrievalJob:
80-
assert isinstance(data_source, AthenaSource)
8180
assert isinstance(config.offline_store, AthenaOfflineStoreConfig)
81+
assert isinstance(data_source, AthenaSource)
8282

8383
from_expression = data_source.get_table_query_string(config)
8484

@@ -136,6 +136,7 @@ def pull_all_from_table_or_query(
136136
start_date: datetime,
137137
end_date: datetime,
138138
) -> RetrievalJob:
139+
assert isinstance(config.offline_store, AthenaOfflineStoreConfig)
139140
assert isinstance(data_source, AthenaSource)
140141
from_expression = data_source.get_table_query_string(config)
141142

@@ -175,6 +176,8 @@ def get_historical_features(
175176
full_feature_names: bool = False,
176177
) -> RetrievalJob:
177178
assert isinstance(config.offline_store, AthenaOfflineStoreConfig)
179+
for fv in feature_views:
180+
assert isinstance(fv.batch_source, AthenaSource)
178181

179182
athena_client = aws_utils.get_athena_data_client(config.offline_store.region)
180183
s3_resource = aws_utils.get_s3_resource(config.offline_store.region)

sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def pull_latest_from_table_or_query(
6767
start_date: datetime,
6868
end_date: datetime,
6969
) -> RetrievalJob:
70+
assert isinstance(config.offline_store, PostgreSQLOfflineStoreConfig)
7071
assert isinstance(data_source, PostgreSQLSource)
7172
from_expression = data_source.get_table_query_string()
7273

@@ -117,6 +118,9 @@ def get_historical_features(
117118
project: str,
118119
full_feature_names: bool = False,
119120
) -> RetrievalJob:
121+
assert isinstance(config.offline_store, PostgreSQLOfflineStoreConfig)
122+
for fv in feature_views:
123+
assert isinstance(fv.batch_source, PostgreSQLSource)
120124

121125
entity_schema = _get_entity_schema(entity_df, config)
122126

@@ -206,6 +210,7 @@ def pull_all_from_table_or_query(
206210
start_date: datetime,
207211
end_date: datetime,
208212
) -> RetrievalJob:
213+
assert isinstance(config.offline_store, PostgreSQLOfflineStoreConfig)
209214
assert isinstance(data_source, PostgreSQLSource)
210215
from_expression = data_source.get_table_query_string()
211216

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ def get_historical_features(
121121
full_feature_names: bool = False,
122122
) -> RetrievalJob:
123123
assert isinstance(config.offline_store, SparkOfflineStoreConfig)
124+
for fv in feature_views:
125+
assert isinstance(fv.batch_source, SparkSource)
126+
124127
warnings.warn(
125128
"The spark offline store is an experimental feature in alpha development. "
126129
"Some functionality may still be unstable so functionality can change in the future.",
@@ -198,18 +201,8 @@ def offline_write_batch(
198201
table: pyarrow.Table,
199202
progress: Optional[Callable[[int], Any]],
200203
):
201-
if not feature_view.batch_source:
202-
raise ValueError(
203-
"feature view does not have a batch source to persist offline data"
204-
)
205-
if not isinstance(config.offline_store, SparkOfflineStoreConfig):
206-
raise ValueError(
207-
f"offline store config is of type {type(config.offline_store)} when spark type required"
208-
)
209-
if not isinstance(feature_view.batch_source, SparkSource):
210-
raise ValueError(
211-
f"feature view batch source is {type(feature_view.batch_source)} not spark source"
212-
)
204+
assert isinstance(config.offline_store, SparkOfflineStoreConfig)
205+
assert isinstance(feature_view.batch_source, SparkSource)
213206

214207
pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source(
215208
config, feature_view.batch_source
@@ -269,6 +262,7 @@ def pull_all_from_table_or_query(
269262
created_timestamp_column have all already been mapped to column names of the
270263
source table and those column names are the values passed into this function.
271264
"""
265+
assert isinstance(config.offline_store, SparkOfflineStoreConfig)
272266
assert isinstance(data_source, SparkSource)
273267
warnings.warn(
274268
"The spark offline store is an experimental feature in alpha development. "

sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -161,14 +161,8 @@ def pull_latest_from_table_or_query(
161161
auth: Optional[Authentication] = None,
162162
http_scheme: Optional[str] = None,
163163
) -> TrinoRetrievalJob:
164-
if not isinstance(data_source, TrinoSource):
165-
raise ValueError(
166-
f"The data_source object is not a TrinoSource but is instead '{type(data_source)}'"
167-
)
168-
if not isinstance(config.offline_store, TrinoOfflineStoreConfig):
169-
raise ValueError(
170-
f"The config.offline_store object is not a TrinoOfflineStoreConfig but is instead '{type(config.offline_store)}'"
171-
)
164+
assert isinstance(config.offline_store, TrinoOfflineStoreConfig)
165+
assert isinstance(data_source, TrinoSource)
172166

173167
from_expression = data_source.get_table_query_string()
174168

@@ -222,10 +216,9 @@ def get_historical_features(
222216
auth: Optional[Authentication] = None,
223217
http_scheme: Optional[str] = None,
224218
) -> TrinoRetrievalJob:
225-
if not isinstance(config.offline_store, TrinoOfflineStoreConfig):
226-
raise ValueError(
227-
f"This function should be used with a TrinoOfflineStoreConfig object. Instead we have config.offline_store being '{type(config.offline_store)}'"
228-
)
219+
assert isinstance(config.offline_store, TrinoOfflineStoreConfig)
220+
for fv in feature_views:
221+
assert isinstance(fv.batch_source, TrinoSource)
229222

230223
client = _get_trino_client(
231224
config=config, user=user, auth=auth, http_scheme=http_scheme
@@ -314,10 +307,8 @@ def pull_all_from_table_or_query(
314307
auth: Optional[Authentication] = None,
315308
http_scheme: Optional[str] = None,
316309
) -> RetrievalJob:
317-
if not isinstance(data_source, TrinoSource):
318-
raise ValueError(
319-
f"The data_source object is not a TrinoSource object but is instead a {type(data_source)}"
320-
)
310+
assert isinstance(config.offline_store, TrinoOfflineStoreConfig)
311+
assert isinstance(data_source, TrinoSource)
321312
from_expression = data_source.get_table_query_string()
322313

323314
client = _get_trino_client(

sdk/python/feast/infra/offline_stores/file.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ def get_historical_features(
130130
project: str,
131131
full_feature_names: bool = False,
132132
) -> RetrievalJob:
133+
assert isinstance(config.offline_store, FileOfflineStoreConfig)
134+
for fv in feature_views:
135+
assert isinstance(fv.batch_source, FileSource)
136+
133137
if not isinstance(entity_df, pd.DataFrame) and not isinstance(
134138
entity_df, dd.DataFrame
135139
):
@@ -298,6 +302,7 @@ def pull_latest_from_table_or_query(
298302
start_date: datetime,
299303
end_date: datetime,
300304
) -> RetrievalJob:
305+
assert isinstance(config.offline_store, FileOfflineStoreConfig)
301306
assert isinstance(data_source, FileSource)
302307

303308
# Create lazy function that is only called from the RetrievalJob object
@@ -378,6 +383,9 @@ def pull_all_from_table_or_query(
378383
start_date: datetime,
379384
end_date: datetime,
380385
) -> RetrievalJob:
386+
assert isinstance(config.offline_store, FileOfflineStoreConfig)
387+
assert isinstance(data_source, FileSource)
388+
381389
return FileOfflineStore.pull_latest_from_table_or_query(
382390
config=config,
383391
data_source=data_source,
@@ -398,6 +406,7 @@ def write_logged_features(
398406
logging_config: LoggingConfig,
399407
registry: BaseRegistry,
400408
):
409+
assert isinstance(config.offline_store, FileOfflineStoreConfig)
401410
destination = logging_config.destination
402411
assert isinstance(destination, FileLoggingDestination)
403412

@@ -428,18 +437,8 @@ def offline_write_batch(
428437
table: pyarrow.Table,
429438
progress: Optional[Callable[[int], Any]],
430439
):
431-
if not feature_view.batch_source:
432-
raise ValueError(
433-
"feature view does not have a batch source to persist offline data"
434-
)
435-
if not isinstance(config.offline_store, FileOfflineStoreConfig):
436-
raise ValueError(
437-
f"offline store config is of type {type(config.offline_store)} when file type required"
438-
)
439-
if not isinstance(feature_view.batch_source, FileSource):
440-
raise ValueError(
441-
f"feature view batch source is {type(feature_view.batch_source)} not file source"
442-
)
440+
assert isinstance(config.offline_store, FileOfflineStoreConfig)
441+
assert isinstance(feature_view.batch_source, FileSource)
443442

444443
pa_schema, column_names = get_pyarrow_schema_from_batch_source(
445444
config, feature_view.batch_source

sdk/python/feast/infra/offline_stores/offline_store.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ class OfflineStore(ABC):
218218
"""
219219
An offline store defines the interface that Feast uses to interact with the storage and compute system that
220220
handles offline features.
221+
222+
Each offline store implementation is designed to work only with the corresponding data source. For example,
223+
the SnowflakeOfflineStore can handle SnowflakeSources but not FileSources.
221224
"""
222225

223226
@staticmethod

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ def pull_all_from_table_or_query(
141141
start_date: datetime,
142142
end_date: datetime,
143143
) -> RetrievalJob:
144+
assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)
144145
assert isinstance(data_source, RedshiftSource)
145146
from_expression = data_source.get_table_query_string()
146147

@@ -182,6 +183,8 @@ def get_historical_features(
182183
full_feature_names: bool = False,
183184
) -> RetrievalJob:
184185
assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)
186+
for fv in feature_views:
187+
assert isinstance(fv.batch_source, RedshiftSource)
185188

186189
redshift_client = aws_utils.get_redshift_data_client(
187190
config.offline_store.region
@@ -308,18 +311,8 @@ def offline_write_batch(
308311
table: pyarrow.Table,
309312
progress: Optional[Callable[[int], Any]],
310313
):
311-
if not feature_view.batch_source:
312-
raise ValueError(
313-
"feature view does not have a batch source to persist offline data"
314-
)
315-
if not isinstance(config.offline_store, RedshiftOfflineStoreConfig):
316-
raise ValueError(
317-
f"offline store config is of type {type(config.offline_store)} when redshift type required"
318-
)
319-
if not isinstance(feature_view.batch_source, RedshiftSource):
320-
raise ValueError(
321-
f"feature view batch source is {type(feature_view.batch_source)} not redshift source"
322-
)
314+
assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)
315+
assert isinstance(feature_view.batch_source, RedshiftSource)
323316

324317
pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source(
325318
config, feature_view.batch_source

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ def pull_latest_from_table_or_query(
117117
start_date: datetime,
118118
end_date: datetime,
119119
) -> RetrievalJob:
120-
assert isinstance(data_source, SnowflakeSource)
121120
assert isinstance(config.offline_store, SnowflakeOfflineStoreConfig)
121+
assert isinstance(data_source, SnowflakeSource)
122122

123123
from_expression = data_source.get_table_query_string()
124124
if not data_source.database and data_source.table:
@@ -183,6 +183,7 @@ def pull_all_from_table_or_query(
183183
start_date: datetime,
184184
end_date: datetime,
185185
) -> RetrievalJob:
186+
assert isinstance(config.offline_store, SnowflakeOfflineStoreConfig)
186187
assert isinstance(data_source, SnowflakeSource)
187188

188189
from_expression = data_source.get_table_query_string()
@@ -228,6 +229,8 @@ def get_historical_features(
228229
full_feature_names: bool = False,
229230
) -> RetrievalJob:
230231
assert isinstance(config.offline_store, SnowflakeOfflineStoreConfig)
232+
for fv in feature_views:
233+
assert isinstance(fv.batch_source, SnowflakeSource)
231234

232235
snowflake_conn = get_snowflake_conn(config.offline_store)
233236

@@ -332,18 +335,8 @@ def offline_write_batch(
332335
table: pyarrow.Table,
333336
progress: Optional[Callable[[int], Any]],
334337
):
335-
if not feature_view.batch_source:
336-
raise ValueError(
337-
"feature view does not have a batch source to persist offline data"
338-
)
339-
if not isinstance(config.offline_store, SnowflakeOfflineStoreConfig):
340-
raise ValueError(
341-
f"offline store config is of type {type(config.offline_store)} when snowflake type required"
342-
)
343-
if not isinstance(feature_view.batch_source, SnowflakeSource):
344-
raise ValueError(
345-
f"feature view batch source is {type(feature_view.batch_source)} not snowflake source"
346-
)
338+
assert isinstance(config.offline_store, SnowflakeOfflineStoreConfig)
339+
assert isinstance(feature_view.batch_source, SnowflakeSource)
347340

348341
pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source(
349342
config, feature_view.batch_source

0 commit comments

Comments (0)