Skip to content

Commit e66a74f

Browse files
Tsotne Tabidze authored and felixwang9817 committed
Add integration tests for AWS Lambda feature server (feast-dev#2001)
* Add integration tests for AWS Lambda feature server

  Signed-off-by: Tsotne Tabidze <[email protected]>

* Fix feature server bugs & integration test

  Signed-off-by: Tsotne Tabidze <[email protected]>

* Address comments & ignore incorrect linter errors

  Signed-off-by: Tsotne Tabidze <[email protected]>

* Fix float comparison & lambda api call retries

  Signed-off-by: Tsotne Tabidze <[email protected]>

* Increase retries and catch other ClientErrors

  Signed-off-by: Tsotne Tabidze <[email protected]>

* Fix lambda name shortening (md5 hashing instead of base64 prefixing)

  base64-ing project name and then taking the prefix was not working when the prefix of the project name was identical between tests (only their endings were different for some tests). This caused lambda names between test cases to be the same, causing conflicts between updates. Now, we instead use md5 hash which depends on the entire body of the input, eliminating this issue.

  Signed-off-by: Tsotne Tabidze <[email protected]>
1 parent b5707ab commit e66a74f

File tree

13 files changed

+271
-83
lines changed

13 files changed

+271
-83
lines changed

.github/workflows/pr_integration_tests.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ jobs:
3838
env:
3939
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
4040
ECR_REPOSITORY: feast-python-server
41+
# Note: the image tags should be in sync with sdk/python/feast/infra/aws.py:_get_docker_image_version
4142
run: |
4243
docker build \
4344
--file sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile \

sdk/python/feast/constants.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
# Maximum interval(secs) to wait between retries for retry function
1818
MAX_WAIT_INTERVAL: str = "60"
1919

20-
AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server:aws"
20+
AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server"
2121
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY = "feast-python-server"
2222

2323
# feature_store.yaml environment variable name for remote feature server

sdk/python/feast/feature_server.py

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
import feast
88
from feast import proto_json
99
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
10+
from feast.type_map import feast_value_type_to_python_type
1011

1112

1213
def get_app(store: "feast.FeatureStore"):
@@ -36,7 +37,10 @@ async def get_online_features(request: Request):
3637
raise HTTPException(status_code=500, detail="Uneven number of columns")
3738

3839
entity_rows = [
39-
{k: v.val[idx] for k, v in request_proto.entities.items()}
40+
{
41+
k: feast_value_type_to_python_type(v.val[idx])
42+
for k, v in request_proto.entities.items()
43+
}
4044
for idx in range(num_entities)
4145
]
4246

@@ -45,7 +49,9 @@ async def get_online_features(request: Request):
4549
).proto
4650

4751
# Convert the Protobuf object to JSON and return it
48-
return MessageToDict(response_proto, preserving_proto_field_name=True)
52+
return MessageToDict( # type: ignore
53+
response_proto, preserving_proto_field_name=True, float_precision=18
54+
)
4955
except Exception as e:
5056
# Print the original exception on the server side
5157
logger.exception(e)

sdk/python/feast/infra/aws.py

Lines changed: 58 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
import base64
2+
import hashlib
23
import logging
34
import os
5+
import subprocess
46
import uuid
57
from datetime import datetime
68
from pathlib import Path
@@ -10,6 +12,7 @@
1012

1113
from colorama import Fore, Style
1214

15+
from feast import flags_helper
1316
from feast.constants import (
1417
AWS_LAMBDA_FEATURE_SERVER_IMAGE,
1518
AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
@@ -88,18 +91,18 @@ def update_infra(
8891
)
8992

9093
ecr_client = boto3.client("ecr")
94+
docker_image_version = _get_docker_image_version()
9195
repository_uri = self._create_or_get_repository_uri(ecr_client)
92-
version = _get_version_for_aws()
9396
# Only download & upload the docker image if it doesn't already exist in ECR
9497
if not ecr_client.batch_get_image(
9598
repositoryName=AWS_LAMBDA_FEATURE_SERVER_REPOSITORY,
96-
imageIds=[{"imageTag": version}],
99+
imageIds=[{"imageTag": docker_image_version}],
97100
).get("images"):
98101
image_uri = self._upload_docker_image(
99-
ecr_client, repository_uri, version
102+
ecr_client, repository_uri, docker_image_version
100103
)
101104
else:
102-
image_uri = f"{repository_uri}:{version}"
105+
image_uri = f"{repository_uri}:{docker_image_version}"
103106

104107
self._deploy_feature_server(project, image_uri)
105108

@@ -154,11 +157,10 @@ def _deploy_feature_server(self, project: str, image_uri: str):
154157
# feature views, feature services, and other definitions does not update lambda).
155158
_logger.info(" Updating AWS Lambda...")
156159

157-
lambda_client.update_function_configuration(
158-
FunctionName=resource_name,
159-
Environment={
160-
"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}
161-
},
160+
aws_utils.update_lambda_function_environment(
161+
lambda_client,
162+
resource_name,
163+
{"Variables": {FEATURE_STORE_YAML_ENV_NAME: config_base64}},
162164
)
163165

164166
api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
@@ -235,7 +237,7 @@ def get_feature_server_endpoint(self) -> Optional[str]:
235237
return f"https://{api_id}.execute-api.{region}.amazonaws.com"
236238

237239
def _upload_docker_image(
238-
self, ecr_client, repository_uri: str, version: str
240+
self, ecr_client, repository_uri: str, docker_image_version: str
239241
) -> str:
240242
"""
241243
Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR.
@@ -258,12 +260,11 @@ def _upload_docker_image(
258260

259261
raise DockerDaemonNotRunning()
260262

263+
dockerhub_image = f"{AWS_LAMBDA_FEATURE_SERVER_IMAGE}:{docker_image_version}"
261264
_logger.info(
262-
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{AWS_LAMBDA_FEATURE_SERVER_IMAGE}{Style.RESET_ALL}"
265+
f"Pulling remote image {Style.BRIGHT + Fore.GREEN}{dockerhub_image}{Style.RESET_ALL}"
263266
)
264-
for line in docker_client.api.pull(
265-
AWS_LAMBDA_FEATURE_SERVER_IMAGE, stream=True, decode=True
266-
):
267+
for line in docker_client.api.pull(dockerhub_image, stream=True, decode=True):
267268
_logger.debug(f" {line}")
268269

269270
auth_token = ecr_client.get_authorization_token()["authorizationData"][0][
@@ -280,14 +281,14 @@ def _upload_docker_image(
280281
)
281282
_logger.debug(f" {login_status}")
282283

283-
image = docker_client.images.get(AWS_LAMBDA_FEATURE_SERVER_IMAGE)
284-
image_remote_name = f"{repository_uri}:{version}"
284+
image = docker_client.images.get(dockerhub_image)
285+
image_remote_name = f"{repository_uri}:{docker_image_version}"
285286
_logger.info(
286287
f"Pushing local image to remote {Style.BRIGHT + Fore.GREEN}{image_remote_name}{Style.RESET_ALL}"
287288
)
288289
image.tag(image_remote_name)
289290
for line in docker_client.api.push(
290-
repository_uri, tag=version, stream=True, decode=True
291+
repository_uri, tag=docker_image_version, stream=True, decode=True
291292
):
292293
_logger.debug(f" {line}")
293294

@@ -310,21 +311,53 @@ def _create_or_get_repository_uri(self, ecr_client):
310311

311312
def _get_lambda_name(project: str):
312313
lambda_prefix = AWS_LAMBDA_FEATURE_SERVER_REPOSITORY
313-
lambda_suffix = f"{project}-{_get_version_for_aws()}"
314+
lambda_suffix = f"{project}-{_get_docker_image_version()}"
314315
# AWS Lambda name can't have the length greater than 64 bytes.
315-
# This usually occurs during integration tests or when feast is
316-
# installed in editable mode (pip install -e), where feast version is long
316+
# This usually occurs during integration tests where feast version is long
317317
if len(lambda_prefix) + len(lambda_suffix) >= 63:
318-
lambda_suffix = base64.b64encode(lambda_suffix.encode()).decode()[:40]
318+
lambda_suffix = hashlib.md5(lambda_suffix.encode()).hexdigest()
319319
return f"{lambda_prefix}-{lambda_suffix}"
320320

321321

322-
def _get_version_for_aws():
323-
"""Returns Feast version with certain characters replaced.
322+
def _get_docker_image_version() -> str:
323+
"""Returns a version for the feature server Docker image.
324+
325+
For public Feast releases this equals to the Feast SDK version modified by replacing "." with "_".
326+
For example, Feast SDK version "0.14.1" would correspond to Docker image version "0_14_1".
327+
328+
For integration tests this equals to the git commit hash of HEAD. This is necessary,
329+
because integration tests need to use images built from the same commit hash.
330+
331+
During development (when Feast is installed in editable mode) this equals to the Feast SDK version
332+
modified by removing the "dev..." suffix and replacing "." with "_". For example, Feast SDK version
333+
"0.14.1.dev41+g1cbfa225.d20211103" would correspond to Docker image version "0_14_1". This way,
334+
Feast SDK will use an already existing Docker image built during the previous public release.
324335
325-
This allows the version to be included in names for AWS resources.
326336
"""
327-
return get_version().replace(".", "_").replace("+", "_")
337+
if flags_helper.is_test():
338+
# Note: this should be in sync with https://github.com/feast-dev/feast/blob/6fbe01b6e9a444dc77ec3328a54376f4d9387664/.github/workflows/pr_integration_tests.yml#L41
339+
return (
340+
subprocess.check_output(
341+
["git", "rev-parse", "HEAD"], cwd=Path(__file__).resolve().parent
342+
)
343+
.decode()
344+
.strip()
345+
)
346+
else:
347+
version = get_version()
348+
if "dev" in version:
349+
version = version[: version.find("dev") - 1].replace(".", "_")
350+
_logger.warning(
351+
"You are trying to use AWS Lambda feature server while Feast is in a development mode. "
352+
f"Feast will use a docker image version {version} derived from Feast SDK "
353+
f"version {get_version()}. If you want to update the Feast SDK version, make "
354+
"sure to first fetch all new release tags from Github and then reinstall the library:\n"
355+
"> git fetch --all --tags\n"
356+
"> pip install -e sdk/python"
357+
)
358+
else:
359+
version = version.replace(".", "_")
360+
return version
328361

329362

330363
class S3RegistryStore(RegistryStore):

sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ COPY protos protos
99
COPY README.md README.md
1010

1111
# Install Feast for AWS with Lambda dependencies
12-
RUN pip3 install -e 'sdk/python[aws,redis]'
12+
RUN pip3 install -e 'sdk/python[aws]'
1313
RUN pip3 install -r sdk/python/feast/infra/feature_servers/aws_lambda/requirements.txt --target "${LAMBDA_TASK_ROOT}"
1414

1515
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)

sdk/python/feast/infra/utils/aws_utils.py

Lines changed: 26 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
import os
33
import tempfile
44
import uuid
5-
from typing import Dict, Iterator, Optional, Tuple
5+
from typing import Any, Dict, Iterator, Optional, Tuple
66

77
import pandas as pd
88
import pyarrow as pa
@@ -60,6 +60,7 @@ def get_bucket_and_key(s3_path: str) -> Tuple[str, str]:
6060
wait=wait_exponential(multiplier=1, max=4),
6161
retry=retry_if_exception_type(ConnectionClosedError),
6262
stop=stop_after_attempt(5),
63+
reraise=True,
6364
)
6465
def execute_redshift_statement_async(
6566
redshift_data_client, cluster_id: str, database: str, user: str, query: str
@@ -96,6 +97,7 @@ class RedshiftStatementNotFinishedError(Exception):
9697
wait=wait_exponential(multiplier=1, max=30),
9798
retry=retry_if_exception_type(RedshiftStatementNotFinishedError),
9899
stop=stop_after_delay(300), # 300 seconds
100+
reraise=True,
99101
)
100102
def wait_for_redshift_statement(redshift_data_client, statement: dict) -> None:
101103
"""Waits for the Redshift statement to finish. Raises RedshiftQueryError if the statement didn't succeed.
@@ -426,6 +428,29 @@ def delete_lambda_function(lambda_client, function_name: str) -> Dict:
426428
return lambda_client.delete_function(FunctionName=function_name)
427429

428430

431+
@retry(
432+
wait=wait_exponential(multiplier=1, max=4),
433+
retry=retry_if_exception_type(ClientError),
434+
stop=stop_after_attempt(5),
435+
reraise=True,
436+
)
437+
def update_lambda_function_environment(
438+
lambda_client, function_name: str, environment: Dict[str, Any]
439+
) -> None:
440+
"""
441+
Update AWS Lambda function environment. The function is retried multiple times in case another action is
442+
currently being run on the lambda (e.g. it's being created or being updated in parallel).
443+
Args:
444+
lambda_client: AWS Lambda client.
445+
function_name: Name of the AWS Lambda function.
446+
environment: The desired lambda environment.
447+
448+
"""
449+
lambda_client.update_function_configuration(
450+
FunctionName=function_name, Environment=environment
451+
)
452+
453+
429454
def get_first_api_gateway(api_gateway_client, api_gateway_name: str) -> Optional[Dict]:
430455
"""
431456
Get the first API Gateway with the given name. Note, that API Gateways can have the same name.

sdk/python/feast/online_response.py

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,12 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
4545
online_response_proto: GetOnlineResponse proto object to construct from.
4646
"""
4747
self.proto = online_response_proto
48+
# Delete DUMMY_ENTITY_ID from proto if it exists
49+
for item in self.proto.field_values:
50+
if DUMMY_ENTITY_ID in item.statuses:
51+
del item.statuses[DUMMY_ENTITY_ID]
52+
if DUMMY_ENTITY_ID in item.fields:
53+
del item.fields[DUMMY_ENTITY_ID]
4854

4955
@property
5056
def field_values(self):
@@ -57,13 +63,9 @@ def to_dict(self) -> Dict[str, Any]:
5763
"""
5864
Converts GetOnlineFeaturesResponse features into a dictionary form.
5965
"""
60-
fields = [
61-
k
62-
for row in self.field_values
63-
for k, _ in row.statuses.items()
64-
if k != DUMMY_ENTITY_ID
65-
]
66-
features_dict: Dict[str, List[Any]] = {k: list() for k in fields}
66+
features_dict: Dict[str, List[Any]] = {
67+
k: list() for row in self.field_values for k, _ in row.statuses.items()
68+
}
6769

6870
for row in self.field_values:
6971
for feature in features_dict.keys():
@@ -77,9 +79,7 @@ def to_df(self) -> pd.DataFrame:
7779
Converts GetOnlineFeaturesResponse features into Panda dataframe form.
7880
"""
7981

80-
return pd.DataFrame(self.to_dict()).drop(
81-
DUMMY_ENTITY_ID, axis=1, errors="ignore"
82-
)
82+
return pd.DataFrame(self.to_dict())
8383

8484

8585
def _infer_online_entity_rows(

sdk/python/feast/repo_config.py

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,11 @@
4444
"gcp_cloudrun": "feast.infra.feature_servers.gcp_cloudrun.config.GcpCloudRunFeatureServerConfig",
4545
}
4646

47+
FEATURE_SERVER_TYPE_FOR_PROVIDER = {
48+
"aws": "aws_lambda",
49+
"gcp": "gcp_cloudrun",
50+
}
51+
4752

4853
class FeastBaseModel(BaseModel):
4954
""" Feast Pydantic Configuration Class """
@@ -226,15 +231,12 @@ def _validate_feature_server_config(cls, values):
226231
if "provider" not in values:
227232
raise FeastProviderNotSetError()
228233

229-
# Make sure that the type is not set, since we will set it based on the provider.
230-
if "type" in values["feature_server"]:
231-
raise FeastFeatureServerTypeSetError(values["feature_server"]["type"])
232-
233-
# Set the default type. We only support AWS Lambda for now.
234-
if values["provider"] == "aws":
235-
values["feature_server"]["type"] = "aws_lambda"
236-
237-
feature_server_type = values["feature_server"]["type"]
234+
feature_server_type = FEATURE_SERVER_TYPE_FOR_PROVIDER.get(values["provider"])
235+
defined_type = values["feature_server"].get("type")
236+
# Make sure that the type is either not set, or set correctly, since it's defined by the provider
237+
if defined_type not in (None, feature_server_type):
238+
raise FeastFeatureServerTypeSetError(defined_type)
239+
values["feature_server"]["type"] = feature_server_type
238240

239241
# Validate the dict to ensure one of the union types match
240242
try:

sdk/python/feast/type_map.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,11 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any:
4747
Returns:
4848
Python native type representation/version of the given field_value_proto
4949
"""
50-
field_value_dict = MessageToDict(field_value_proto)
50+
field_value_dict = MessageToDict(field_value_proto, float_precision=18) # type: ignore
51+
52+
# This can happen when proto_json.patch() has been called before this call, which is true for a feature server
53+
if not isinstance(field_value_dict, dict):
54+
return field_value_dict
5155

5256
for k, v in field_value_dict.items():
5357
if "List" in k:

sdk/python/setup.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,7 @@
6464
"tqdm==4.*",
6565
"fastapi>=0.68.0",
6666
"uvicorn[standard]>=0.14.0",
67+
"proto-plus<1.19.7",
6768
]
6869

6970
GCP_REQUIRED = [
@@ -113,7 +114,7 @@
113114
"firebase-admin==4.5.2",
114115
"pre-commit",
115116
"assertpy==1.1",
116-
"pip-tools"
117+
"pip-tools",
117118
] + GCP_REQUIRED + REDIS_REQUIRED + AWS_REQUIRED
118119

119120
DEV_REQUIRED = ["mypy-protobuf==1.*", "grpcio-testing==1.*"] + CI_REQUIRED

0 commit comments

Comments
 (0)