Skip to content

Commit 25daab3

Browse files
authored
Don't lose materialization interval tracking when re-applying feature views (feast-dev#1559)
* Don't lose materialization interval tracking when re-applying feature views
  Signed-off-by: Jacob Klegar <[email protected]>
* lint
  Signed-off-by: Jacob Klegar <[email protected]>
* Add a test
  Signed-off-by: Jacob Klegar <[email protected]>
* lint
  Signed-off-by: Jacob Klegar <[email protected]>
* Use better error + fix other errors
  Signed-off-by: Jacob Klegar <[email protected]>
1 parent 7877828 commit 25daab3

File tree

7 files changed: +125 additions, -20 deletions (lines changed)

sdk/python/feast/feature_store.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,9 @@ def materialize_incremental(
371371
def tqdm_builder(length):
372372
return tqdm(total=length, ncols=100)
373373

374+
start_date = utils.make_tzaware(start_date)
375+
end_date = utils.make_tzaware(end_date)
376+
374377
provider.materialize_single_feature_view(
375378
feature_view,
376379
start_date,
@@ -380,6 +383,10 @@ def tqdm_builder(length):
380383
tqdm_builder,
381384
)
382385

386+
self._registry.apply_materialization(
387+
feature_view, self.project, start_date, end_date
388+
)
389+
383390
@log_exceptions_and_usage
384391
def materialize(
385392
self,
@@ -442,6 +449,9 @@ def materialize(
442449
def tqdm_builder(length):
443450
return tqdm(total=length, ncols=100)
444451

452+
start_date = utils.make_tzaware(start_date)
453+
end_date = utils.make_tzaware(end_date)
454+
445455
provider.materialize_single_feature_view(
446456
feature_view,
447457
start_date,
@@ -451,6 +461,10 @@ def tqdm_builder(length):
451461
tqdm_builder,
452462
)
453463

464+
self._registry.apply_materialization(
465+
feature_view, self.project, start_date, end_date
466+
)
467+
454468
@log_exceptions_and_usage
455469
def get_online_features(
456470
self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],

sdk/python/feast/feature_view.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def __init__(
9797
self.name = name
9898
self.entities = entities
9999
self.features = features
100-
self.tags = tags
100+
self.tags = tags if tags is not None else {}
101101

102102
if isinstance(ttl, Duration):
103103
self.ttl = timedelta(seconds=int(ttl.seconds))

sdk/python/feast/infra/gcp.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,6 @@ def materialize_single_feature_view(
169169
created_timestamp_column,
170170
) = _get_column_names(feature_view, entities)
171171

172-
start_date = utils.make_tzaware(start_date)
173-
end_date = utils.make_tzaware(end_date)
174-
175172
table = self.offline_store.pull_latest_from_table_or_query(
176173
data_source=feature_view.input,
177174
join_key_columns=join_key_columns,
@@ -193,9 +190,6 @@ def materialize_single_feature_view(
193190
project, feature_view, rows_to_write, lambda x: pbar.update(x)
194191
)
195192

196-
feature_view.materialization_intervals.append((start_date, end_date))
197-
registry.apply_feature_view(feature_view, project)
198-
199193
def get_historical_features(
200194
self,
201195
config: RepoConfig,

sdk/python/feast/infra/local.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import pytz
99
from tqdm import tqdm
1010

11-
from feast import FeatureTable, utils
11+
from feast import FeatureTable
1212
from feast.entity import Entity
1313
from feast.feature_view import FeatureView
1414
from feast.infra.key_encoding_utils import serialize_entity_key
@@ -180,9 +180,6 @@ def materialize_single_feature_view(
180180
created_timestamp_column,
181181
) = _get_column_names(feature_view, entities)
182182

183-
start_date = utils.make_tzaware(start_date)
184-
end_date = utils.make_tzaware(end_date)
185-
186183
table = self.offline_store.pull_latest_from_table_or_query(
187184
data_source=feature_view.input,
188185
join_key_columns=join_key_columns,
@@ -204,9 +201,6 @@ def materialize_single_feature_view(
204201
project, feature_view, rows_to_write, lambda x: pbar.update(x)
205202
)
206203

207-
feature_view.materialization_intervals.append((start_date, end_date))
208-
registry.apply_feature_view(feature_view, project)
209-
210204
def get_historical_features(
211205
self,
212206
config: RepoConfig,

sdk/python/feast/registry.py

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,58 @@ def updater(registry_proto: RegistryProto):
185185
== feature_view_proto.spec.name
186186
and existing_feature_view_proto.spec.project == project
187187
):
188+
# do not update if feature view has not changed; updating will erase tracked materialization intervals
189+
if (
190+
FeatureView.from_proto(existing_feature_view_proto)
191+
== feature_view
192+
):
193+
return registry_proto
194+
else:
195+
del registry_proto.feature_views[idx]
196+
registry_proto.feature_views.append(feature_view_proto)
197+
return registry_proto
198+
registry_proto.feature_views.append(feature_view_proto)
199+
return registry_proto
200+
201+
self._registry_store.update_registry_proto(updater)
202+
203+
def apply_materialization(
204+
self,
205+
feature_view: FeatureView,
206+
project: str,
207+
start_date: datetime,
208+
end_date: datetime,
209+
):
210+
"""
211+
Updates materialization intervals tracked for a single feature view in Feast
212+
213+
Args:
214+
feature_view: Feature view that will be updated with an additional materialization interval tracked
215+
project: Feast project that this feature view belongs to
216+
start_date (datetime): Start date of the materialization interval to track
217+
end_date (datetime): End date of the materialization interval to track
218+
"""
219+
220+
def updater(registry_proto: RegistryProto):
221+
for idx, existing_feature_view_proto in enumerate(
222+
registry_proto.feature_views
223+
):
224+
if (
225+
existing_feature_view_proto.spec.name == feature_view.name
226+
and existing_feature_view_proto.spec.project == project
227+
):
228+
existing_feature_view = FeatureView.from_proto(
229+
existing_feature_view_proto
230+
)
231+
existing_feature_view.materialization_intervals.append(
232+
(start_date, end_date)
233+
)
234+
feature_view_proto = existing_feature_view.to_proto()
235+
feature_view_proto.spec.project = project
188236
del registry_proto.feature_views[idx]
189237
registry_proto.feature_views.append(feature_view_proto)
190238
return registry_proto
191-
registry_proto.feature_views.append(feature_view_proto)
192-
return registry_proto
239+
raise FeatureViewNotFoundException(feature_view.name, project)
193240

194241
self._registry_store.update_registry_proto(updater)
195242

@@ -249,7 +296,7 @@ def get_feature_table(self, name: str, project: str) -> FeatureTable:
249296
and feature_table_proto.spec.project == project
250297
):
251298
return FeatureTable.from_proto(feature_table_proto)
252-
raise FeatureTableNotFoundException(project, name)
299+
raise FeatureTableNotFoundException(name, project)
253300

254301
def get_feature_view(self, name: str, project: str) -> FeatureView:
255302
"""
@@ -291,7 +338,7 @@ def updater(registry_proto: RegistryProto):
291338
):
292339
del registry_proto.feature_tables[idx]
293340
return registry_proto
294-
raise FeatureTableNotFoundException(project, name)
341+
raise FeatureTableNotFoundException(name, project)
295342

296343
self._registry_store.update_registry_proto(updater)
297344
return

sdk/python/feast/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from pytz import utc
44

55

6-
def make_tzaware(t: datetime):
6+
def make_tzaware(t: datetime) -> datetime:
77
""" We assume tz-naive datetimes are UTC """
88
if t.tzinfo is None:
99
return t.replace(tzinfo=utc)

sdk/python/tests/test_feature_store.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import time
15-
from datetime import timedelta
15+
from datetime import datetime, timedelta
1616
from tempfile import mkstemp
1717

1818
import pytest
@@ -386,3 +386,59 @@ def test_apply_remote_repo():
386386
online_store=SqliteOnlineStoreConfig(path=online_store_path),
387387
)
388388
)
389+
390+
391+
@pytest.mark.parametrize(
392+
"test_feature_store", [lazy_fixture("feature_store_with_local_registry")],
393+
)
394+
@pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")])
395+
def test_reapply_feature_view_success(test_feature_store, dataframe_source):
396+
with prep_file_source(
397+
df=dataframe_source, event_timestamp_column="ts_1"
398+
) as file_source:
399+
400+
e = Entity(name="id", value_type=ValueType.STRING)
401+
402+
# Create Feature View
403+
fv1 = FeatureView(
404+
name="my_feature_view_1",
405+
features=[Feature(name="string_col", dtype=ValueType.STRING)],
406+
entities=["id"],
407+
input=file_source,
408+
ttl=timedelta(minutes=5),
409+
)
410+
411+
# Register Feature View
412+
test_feature_store.apply([fv1, e])
413+
414+
# Check Feature View
415+
fv_stored = test_feature_store.get_feature_view(fv1.name)
416+
assert len(fv_stored.materialization_intervals) == 0
417+
418+
# Run materialization
419+
test_feature_store.materialize(datetime(2020, 1, 1), datetime(2021, 1, 1))
420+
421+
# Check Feature View
422+
fv_stored = test_feature_store.get_feature_view(fv1.name)
423+
assert len(fv_stored.materialization_intervals) == 1
424+
425+
# Apply again
426+
test_feature_store.apply([fv1])
427+
428+
# Check Feature View
429+
fv_stored = test_feature_store.get_feature_view(fv1.name)
430+
assert len(fv_stored.materialization_intervals) == 1
431+
432+
# Change and apply Feature View
433+
fv1 = FeatureView(
434+
name="my_feature_view_1",
435+
features=[Feature(name="int64_col", dtype=ValueType.INT64)],
436+
entities=["id"],
437+
input=file_source,
438+
ttl=timedelta(minutes=5),
439+
)
440+
test_feature_store.apply([fv1])
441+
442+
# Check Feature View
443+
fv_stored = test_feature_store.get_feature_view(fv1.name)
444+
assert len(fv_stored.materialization_intervals) == 0

0 commit comments

Comments
 (0)