Skip to content

Commit f32b4f4

Browse files
authored
Adding a local feature server test (feast-dev#2217)
* merge Signed-off-by: Danny Chiao <[email protected]> * Fix port collision Signed-off-by: Danny Chiao <[email protected]> * Fix lint Signed-off-by: Danny Chiao <[email protected]> * fix lint Signed-off-by: Danny Chiao <[email protected]> * fix lint Signed-off-by: Danny Chiao <[email protected]> * fix lint Signed-off-by: Danny Chiao <[email protected]> * fix lint Signed-off-by: Danny Chiao <[email protected]> * change static method Signed-off-by: Danny Chiao <[email protected]>
1 parent 1b98ec9 commit f32b4f4

File tree

6 files changed

+60
-13
lines changed

6 files changed

+60
-13
lines changed

sdk/python/feast/feature_server.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import traceback
2+
13
import click
24
import uvicorn
35
from fastapi import FastAPI, HTTPException, Request
@@ -59,7 +61,7 @@ def get_online_features(body=Depends(get_body)):
5961
)
6062
except Exception as e:
6163
# Print the original exception on the server side
62-
logger.exception(e)
64+
logger.exception(traceback.format_exc())
6365
# Raise HTTPException to return the error message to the client
6466
raise HTTPException(status_code=500, detail=str(e))
6567

sdk/python/tests/conftest.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,17 @@
1313
# limitations under the License.
1414
import logging
1515
import multiprocessing
16+
import time
1617
from datetime import datetime, timedelta
18+
from multiprocessing import Process
1719
from sys import platform
1820
from typing import List
1921

2022
import pandas as pd
2123
import pytest
2224
from _pytest.nodes import Item
2325

26+
from feast import FeatureStore
2427
from tests.data.data_creator import create_dataset
2528
from tests.integration.feature_repos.integration_test_repo_config import (
2629
IntegrationTestRepoConfig,
@@ -137,23 +140,41 @@ def simple_dataset_2() -> pd.DataFrame:
137140
return pd.DataFrame.from_dict(data)
138141

139142

143+
def start_test_local_server(repo_path: str, port: int):
    """Serve the feature repository at ``repo_path`` on ``localhost:port``.

    Intended as the ``target`` of a child ``Process`` (the ``environment``
    fixture launches it that way). Access logging is switched off to keep
    test output quiet.

    Args:
        repo_path: Path of the feature repository to load.
        port: TCP port on localhost that the server should listen on.
    """
    # NOTE(review): serve() presumably blocks for the lifetime of the server,
    # which is why callers run this in a separate process -- confirm.
    FeatureStore(repo_path).serve("localhost", port, no_access_log=True)
146+
147+
140148
@pytest.fixture(
141149
params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
142150
)
143-
def environment(request):
144-
e = construct_test_environment(request.param)
151+
def environment(request, worker_id: str):
152+
e = construct_test_environment(request.param, worker_id=worker_id)
153+
proc = Process(
154+
target=start_test_local_server,
155+
args=(e.feature_store.repo_path, e.get_local_server_port()),
156+
daemon=True,
157+
)
158+
if e.python_feature_server and e.test_repo_config.provider == "local":
159+
proc.start()
160+
# Wait for server to start
161+
time.sleep(3)
145162

146163
def cleanup():
147164
e.feature_store.teardown()
165+
if proc.is_alive():
166+
proc.kill()
148167

149168
request.addfinalizer(cleanup)
169+
150170
return e
151171

152172

153173
@pytest.fixture()
154174
def local_redis_environment(request, worker_id):
155-
156-
e = construct_test_environment(IntegrationTestRepoConfig(online_store=REDIS_CONFIG))
175+
e = construct_test_environment(
176+
IntegrationTestRepoConfig(online_store=REDIS_CONFIG), worker_id=worker_id
177+
)
157178

158179
def cleanup():
159180
e.feature_store.teardown()

sdk/python/tests/example_repos/example_feature_repo_with_duplicated_featureview_names.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
name="driver_hourly_stats", # Intentionally use the same FeatureView name
1111
entities=["driver_id"],
1212
online=False,
13-
input=driver_hourly_stats,
13+
batch_source=driver_hourly_stats,
1414
ttl=Duration(seconds=10),
1515
tags={},
1616
)
@@ -19,7 +19,7 @@
1919
name="driver_hourly_stats", # Intentionally use the same FeatureView name
2020
entities=["driver_id"],
2121
online=False,
22-
input=driver_hourly_stats,
22+
batch_source=driver_hourly_stats,
2323
ttl=Duration(seconds=10),
2424
tags={},
2525
)

sdk/python/tests/integration/feature_repos/repo_configuration.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import importlib
22
import json
33
import os
4+
import re
45
import tempfile
56
import uuid
67
from dataclasses import dataclass, field
@@ -51,6 +52,7 @@
5152
DEFAULT_FULL_REPO_CONFIGS: List[IntegrationTestRepoConfig] = [
5253
# Local configurations
5354
IntegrationTestRepoConfig(),
55+
IntegrationTestRepoConfig(python_feature_server=True),
5456
]
5557
if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True":
5658
DEFAULT_FULL_REPO_CONFIGS.extend(
@@ -217,6 +219,7 @@ class Environment:
217219
feature_store: FeatureStore
218220
data_source_creator: DataSourceCreator
219221
python_feature_server: bool
222+
worker_id: str
220223

221224
end_date: datetime = field(
222225
default=datetime.utcnow().replace(microsecond=0, second=0, minute=0)
@@ -225,6 +228,20 @@ class Environment:
225228
def __post_init__(self):
226229
self.start_date: datetime = self.end_date - timedelta(days=3)
227230

231+
def get_feature_server_endpoint(self) -> Optional[str]:
    """Return the base URL of the feature server for this environment.

    When the Python feature server is enabled and the provider is ``"local"``,
    the server is reached on localhost at the port computed by
    ``get_local_server_port()``. In every other case the lookup is delegated
    to the underlying feature store.

    Returns:
        The endpoint URL, or whatever the feature store reports. Callers
        treat ``None`` as "no feature server configured", hence the
        ``Optional`` return type.
    """
    uses_local_server = (
        self.python_feature_server and self.test_repo_config.provider == "local"
    )
    if uses_local_server:
        return f"http://localhost:{self.get_local_server_port()}"
    # Non-local (or disabled) feature server: defer to the store's own answer.
    return self.feature_store.get_feature_server_endpoint()
235+
236+
def get_local_server_port(self) -> int:
    """Return the localhost port reserved for this worker's feature server.

    pytest-xdist style worker ids (``gw0``, ``gw1``, ...) are mapped to
    distinct ports by adding the worker number to the base port 6566, so
    parallel workers do not collide. Any worker id without a ``gw<N>``
    component maps to the base port itself.
    """
    # Heuristic: the first "gw<digits>" occurrence identifies the xdist worker.
    match = re.search("gw(\\d+)", self.worker_id)
    offset = int(match.group(1)) if match else 0
    return 6566 + offset
244+
228245

229246
def table_name_from_data_source(ds: DataSource) -> Optional[str]:
230247
if hasattr(ds, "table_ref"):
@@ -237,6 +254,7 @@ def table_name_from_data_source(ds: DataSource) -> Optional[str]:
237254
def construct_test_environment(
238255
test_repo_config: IntegrationTestRepoConfig,
239256
test_suite_name: str = "integration_test",
257+
worker_id: str = "worker_id",
240258
) -> Environment:
241259

242260
_uuid = str(uuid.uuid4()).replace("-", "")[:8]
@@ -254,7 +272,7 @@ def construct_test_environment(
254272

255273
repo_dir_name = tempfile.mkdtemp()
256274

257-
if test_repo_config.python_feature_server:
275+
if test_repo_config.python_feature_server and test_repo_config.provider == "aws":
258276
from feast.infra.feature_servers.aws_lambda.config import (
259277
AwsLambdaFeatureServerConfig,
260278
)
@@ -266,6 +284,7 @@ def construct_test_environment(
266284

267285
registry = f"s3://feast-integration-tests/registries/{project}/registry.db"
268286
else:
287+
# Note: even if it's a local feature server, the repo config does not have this configured
269288
feature_server = None
270289
registry = str(Path(repo_dir_name) / "registry.db")
271290

@@ -293,6 +312,7 @@ def construct_test_environment(
293312
feature_store=fs,
294313
data_source_creator=offline_creator,
295314
python_feature_server=test_repo_config.python_feature_server,
315+
worker_id=worker_id,
296316
)
297317

298318
return environment

sdk/python/tests/integration/feature_repos/universal/feature_views.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def driver_feature_view(
2020
entities=["driver"],
2121
features=None if infer_features else [Feature("value", value_type)],
2222
ttl=timedelta(days=5),
23-
input=data_source,
23+
batch_source=data_source,
2424
)
2525

2626

@@ -35,7 +35,7 @@ def global_feature_view(
3535
entities=[],
3636
features=None if infer_features else [Feature("entityless_value", value_type)],
3737
ttl=timedelta(days=5),
38-
input=data_source,
38+
batch_source=data_source,
3939
)
4040

4141

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ def _get_online_features_dict_remotely(
185185
186186
The output should be identical to:
187187
188-
>>> fs.get_online_features(features=features, entity_rows=entity_rows, full_feature_names=full_feature_names).to_dict()
188+
fs.get_online_features(features=features, entity_rows=entity_rows, full_feature_names=full_feature_names).to_dict()
189189
190190
This makes it easy to test the remote feature server by comparing the output to the local method.
191191
@@ -212,6 +212,10 @@ def _get_online_features_dict_remotely(
212212
time.sleep(1)
213213
else:
214214
raise Exception("Failed to get online features from remote feature server")
215+
if "metadata" not in response:
216+
raise Exception(
217+
f"Failed to get online features from remote feature server {response}"
218+
)
215219
keys = response["metadata"]["feature_names"]
216220
# Get rid of unnecessary structure in the response, leaving list of dicts
217221
response = [row["values"] for row in response["results"]]
@@ -238,8 +242,8 @@ def get_online_features_dict(
238242
assertpy.assert_that(online_features).is_not_none()
239243
dict1 = online_features.to_dict()
240244

241-
endpoint = environment.feature_store.get_feature_server_endpoint()
242-
# If endpoint is None, it means that the remote feature server isn't configured
245+
endpoint = environment.get_feature_server_endpoint()
246+
# If endpoint is None, neither a local nor a remote feature server is configured
243247
if endpoint is not None:
244248
dict2 = _get_online_features_dict_remotely(
245249
endpoint=endpoint,

0 commit comments

Comments
 (0)