Skip to content

Commit 3d17892

Browse files
feat: Enable materialization for ODFV Transform on Write (feast-dev#5459)
* feat: Enable materialization for ODFV Transform on Write Signed-off-by: Francisco Javier Arceo <[email protected]>
1 parent c5b5cf5 commit 3d17892

File tree

9 files changed

+262
-30
lines changed

9 files changed

+262
-30
lines changed

sdk/python/feast/feature_store.py

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ def _make_inferences(
656656
def _get_feature_views_to_materialize(
657657
self,
658658
feature_views: Optional[List[str]],
659-
) -> List[FeatureView]:
659+
) -> List[Union[FeatureView, OnDemandFeatureView]]:
660660
"""
661661
Returns the list of feature views that should be materialized.
662662
@@ -669,34 +669,53 @@ def _get_feature_views_to_materialize(
669669
FeatureViewNotFoundException: One of the specified feature views could not be found.
670670
ValueError: One of the specified feature views is not configured for materialization.
671671
"""
672-
feature_views_to_materialize: List[FeatureView] = []
672+
feature_views_to_materialize: List[Union[FeatureView, OnDemandFeatureView]] = []
673673

674674
if feature_views is None:
675-
feature_views_to_materialize = utils._list_feature_views(
675+
regular_feature_views = utils._list_feature_views(
676676
self._registry, self.project, hide_dummy_entity=False
677677
)
678-
feature_views_to_materialize = [
679-
fv for fv in feature_views_to_materialize if fv.online
680-
]
678+
feature_views_to_materialize.extend(
679+
[fv for fv in regular_feature_views if fv.online]
680+
)
681681
stream_feature_views_to_materialize = self._list_stream_feature_views(
682682
hide_dummy_entity=False
683683
)
684-
feature_views_to_materialize += [
685-
sfv for sfv in stream_feature_views_to_materialize if sfv.online
686-
]
684+
feature_views_to_materialize.extend(
685+
[sfv for sfv in stream_feature_views_to_materialize if sfv.online]
686+
)
687+
on_demand_feature_views_to_materialize = self.list_on_demand_feature_views()
688+
feature_views_to_materialize.extend(
689+
[
690+
odfv
691+
for odfv in on_demand_feature_views_to_materialize
692+
if odfv.write_to_online_store
693+
]
694+
)
687695
else:
688696
for name in feature_views:
697+
feature_view: Union[FeatureView, OnDemandFeatureView]
689698
try:
690699
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
691700
except FeatureViewNotFoundException:
692-
feature_view = self._get_stream_feature_view(
693-
name, hide_dummy_entity=False
694-
)
701+
try:
702+
feature_view = self._get_stream_feature_view(
703+
name, hide_dummy_entity=False
704+
)
705+
except FeatureViewNotFoundException:
706+
feature_view = self.get_on_demand_feature_view(name)
695707

696-
if not feature_view.online:
708+
if hasattr(feature_view, "online") and not feature_view.online:
697709
raise ValueError(
698710
f"FeatureView {feature_view.name} is not configured to be served online."
699711
)
712+
elif (
713+
hasattr(feature_view, "write_to_online_store")
714+
and not feature_view.write_to_online_store
715+
):
716+
raise ValueError(
717+
f"OnDemandFeatureView {feature_view.name} is not configured for write_to_online_store."
718+
)
700719
feature_views_to_materialize.append(feature_view)
701720

702721
return feature_views_to_materialize
@@ -1312,6 +1331,8 @@ def materialize_incremental(
13121331
)
13131332
# TODO paging large loads
13141333
for feature_view in feature_views_to_materialize:
1334+
if isinstance(feature_view, OnDemandFeatureView):
1335+
continue
13151336
start_date = feature_view.most_recent_end_time
13161337
if start_date is None:
13171338
if feature_view.ttl is None:
@@ -1340,7 +1361,7 @@ def tqdm_builder(length):
13401361
return tqdm(total=length, ncols=100)
13411362

13421363
start_date = utils.make_tzaware(start_date)
1343-
end_date = utils.make_tzaware(end_date)
1364+
end_date = utils.make_tzaware(end_date) or _utc_now()
13441365

13451366
provider.materialize_single_feature_view(
13461367
config=self.config,
@@ -1351,13 +1372,13 @@ def tqdm_builder(length):
13511372
project=self.project,
13521373
tqdm_builder=tqdm_builder,
13531374
)
1354-
1355-
self._registry.apply_materialization(
1356-
feature_view,
1357-
self.project,
1358-
start_date,
1359-
end_date,
1360-
)
1375+
if not isinstance(feature_view, OnDemandFeatureView):
1376+
self._registry.apply_materialization(
1377+
feature_view,
1378+
self.project,
1379+
start_date,
1380+
end_date,
1381+
)
13611382

13621383
def materialize(
13631384
self,

sdk/python/feast/infra/passthrough_provider.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,17 +420,24 @@ def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table)
420420
def materialize_single_feature_view(
421421
self,
422422
config: RepoConfig,
423-
feature_view: FeatureView,
423+
feature_view: Union[FeatureView, OnDemandFeatureView],
424424
start_date: datetime,
425425
end_date: datetime,
426426
registry: BaseRegistry,
427427
project: str,
428428
tqdm_builder: Callable[[int], tqdm],
429429
) -> None:
430+
if isinstance(feature_view, OnDemandFeatureView):
431+
if not feature_view.write_to_online_store:
432+
raise ValueError(
433+
f"OnDemandFeatureView {feature_view.name} does not have write_to_online_store enabled"
434+
)
435+
return
430436
assert (
431437
isinstance(feature_view, BatchFeatureView)
432438
or isinstance(feature_view, StreamFeatureView)
433439
or isinstance(feature_view, FeatureView)
440+
or isinstance(feature_view, OnDemandFeatureView)
434441
), f"Unexpected type for {feature_view.name}: {type(feature_view)}"
435442
task = MaterializationTask(
436443
project=project,

sdk/python/feast/infra/provider.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ def ingest_df_to_offline_store(
217217
def materialize_single_feature_view(
218218
self,
219219
config: RepoConfig,
220-
feature_view: FeatureView,
220+
feature_view: Union[FeatureView, OnDemandFeatureView],
221221
start_date: datetime,
222222
end_date: datetime,
223223
registry: BaseRegistry,

sdk/python/feast/infra/registry/base_registry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from abc import ABC, abstractmethod
1717
from collections import defaultdict
1818
from datetime import datetime
19-
from typing import Any, Dict, List, Optional
19+
from typing import Any, Dict, List, Optional, Union
2020

2121
from google.protobuf.json_format import MessageToJson
2222
from google.protobuf.message import Message
@@ -432,7 +432,7 @@ def list_all_feature_views(
432432
@abstractmethod
433433
def apply_materialization(
434434
self,
435-
feature_view: FeatureView,
435+
feature_view: Union[FeatureView, OnDemandFeatureView],
436436
project: str,
437437
start_date: datetime,
438438
end_date: datetime,

sdk/python/feast/infra/registry/registry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from enum import Enum
1717
from pathlib import Path
1818
from threading import Lock
19-
from typing import Any, Dict, List, Optional
19+
from typing import Any, Dict, List, Optional, Union
2020
from urllib.parse import urlparse
2121

2222
from google.protobuf.internal.containers import RepeatedCompositeFieldContainer
@@ -529,7 +529,7 @@ def get_data_source(
529529

530530
def apply_materialization(
531531
self,
532-
feature_view: FeatureView,
532+
feature_view: Union[FeatureView, OnDemandFeatureView],
533533
project: str,
534534
start_date: datetime,
535535
end_date: datetime,

sdk/python/feast/infra/registry/remote.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ def list_feature_views(
356356

357357
def apply_materialization(
358358
self,
359-
feature_view: FeatureView,
359+
feature_view: Union[FeatureView, OnDemandFeatureView],
360360
project: str,
361361
start_date: datetime,
362362
end_date: datetime,

sdk/python/feast/infra/registry/snowflake.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,7 @@ def list_permissions(
992992

993993
def apply_materialization(
994994
self,
995-
feature_view: FeatureView,
995+
feature_view: Union[FeatureView, OnDemandFeatureView],
996996
project: str,
997997
start_date: datetime,
998998
end_date: datetime,

sdk/python/feast/infra/registry/sql.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ def apply_validation_reference(
702702

703703
def apply_materialization(
704704
self,
705-
feature_view: FeatureView,
705+
feature_view: Union[FeatureView, OnDemandFeatureView],
706706
project: str,
707707
start_date: datetime,
708708
end_date: datetime,

0 commit comments

Comments
 (0)