Skip to content

Commit 082bbff

Browse files
author
Tsotne Tabidze
authored
Fix feast apply bugs (feast-dev#1754)
Signed-off-by: Tsotne Tabidze <[email protected]>
1 parent 651d066 commit 082bbff

File tree

3 files changed

+48
-26
lines changed

3 files changed

+48
-26
lines changed

sdk/python/feast/feature_store.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@
2424

2525
from feast import utils
2626
from feast.entity import Entity
27-
from feast.errors import FeatureNameCollisionError, FeatureViewNotFoundException
27+
from feast.errors import (
28+
EntityNotFoundException,
29+
FeatureNameCollisionError,
30+
FeatureViewNotFoundException,
31+
)
2832
from feast.feature_service import FeatureService
2933
from feast.feature_table import FeatureTable
3034
from feast.feature_view import FeatureView
@@ -654,9 +658,7 @@ def get_online_features(
654658
try:
655659
join_key = entity_name_to_join_key_map[entity_name]
656660
except KeyError:
657-
raise Exception(
658-
f"Entity {entity_name} does not exist in project {self.project}"
659-
)
661+
raise EntityNotFoundException(entity_name, self.project)
660662
join_key_row[join_key] = entity_value
661663
join_key_rows.append(join_key_row)
662664

sdk/python/feast/registry.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,13 @@ def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta):
7878
self.cached_registry_proto_ttl = cache_ttl
7979

8080
def _initialize_registry(self):
81-
"""Explicitly initializes the registry with an empty proto."""
82-
registry_proto = RegistryProto()
83-
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
84-
self._registry_store.update_registry_proto(registry_proto)
81+
"""Explicitly initializes the registry with an empty proto if it doesn't exist."""
82+
try:
83+
self._get_registry_proto()
84+
except FileNotFoundError:
85+
registry_proto = RegistryProto()
86+
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
87+
self._registry_store.update_registry_proto(registry_proto)
8588

8689
def apply_entity(self, entity: Entity, project: str, commit: bool = True):
8790
"""
@@ -409,22 +412,29 @@ def get_feature_view(self, name: str, project: str) -> FeatureView:
409412
return FeatureView.from_proto(feature_view_proto)
410413
raise FeatureViewNotFoundException(name, project)
411414

412-
def delete_feature_service(self, name: str, project: str):
415+
def delete_feature_service(self, name: str, project: str, commit: bool = True):
413416
"""
414417
Deletes a feature service or raises an exception if not found.
415418
416419
Args:
417420
name: Name of feature service
418421
project: Feast project that this feature service belongs to
422+
commit: Whether the change should be persisted immediately
419423
"""
420-
registry_proto = self._get_registry_proto()
421-
for idx, feature_service_proto in enumerate(registry_proto.feature_services):
424+
self._prepare_registry_for_changes()
425+
assert self.cached_registry_proto
426+
427+
for idx, feature_service_proto in enumerate(
428+
self.cached_registry_proto.feature_services
429+
):
422430
if (
423431
feature_service_proto.spec.name == name
424432
and feature_service_proto.spec.project == project
425433
):
426-
del registry_proto.feature_services[idx]
427-
return feature_service_proto
434+
del self.cached_registry_proto.feature_services[idx]
435+
if commit:
436+
self.commit()
437+
return
428438
raise FeatureServiceNotFoundException(name, project)
429439

430440
def delete_feature_table(self, name: str, project: str, commit: bool = True):

sdk/python/feast/repo_operations.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,24 @@ def parse_repo(repo_root: Path) -> ParsedRepo:
116116
return res
117117

118118

119-
def apply_feature_services(registry: Registry, project: str, repo: ParsedRepo):
119+
def apply_feature_services(
120+
registry: Registry,
121+
project: str,
122+
repo: ParsedRepo,
123+
existing_feature_services: List[FeatureService],
124+
):
120125
from colorama import Fore, Style
121126

122127
# Determine which feature services should be deleted.
123-
existing_feature_services = registry.list_feature_services(project)
124128
for feature_service in repo.feature_services:
125129
if feature_service in existing_feature_services:
126130
existing_feature_services.remove(feature_service)
127131

128132
# The remaining features services in the list should be deleted.
129133
for feature_service_to_delete in existing_feature_services:
130-
registry.delete_feature_service(feature_service_to_delete.name, project)
134+
registry.delete_feature_service(
135+
feature_service_to_delete.name, project, commit=False
136+
)
131137
click.echo(
132138
f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service_to_delete.name}{Style.RESET_ALL} "
133139
f"from registry"
@@ -192,6 +198,16 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
192198
if registry_view.name not in repo_table_names:
193199
views_to_delete.append(registry_view)
194200

201+
entities_to_delete: List[Entity] = []
202+
repo_entities_names = set([e.name for e in repo.entities])
203+
for registry_entity in registry.list_entities(project=project):
204+
if registry_entity.name not in repo_entities_names:
205+
entities_to_delete.append(registry_entity)
206+
207+
entities_to_keep: List[Entity] = repo.entities
208+
209+
existing_feature_services = registry.list_feature_services(project)
210+
195211
sys.dont_write_bytecode = False
196212
for entity in repo.entities:
197213
registry.apply_entity(entity, project=project, commit=False)
@@ -228,9 +244,8 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
228244
click.echo(
229245
f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}"
230246
)
231-
registry.commit()
232247

233-
apply_feature_services(registry, project, repo)
248+
apply_feature_services(registry, project, repo, existing_feature_services)
234249

235250
infra_provider = get_provider(repo_config, repo_path)
236251

@@ -242,14 +257,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
242257
all_to_keep.extend(repo.feature_tables)
243258
all_to_keep.extend(repo.feature_views)
244259

245-
entities_to_delete: List[Entity] = []
246-
repo_entities_names = set([e.name for e in repo.entities])
247-
for registry_entity in registry.list_entities(project=project):
248-
if registry_entity.name not in repo_entities_names:
249-
entities_to_delete.append(registry_entity)
250-
251-
entities_to_keep: List[Entity] = repo.entities
252-
253260
for name in [view.name for view in repo.feature_tables] + [
254261
table.name for table in repo.feature_views
255262
]:
@@ -272,6 +279,9 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
272279
partial=False,
273280
)
274281

282+
# Commit the update to the registry only after successful infra update
283+
registry.commit()
284+
275285

276286
@log_exceptions_and_usage
277287
def teardown(repo_config: RepoConfig, repo_path: Path):

0 commit comments

Comments
 (0)