Commit 7977a53

Authored by Achal Shah
Add the foundation of the universal feature repo and a test that uses it (feast-dev#1734)
Squashed commits (each signed off by Achal Shah <[email protected]>):

* Add the foundation of the universal feature repo and a test that uses it
* Make tests actually work
* Make format
* Make format
* add a redshift data source creator
* integration test
* file data source creator
* fix online store ref
* dynamodb region
* fix file
* remove import
* close, not delete
* Refactor configs into test_repo_config
* make format
* Add a sweet decorator per feedback
* make format
* move stuff into with
* Specify repo_path for tests to succeed
* fix comments
* fix format
1 parent: 1f505e0

File tree: 11 files changed, +491 −42 lines

sdk/python/feast/repo_config.py
Lines changed: 1 addition & 0 deletions

@@ -80,6 +80,7 @@ class RepoConfig(FeastBaseModel):
     def __init__(self, **data: Any):
         super().__init__(**data)
+
         if isinstance(self.online_store, Dict):
             self.online_store = get_online_config_from_type(self.online_store["type"])(
                 **self.online_store
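For context, the branch this blank line sits next to is what lets the new test configs below pass online_store as a plain dict: RepoConfig upgrades the dict to the typed config class named by its "type" key. A minimal self-contained sketch of that coercion, using stand-in names (FakeDynamoDBConfig and the registry dict are illustrative, not Feast internals):

from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class FakeDynamoDBConfig:
    # Stand-in for the typed config class Feast would resolve; illustrative only.
    type: str = "dynamodb"
    region: str = "us-west-2"


_ONLINE_CONFIGS = {"dynamodb": FakeDynamoDBConfig}  # hypothetical registry


def coerce_online_store(online_store: Union[str, Dict[str, Any]]):
    # Mirrors the isinstance(self.online_store, Dict) branch above:
    # dict configs are upgraded to a typed config keyed on "type".
    if isinstance(online_store, dict):
        return _ONLINE_CONFIGS[online_store["type"]](**online_store)
    return online_store


print(coerce_online_store({"type": "dynamodb", "region": "us-west-2"}))
# -> FakeDynamoDBConfig(type='dynamodb', region='us-west-2')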
tests/data/data_creator.py
Lines changed: 27 additions & 0 deletions

from datetime import datetime, timedelta

import pandas as pd
from pytz import timezone, utc


def create_dataset() -> pd.DataFrame:
    now = datetime.utcnow()
    ts = pd.Timestamp(now).round("ms")
    data = {
        "id": [1, 2, 1, 3, 3],
        "value": [0.1, None, 0.3, 4, 5],
        "ts_1": [
            ts - timedelta(hours=4),
            ts,
            ts - timedelta(hours=3),
            # Use different time zones to test tz-naive -> tz-aware conversion
            (ts - timedelta(hours=4))
            .replace(tzinfo=utc)
            .astimezone(tz=timezone("Europe/Berlin")),
            (ts - timedelta(hours=1))
            .replace(tzinfo=utc)
            .astimezone(tz=timezone("US/Pacific")),
        ],
        "created_ts": [ts, ts, ts, ts, ts],
    }
    return pd.DataFrame.from_dict(data)
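A quick usage sketch of this helper (the path tests/data/data_creator.py is taken from the import in test_repo_configuration.py below; the commented outputs are expectations, not captured output):

from tests.data.data_creator import create_dataset

df = create_dataset()
print(df.shape)          # (5, 4): columns id, value, ts_1, created_ts
print(df["ts_1"].dtype)  # object, since ts_1 mixes tz-naive and tz-aware stamps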
Lines changed: 123 additions & 0 deletions

import math
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
from pytz import utc

from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.test_repo_configuration import parametrize_e2e_test


@parametrize_e2e_test
def test_e2e_consistency(fs: FeatureStore):
    run_offline_online_store_consistency_test(fs)


def check_offline_and_online_features(
    fs: FeatureStore,
    fv: FeatureView,
    driver_id: int,
    event_timestamp: datetime,
    expected_value: Optional[float],
    full_feature_names: bool,
    check_offline_store: bool = True,
) -> None:
    # Check online store
    response_dict = fs.get_online_features(
        [f"{fv.name}:value"],
        [{"driver": driver_id}],
        full_feature_names=full_feature_names,
    ).to_dict()

    if full_feature_names:
        if expected_value:
            assert abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6
        else:
            assert response_dict[f"{fv.name}__value"][0] is None
    else:
        if expected_value:
            assert abs(response_dict["value"][0] - expected_value) < 1e-6
        else:
            assert response_dict["value"][0] is None

    # Check offline store
    if check_offline_store:
        df = fs.get_historical_features(
            entity_df=pd.DataFrame.from_dict(
                {"driver_id": [driver_id], "event_timestamp": [event_timestamp]}
            ),
            features=[f"{fv.name}:value"],
            full_feature_names=full_feature_names,
        ).to_df()

        if full_feature_names:
            if expected_value:
                assert abs(df.to_dict()[f"{fv.name}__value"][0] - expected_value) < 1e-6
            else:
                assert math.isnan(df.to_dict()[f"{fv.name}__value"][0])
        else:
            if expected_value:
                assert abs(df.to_dict()["value"][0] - expected_value) < 1e-6
            else:
                assert math.isnan(df.to_dict()["value"][0])


def run_offline_online_store_consistency_test(fs: FeatureStore,) -> None:
    now = datetime.utcnow()

    fv = fs.get_feature_view("test_correctness")
    full_feature_names = True
    check_offline_store: bool = True

    # Run materialize()
    # use both tz-naive & tz-aware timestamps to test that they're both correctly handled
    start_date = (now - timedelta(hours=5)).replace(tzinfo=utc)
    end_date = now - timedelta(hours=2)
    fs.materialize(feature_views=[fv.name], start_date=start_date, end_date=end_date)

    # check result of materialize()
    check_offline_and_online_features(
        fs=fs,
        fv=fv,
        driver_id=1,
        event_timestamp=end_date,
        expected_value=0.3,
        full_feature_names=full_feature_names,
        check_offline_store=check_offline_store,
    )

    check_offline_and_online_features(
        fs=fs,
        fv=fv,
        driver_id=2,
        event_timestamp=end_date,
        expected_value=None,
        full_feature_names=full_feature_names,
        check_offline_store=check_offline_store,
    )

    # check prior value for materialize_incremental()
    check_offline_and_online_features(
        fs=fs,
        fv=fv,
        driver_id=3,
        event_timestamp=end_date,
        expected_value=4,
        full_feature_names=full_feature_names,
        check_offline_store=check_offline_store,
    )

    # run materialize_incremental()
    fs.materialize_incremental(feature_views=[fv.name], end_date=now)

    # check result of materialize_incremental()
    check_offline_and_online_features(
        fs=fs,
        fv=fv,
        driver_id=3,
        event_timestamp=now,
        expected_value=5,
        full_feature_names=full_feature_names,
        check_offline_store=check_offline_store,
    )
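The expected values 0.3, None, 4, and 5 follow directly from create_dataset() above. A small self-contained sketch modeling the point-in-time lookup the test relies on (a model of the expectation, not Feast's materialization code):

# (driver_id, value, event time as hour offset from ts) per create_dataset()
rows = [(1, 0.1, -4), (2, None, 0), (1, 0.3, -3), (3, 4.0, -4), (3, 5.0, -1)]


def latest_value(driver_id, window_end_hours):
    """Latest value for a driver among rows at or before the window end."""
    eligible = [(off, v) for d, v, off in rows
                if d == driver_id and off <= window_end_hours]
    return max(eligible)[1] if eligible else None


assert latest_value(1, -2) == 0.3   # first materialize() window ends at now-2h
assert latest_value(2, -2) is None  # driver 2's only row is at `now`, outside
assert latest_value(3, -2) == 4.0   # before materialize_incremental()
assert latest_value(3, 0) == 5.0    # after incremental materialization to now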
tests/integration/feature_repos/test_repo_configuration.py
Lines changed: 109 additions & 0 deletions

import tempfile
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, List, Union

import pytest
from attr import dataclass

from feast import FeatureStore, RepoConfig, importer
from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_source_creator import (
    DataSourceCreator,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import (
    correctness_feature_view,
)


@dataclass
class TestRepoConfig:
    """
    This class should hold all possible parameters that may need to be varied by individual tests.
    """

    provider: str = "local"
    online_store: Union[str, Dict] = "sqlite"

    offline_store_creator: str = "tests.integration.feature_repos.universal.data_sources.file.FileDataSourceCreator"

    full_feature_names: bool = True


FULL_REPO_CONFIGS: List[TestRepoConfig] = [
    TestRepoConfig(),  # Local
    TestRepoConfig(
        provider="aws",
        offline_store_creator="tests.integration.feature_repos.universal.data_sources.redshift.RedshiftDataSourceCreator",
        online_store={"type": "dynamodb", "region": "us-west-2"},
    ),
    TestRepoConfig(
        provider="gcp",
        offline_store_creator="tests.integration.feature_repos.universal.data_sources.bigquery.BigQueryDataSourceCreator",
        online_store="datastore",
    ),
]


OFFLINE_STORES: List[str] = []
ONLINE_STORES: List[str] = []
PROVIDERS: List[str] = []


@contextmanager
def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
    """
    Takes the parameters from the test repo config, creates a feature repo, applies it,
    and returns the constructed feature store object to callers.

    This feature store object can be interacted with for the purposes of tests.
    The user is *not* expected to perform any clean up actions.

    :param test_repo_config: configuration
    :return: A feature store built using the supplied configuration.
    """
    df = create_dataset()

    project = f"test_correctness_{str(uuid.uuid4()).replace('-', '')[:8]}"

    module_name, config_class_name = test_repo_config.offline_store_creator.rsplit(
        ".", 1
    )

    offline_creator: DataSourceCreator = importer.get_class_from_type(
        module_name, config_class_name, "DataSourceCreator"
    )()
    ds = offline_creator.create_data_source(project, df)
    offline_store = offline_creator.create_offline_store_config()
    online_store = test_repo_config.online_store

    with tempfile.TemporaryDirectory() as repo_dir_name:
        config = RepoConfig(
            registry=str(Path(repo_dir_name) / "registry.db"),
            project=project,
            provider=test_repo_config.provider,
            offline_store=offline_store,
            online_store=online_store,
            repo_path=repo_dir_name,
        )
        fs = FeatureStore(config=config)
        fv = correctness_feature_view(ds)
        entity = driver()
        fs.apply([fv, entity])

        yield fs

        fs.teardown()
        offline_creator.teardown(project)


def parametrize_e2e_test(e2e_test):
    @pytest.mark.integration
    @pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: v.provider)
    def inner_test(config):
        with construct_feature_store(config) as fs:
            e2e_test(fs)

    return inner_test
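Any test decorated this way runs once per entry in FULL_REPO_CONFIGS. A hedged sketch of how another test module could reuse the decorator (the test body and name are illustrative; the "test_correctness" feature view name comes from construct_feature_store above):

from feast import FeatureStore
from tests.integration.feature_repos.test_repo_configuration import (
    parametrize_e2e_test,
)


@parametrize_e2e_test
def test_feature_view_registered(fs: FeatureStore):
    # construct_feature_store applies "test_correctness" before yielding.
    fv = fs.get_feature_view("test_correctness")
    assert fv.name == "test_correctness"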
tests/integration/feature_repos/universal/data_source_creator.py
Lines changed: 26 additions & 0 deletions

from abc import ABC, abstractmethod

import pandas as pd

from feast.data_source import DataSource
from feast.repo_config import FeastConfigBaseModel


class DataSourceCreator(ABC):
    @abstractmethod
    def create_data_source(
        self,
        name: str,
        df: pd.DataFrame,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
    ) -> DataSource:
        ...

    @abstractmethod
    def create_offline_store_config(self) -> FeastConfigBaseModel:
        ...

    @abstractmethod
    def teardown(self, name: str):
        ...
tests/integration/feature_repos/universal/data_sources/bigquery.py
Lines changed: 54 additions & 0 deletions

import time

import pandas as pd
from google.cloud import bigquery

from feast import BigQuerySource
from feast.data_source import DataSource
from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
from tests.integration.feature_repos.universal.data_source_creator import (
    DataSourceCreator,
)


class BigQueryDataSourceCreator(DataSourceCreator):
    def teardown(self, name: str):
        pass

    def __init__(self):
        self.client = bigquery.Client()

    def create_offline_store_config(self):
        return BigQueryOfflineStoreConfig()

    def create_data_source(
        self,
        name: str,
        df: pd.DataFrame,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
        **kwargs,
    ) -> DataSource:
        gcp_project = self.client.project
        bigquery_dataset = "test_ingestion"
        dataset = bigquery.Dataset(f"{gcp_project}.{bigquery_dataset}")
        self.client.create_dataset(dataset, exists_ok=True)
        dataset.default_table_expiration_ms = (
            1000 * 60 * 60 * 24 * 14
        )  # 2 weeks in milliseconds
        self.client.update_dataset(dataset, ["default_table_expiration_ms"])

        job_config = bigquery.LoadJobConfig()
        table_ref = f"{gcp_project}.{bigquery_dataset}.{name}_{int(time.time_ns())}"
        job = self.client.load_table_from_dataframe(
            df, table_ref, job_config=job_config
        )
        job.result()

        return BigQuerySource(
            table_ref=table_ref,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            date_partition_column="",
            field_mapping={"ts_1": "ts", "id": "driver_id"},
        )
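A usage sketch for this creator (requires live GCP credentials; the project name is illustrative, and the call pattern mirrors construct_feature_store above):

from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_sources.bigquery import (
    BigQueryDataSourceCreator,
)

creator = BigQueryDataSourceCreator()
# Loads the test dataframe into test_ingestion.demo_project_<time_ns>.
source = creator.create_data_source("demo_project", create_dataset())
creator.teardown("demo_project")  # a no-op; the 2-week table TTL handles cleanup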
tests/integration/feature_repos/universal/data_sources/file.py
Lines changed: 41 additions & 0 deletions

import tempfile
from typing import Any

import pandas as pd

from feast import FileSource
from feast.data_format import ParquetFormat
from feast.data_source import DataSource
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.repo_config import FeastConfigBaseModel
from tests.integration.feature_repos.universal.data_source_creator import (
    DataSourceCreator,
)


class FileDataSourceCreator(DataSourceCreator):
    f: Any

    def create_data_source(
        self,
        name: str,
        df: pd.DataFrame,
        event_timestamp_column="ts",
        created_timestamp_column="created_ts",
    ) -> DataSource:
        self.f = tempfile.NamedTemporaryFile(suffix=".parquet", delete=False)
        df.to_parquet(self.f.name)
        return FileSource(
            file_format=ParquetFormat(),
            path=f"file://{self.f.name}",
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            date_partition_column="",
            field_mapping={"ts_1": "ts", "id": "driver_id"},
        )

    def create_offline_store_config(self) -> FeastConfigBaseModel:
        return FileOfflineStoreConfig()

    def teardown(self, name: str):
        self.f.close()
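And a usage sketch for the file-based creator, the one exercised by the default local TestRepoConfig; it needs no external services (the name argument is illustrative, and the commented outputs are expectations):

from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_sources.file import (
    FileDataSourceCreator,
)

creator = FileDataSourceCreator()
source = creator.create_data_source("local_demo", create_dataset())
print(source.path)              # file:///tmp/... (a parquet file of the dataframe)
creator.teardown("local_demo")  # closes, but does not delete, the temp file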
