Skip to content

Commit cbb97d3

Browse files
authored
Pass entities information to Provider (feast-dev#1498)
Signed-off-by: Matt Delacour <[email protected]>
1 parent 36c77dd commit cbb97d3

File tree

6 files changed

+50
-5
lines changed

6 files changed

+50
-5
lines changed

sdk/python/feast/feature_store.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,12 +217,14 @@ def apply(
217217
if isinstance(objects, Entity) or isinstance(objects, FeatureView):
218218
objects = [objects]
219219
views_to_update = []
220+
entities_to_update = []
220221
for ob in objects:
221222
if isinstance(ob, FeatureView):
222223
self._registry.apply_feature_view(ob, project=self.project)
223224
views_to_update.append(ob)
224225
elif isinstance(ob, Entity):
225226
self._registry.apply_entity(ob, project=self.project)
227+
entities_to_update.append(ob)
226228
else:
227229
raise ValueError(
228230
f"Unknown object type ({type(ob)}) provided as part of apply() call"
@@ -231,6 +233,8 @@ def apply(
231233
project=self.project,
232234
tables_to_delete=[],
233235
tables_to_keep=views_to_update,
236+
entities_to_delete=[],
237+
entities_to_keep=entities_to_update,
234238
partial=True,
235239
)
236240

sdk/python/feast/infra/gcp.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from google.auth.exceptions import DefaultCredentialsError
1010

1111
from feast import FeatureTable, utils
12+
from feast.entity import Entity
1213
from feast.errors import FeastProviderLoginError
1314
from feast.feature_view import FeatureView
1415
from feast.infra.key_encoding_utils import serialize_entity_key
@@ -55,6 +56,8 @@ def update_infra(
5556
project: str,
5657
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
5758
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
59+
entities_to_delete: Sequence[Entity],
60+
entities_to_keep: Sequence[Entity],
5861
partial: bool,
5962
):
6063
from google.cloud import datastore
@@ -77,7 +80,10 @@ def update_infra(
7780
client.delete(key)
7881

7982
def teardown_infra(
80-
self, project: str, tables: Sequence[Union[FeatureTable, FeatureView]]
83+
self,
84+
project: str,
85+
tables: Sequence[Union[FeatureTable, FeatureView]],
86+
entities: Sequence[Entity],
8187
) -> None:
8288
client = self._initialize_client()
8389

sdk/python/feast/infra/local.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import pytz
99

1010
from feast import FeatureTable, utils
11+
from feast.entity import Entity
1112
from feast.feature_view import FeatureView
1213
from feast.infra.key_encoding_utils import serialize_entity_key
1314
from feast.infra.offline_stores.helpers import get_offline_store_from_sources
@@ -50,6 +51,8 @@ def update_infra(
5051
project: str,
5152
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
5253
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
54+
entities_to_delete: Sequence[Entity],
55+
entities_to_keep: Sequence[Entity],
5356
partial: bool,
5457
):
5558
conn = self._get_conn()
@@ -65,7 +68,10 @@ def update_infra(
6568
conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")
6669

6770
def teardown_infra(
68-
self, project: str, tables: Sequence[Union[FeatureTable, FeatureView]]
71+
self,
72+
project: str,
73+
tables: Sequence[Union[FeatureTable, FeatureView]],
74+
entities: Sequence[Entity],
6975
) -> None:
7076
os.unlink(self._db_path)
7177

sdk/python/feast/infra/provider.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ def update_infra(
2626
project: str,
2727
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
2828
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
29+
entities_to_delete: Sequence[Entity],
30+
entities_to_keep: Sequence[Entity],
2931
partial: bool,
3032
):
3133
"""
@@ -37,21 +39,29 @@ def update_infra(
3739
clean up the corresponding cloud resources.
3840
tables_to_keep: Tables that are still in the feature repo. Depending on implementation,
3941
provider may or may not need to update the corresponding resources.
42+
entities_to_delete: Entities that were deleted from the feature repo, so provider needs to
43+
clean up the corresponding cloud resources.
44+
entities_to_keep: Entities that are still in the feature repo. Depending on implementation,
45+
provider may or may not need to update the corresponding resources.
4046
partial: if true, then tables_to_delete and tables_to_keep are *not* exhaustive lists.
4147
There may be other tables that are not touched by this update.
4248
"""
4349
...
4450

4551
@abc.abstractmethod
4652
def teardown_infra(
47-
self, project: str, tables: Sequence[Union[FeatureTable, FeatureView]]
53+
self,
54+
project: str,
55+
tables: Sequence[Union[FeatureTable, FeatureView]],
56+
entities: Sequence[Entity],
4857
):
4958
"""
5059
Tear down all cloud resources for a repo.
5160
5261
Args:
5362
project: Feast project to which tables belong
5463
tables: Tables that are declared in the feature repo.
64+
entities: Entities that are declared in the feature repo.
5565
"""
5666
...
5767

sdk/python/feast/repo_operations.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
129129
all_to_keep.extend(repo.feature_tables)
130130
all_to_keep.extend(repo.feature_views)
131131

132+
entities_to_delete: List[Entity] = []
133+
repo_entities_names = set([e.name for e in repo.entities])
134+
for registry_entity in registry.list_entities(project=project):
135+
if registry_entity.name not in repo_entities_names:
136+
entities_to_delete.append(registry_entity)
137+
138+
entities_to_keep: List[Entity] = repo.entities
139+
132140
for name in [view.name for view in repo.feature_tables] + [
133141
table.name for table in repo.feature_views
134142
]:
@@ -141,10 +149,13 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
141149
click.echo(
142150
f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}"
143151
)
152+
144153
infra_provider.update_infra(
145154
project,
146155
tables_to_delete=all_to_delete,
147156
tables_to_keep=all_to_keep,
157+
entities_to_delete=entities_to_delete,
158+
entities_to_keep=entities_to_keep,
148159
partial=False,
149160
)
150161

@@ -160,8 +171,13 @@ def teardown(repo_config: RepoConfig, repo_path: Path):
160171
registry_tables: List[Union[FeatureTable, FeatureView]] = []
161172
registry_tables.extend(registry.list_feature_tables(project=project))
162173
registry_tables.extend(registry.list_feature_views(project=project))
174+
175+
registry_entities: List[Entity] = registry.list_entities(project=project)
176+
163177
infra_provider = get_provider(repo_config, repo_path)
164-
infra_provider.teardown_infra(project, tables=registry_tables)
178+
infra_provider.teardown_infra(
179+
project, tables=registry_tables, entities=registry_entities
180+
)
165181

166182

167183
def registry_dump(repo_config: RepoConfig, repo_path: Path):

sdk/python/tests/online_write_benchmark.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@ def benchmark_writes():
8989
)
9090

9191
registry_tables = store.list_feature_views()
92-
provider.teardown_infra(store.project, tables=registry_tables)
92+
registry_entities = store.list_entities()
93+
provider.teardown_infra(
94+
store.project, tables=registry_tables, entities=registry_entities
95+
)
9396

9497

9598
if __name__ == "__main__":

0 commit comments

Comments
 (0)