Skip to content

Commit 17bfa61

Browse files
leonid133 and Tsotne Tabidze
authored
Add support for DynamoDB and S3 registry (#1483)
* Add support for DynamoDB and S3 registry Signed-off-by: lblokhin <[email protected]> * rcu and wcu as a parameter of dynamodb online store Signed-off-by: lblokhin <[email protected]> * fix linter Signed-off-by: lblokhin <[email protected]> * aws dependency to extras Signed-off-by: lblokhin <[email protected]> * FEAST_S3_ENDPOINT_URL Signed-off-by: lblokhin <[email protected]> * tests Signed-off-by: lblokhin <[email protected]> * fix signature, after merge Signed-off-by: lblokhin <[email protected]> * aws default region name configurable Signed-off-by: lblokhin <[email protected]> * add offlinestore config type to test Signed-off-by: lblokhin <[email protected]> * review changes Signed-off-by: lblokhin <[email protected]> * review requested changes Signed-off-by: lblokhin <[email protected]> * integration test for Dynamo Signed-off-by: lblokhin <[email protected]> * change the rest of table_name to table_instance (where table_name is actually an instance of DynamoDB Table object) Signed-off-by: lblokhin <[email protected]> * fix DynamoDBOnlineStore commit Signed-off-by: lblokhin <[email protected]> * move client to _initialize_dynamodb Signed-off-by: lblokhin <[email protected]> * rename document_id to entity_id and Row to entity_id Signed-off-by: lblokhin <[email protected]> * The default value is None Signed-off-by: lblokhin <[email protected]> * Remove Datastore from the docstring. 
Signed-off-by: lblokhin <[email protected]> * get rid of the return call from S3RegistryStore Signed-off-by: lblokhin <[email protected]> * merge two exceptions Signed-off-by: lblokhin <[email protected]> * For ci requirement Signed-off-by: lblokhin <[email protected]> * remove configuration from test Signed-off-by: lblokhin <[email protected]> * feast-integration-tests for tests Signed-off-by: lblokhin <[email protected]> * change test path Signed-off-by: lblokhin <[email protected]> * add fixture feature_store_with_s3_registry to test Signed-off-by: lblokhin <[email protected]> * region required Signed-off-by: lblokhin <[email protected]> * Address the rest of the comments Signed-off-by: Tsotne Tabidze <[email protected]> * Update to_table to to_arrow Signed-off-by: Tsotne Tabidze <[email protected]> Co-authored-by: Tsotne Tabidze <[email protected]>
1 parent 0a2e173 commit 17bfa61

20 files changed

+678
-21
lines changed
1.56 KB
Binary file not shown.
90.6 KB
Loading

docs/specs/online_store_format.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ This format is considered part of Feast public API contract; that allows other c
77

88
The format is not entirely technology or cloud agnostic. Since users may opt to use different key-value stores as an underlying engine to store feature data, and we don't want to aim for the lowest common denominator across them, we have to provide different "flavors" of this data format, specialized for every supported store.
99

10-
This version of the Online Store Format supports only Redis as the underlying storage engine. We envision adding more storage engines to this document in the future.
10+
This version of the Online Store Format supports Redis and DynamoDB as underlying storage engines. We envision adding more storage engines to this document in the future.
1111

1212

1313
## Overview

sdk/python/feast/cli.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
280280
@click.option(
281281
"--template",
282282
"-t",
283-
type=click.Choice(["local", "gcp"], case_sensitive=False),
283+
type=click.Choice(["local", "gcp", "aws"], case_sensitive=False),
284284
help="Specify a template for the created project",
285285
default="local",
286286
)

sdk/python/feast/errors.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,16 @@ def __init__(self, name, project=None):
4040
super().__init__(f"Feature table {name} does not exist")
4141

4242

43+
class S3RegistryBucketNotExist(FeastObjectNotFoundException):
    """Raised when the S3 bucket backing the Feast registry cannot be found."""

    def __init__(self, bucket):
        # Build the message first so the formatting is easy to eyeball in review.
        message = f"S3 bucket {bucket} for the Feast registry does not exist"
        super().__init__(message)
46+
47+
48+
class S3RegistryBucketForbiddenAccess(FeastObjectNotFoundException):
    """Raised when the S3 bucket backing the Feast registry denies access (e.g. 403)."""

    def __init__(self, bucket):
        # Keep wording identical to the original user-facing message.
        message = f"S3 bucket {bucket} for the Feast registry can't be accessed"
        super().__init__(message)
51+
52+
4353
class FeastProviderLoginError(Exception):
4454
"""Error class that indicates a user has not authenticated with their provider."""
4555

sdk/python/feast/infra/aws.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
from datetime import datetime
2+
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
3+
4+
import pandas
5+
from tqdm import tqdm
6+
7+
from feast import FeatureTable
8+
from feast.entity import Entity
9+
from feast.feature_view import FeatureView
10+
from feast.infra.offline_stores.helpers import get_offline_store_from_config
11+
from feast.infra.online_stores.helpers import get_online_store_from_config
12+
from feast.infra.provider import (
13+
Provider,
14+
RetrievalJob,
15+
_convert_arrow_to_proto,
16+
_get_column_names,
17+
_run_field_mapping,
18+
)
19+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
20+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
21+
from feast.registry import Registry
22+
from feast.repo_config import RepoConfig
23+
24+
25+
class AwsProvider(Provider):
    """Feast provider backed by AWS services.

    Delegates online serving to the configured online store (e.g. DynamoDB)
    and batch/offline retrieval to the configured offline store, both
    resolved from the repo config at construction time.
    """

    def __init__(self, config: RepoConfig):
        self.repo_config = config
        self.offline_store = get_offline_store_from_config(config.offline_store)
        self.online_store = get_online_store_from_config(config.online_store)

    def update_infra(
        self,
        project: str,
        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        """Reconcile online-store infrastructure with the desired repo state.

        Creation/deletion of per-table resources is handled entirely by the
        online store implementation.
        """
        self.online_store.update(
            config=self.repo_config,
            tables_to_delete=tables_to_delete,
            tables_to_keep=tables_to_keep,
            entities_to_keep=entities_to_keep,
            entities_to_delete=entities_to_delete,
            partial=partial,
        )

    def teardown_infra(
        self,
        project: str,
        tables: Sequence[Union[FeatureTable, FeatureView]],
        entities: Sequence[Entity],
    ) -> None:
        """Tear down all online-store resources for the given tables/entities."""
        self.online_store.teardown(self.repo_config, tables, entities)

    def online_write_batch(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """Write a batch of feature rows to the online store.

        Args:
            data: Tuples of (entity key, feature values, event timestamp,
                optional created timestamp).
            progress: Optional callback invoked with the number of rows written.
        """
        self.online_store.online_write_batch(config, table, data, progress)

    def online_read(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        entity_keys: List[EntityKeyProto],
        requested_features: List[str] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """Read the latest feature values for the given entity keys.

        Bug fix: the original dropped ``requested_features`` on the floor;
        forward it so the online store can return only the requested subset.
        """
        return self.online_store.online_read(
            config, table, entity_keys, requested_features
        )

    def materialize_single_feature_view(
        self,
        config: RepoConfig,
        feature_view: FeatureView,
        start_date: datetime,
        end_date: datetime,
        registry: Registry,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ) -> None:
        """Materialize one feature view's data from the offline to the online store.

        Pulls the latest rows per entity in [start_date, end_date] from the
        offline store, applies the view's field mapping, converts to protos,
        and writes them online while reporting progress via tqdm_builder.
        """
        entities = [
            registry.get_entity(entity_name, project)
            for entity_name in feature_view.entities
        ]

        (
            join_key_columns,
            feature_name_columns,
            event_timestamp_column,
            created_timestamp_column,
        ) = _get_column_names(feature_view, entities)

        offline_job = self.offline_store.pull_latest_from_table_or_query(
            config=config,
            data_source=feature_view.input,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
        )

        table = offline_job.to_arrow()

        # Rename source columns to feature names where a mapping is declared.
        if feature_view.input.field_mapping is not None:
            table = _run_field_mapping(table, feature_view.input.field_mapping)

        join_keys = [entity.join_key for entity in entities]
        rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

        with tqdm_builder(len(rows_to_write)) as pbar:
            self.online_write_batch(
                self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
            )

    def get_historical_features(
        self,
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pandas.DataFrame, str],
        registry: Registry,
        project: str,
    ) -> RetrievalJob:
        """Build a point-in-time-correct retrieval job against the offline store."""
        return self.offline_store.get_historical_features(
            config=config,
            feature_views=feature_views,
            feature_refs=feature_refs,
            entity_df=entity_df,
            registry=registry,
            project=project,
        )

sdk/python/feast/infra/online_stores/datastore.py

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
from multiprocessing.pool import ThreadPool
1717
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
1818

19-
import mmh3
2019
from pydantic import PositiveInt, StrictStr
2120
from pydantic.typing import Literal
2221

2322
from feast import Entity, FeatureTable, utils
2423
from feast.feature_view import FeatureView
25-
from feast.infra.key_encoding_utils import serialize_entity_key
24+
from feast.infra.online_stores.helpers import compute_entity_id
2625
from feast.infra.online_stores.online_store import OnlineStore
2726
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2827
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
@@ -191,7 +190,7 @@ def _write_minibatch(
191190
):
192191
entities = []
193192
for entity_key, features, timestamp, created_ts in data:
194-
document_id = compute_datastore_entity_id(entity_key)
193+
document_id = compute_entity_id(entity_key)
195194

196195
key = client.key(
197196
"Project", project, "Table", table.name, "Row", document_id,
@@ -236,7 +235,7 @@ def online_read(
236235

237236
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
238237
for entity_key in entity_keys:
239-
document_id = compute_datastore_entity_id(entity_key)
238+
document_id = compute_entity_id(entity_key)
240239
key = client.key(
241240
"Project", feast_project, "Table", table.name, "Row", document_id
242241
)
@@ -253,16 +252,6 @@ def online_read(
253252
return result
254253

255254

256-
def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
257-
"""
258-
Compute Datastore Entity id given Feast Entity Key.
259-
260-
Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
261-
do with the Entity concept we have in Feast.
262-
"""
263-
return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
264-
265-
266255
def _delete_all_values(client, key) -> None:
267256
"""
268257
Delete all data under the key path in datastore.

0 commit comments

Comments
 (0)