
Commit 1c4ba12

Reporting metrics from validation UDF (feast-dev#1256)
* report metrics from ge validation udf
* e2e tests for validation metrics
* parse metrics from prometheus
* docker-compose test: pull js logs
* better error message
* enable statsd in docker compose
* enable statsd in docker compose
* enable statsd in docker compose
* unique ft name
* cleanup

Signed-off-by: Oleksii Moskalenko <[email protected]>
1 parent fae8b65 commit 1c4ba12

File tree

12 files changed: +364 -24 lines changed

infra/docker-compose/docker-compose.yml

Lines changed: 10 additions & 1 deletion

@@ -37,6 +37,9 @@ services:
       FEAST_HISTORICAL_FEATURE_OUTPUT_FORMAT: parquet
       FEAST_REDIS_HOST: redis
       FEAST_SPARK_INGESTION_JAR: ${INGESTION_JAR_PATH}
+      FEAST_STATSD_ENABLED: "true"
+      FEAST_STATSD_HOST: prometheus_statsd
+      FEAST_STATSD_PORT: 9125
 
   jupyter:
     image: gcr.io/kf-feast/feast-jupyter:${FEAST_VERSION}
@@ -106,4 +109,10 @@ services:
   redis:
     image: redis:5-alpine
     ports:
-      - "6379:6379"
+      - "6379:6379"
+
+  prometheus_statsd:
+    image: prom/statsd-exporter:v0.12.1
+    ports:
+      - "9125:9125"
+      - "9102:9102"

infra/scripts/test-docker-compose.sh

Lines changed: 2 additions & 1 deletion

@@ -13,6 +13,7 @@ clean_up () {
     ARG=$?
 
     # Shut down docker-compose images
+
     docker-compose down
 
     exit $ARG
@@ -69,4 +70,4 @@ docker exec \
  -e DISABLE_FEAST_SERVICE_FIXTURES=true \
  --user root \
  feast_jupyter_1 bash \
- -c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --feast-version develop'
+ -c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --job-service-url jobservice:6568 --staging-path file:///shared/staging/ --kafka-brokers kafka:9092 --statsd-url prometheus_statsd:9125 --prometheus-url prometheus_statsd:9102 --feast-version develop'

sdk/python/feast/contrib/validation/ge.py

Lines changed: 66 additions & 3 deletions

@@ -1,5 +1,6 @@
 import io
 import json
+import os
 from typing import TYPE_CHECKING
 from urllib.parse import urlparse
 
@@ -10,7 +11,7 @@
 from feast.staging.storage_client import get_staging_client
 
 try:
-    from great_expectations.core import ExpectationSuite
+    from great_expectations.core import ExpectationConfiguration, ExpectationSuite
     from great_expectations.dataset import PandasDataset
 except ImportError:
     raise ImportError(
@@ -41,7 +42,28 @@ def __init__(self, name: str, pickled_code: bytes):
         self.pickled_code = pickled_code
 
 
-def create_validation_udf(name: str, expectations: ExpectationSuite) -> ValidationUDF:
+def drop_feature_table_prefix(
+    expectation_configuration: ExpectationConfiguration, prefix
+):
+    kwargs = expectation_configuration.kwargs
+    for arg_name in ("column", "column_A", "column_B"):
+        if arg_name not in kwargs:
+            continue
+
+        if kwargs[arg_name].startswith(prefix):
+            kwargs[arg_name] = kwargs[arg_name][len(prefix) :]
+
+
+def prepare_expectations(suite: ExpectationSuite, feature_table: "FeatureTable"):
+    for expectation in suite.expectations:
+        drop_feature_table_prefix(expectation, f"{feature_table.name}__")
+
+    return suite
+
+
+def create_validation_udf(
+    name: str, expectations: ExpectationSuite, feature_table: "FeatureTable",
+) -> ValidationUDF:
     """
     Wraps your expectations into Spark UDF.
 
@@ -60,10 +82,25 @@ def create_validation_udf(name: str, expectations: ExpectationSuite) -> Validati
 
     :param name
     :param expectations: collection of expectation gathered on training dataset
+    :param feature_table
     :return: ValidationUDF with serialized code
     """
 
+    expectations = prepare_expectations(expectations, feature_table)
+
     def udf(df: pd.DataFrame) -> pd.Series:
+        from datadog.dogstatsd import DogStatsd
+
+        reporter = (
+            DogStatsd(
+                host=os.environ["STATSD_HOST"],
+                port=int(os.environ["STATSD_PORT"]),
+                telemetry_min_flush_interval=0,
+            )
+            if os.getenv("STATSD_HOST") and os.getenv("STATSD_PORT")
+            else DogStatsd()
+        )
+
         ds = PandasDataset.from_dataset(df)
         result = ds.validate(expectations, result_format="COMPLETE")
         valid_rows = pd.Series([True] * df.shape[0])
@@ -72,6 +109,32 @@ def udf(df: pd.DataFrame) -> pd.Series:
             if check.success:
                 continue
 
+            unexpected_count = (
+                check.result["unexpected_count"]
+                if "unexpected_count" in check.result
+                else df.shape[0]
+            )
+
+            check_kwargs = check.expectation_config.kwargs
+            check_kwargs.pop("result_format", None)
+            check_name = "_".join(
+                [check.expectation_config.expectation_type]
+                + [
+                    str(v)
+                    for v in check_kwargs.values()
+                    if isinstance(v, (str, int, float))
+                ]
+            )
+
+            reporter.increment(
+                "feast_feature_validation_check_failed",
+                value=unexpected_count,
+                tags=[
+                    f"feature_table:{os.getenv('FEAST_INGESTION_FEATURE_TABLE', 'unknown')}",
+                    f"check:{check_name}",
+                ],
+            )
+
             if check.exception_info["raised_exception"]:
                 # ToDo: probably we should mark all rows as invalid
                 continue
@@ -106,7 +169,7 @@ def apply_validation(
     staging_client = get_staging_client(staging_scheme, client._config)
 
     pickled_code_fp = io.BytesIO(udf.pickled_code)
-    remote_path = f"{staging_location}/udfs/{udf.name}.pickle"
+    remote_path = f"{staging_location}/udfs/{feature_table.name}/{udf.name}.pickle"
     staging_client.upload_fileobj(
         pickled_code_fp, f"{udf.name}.pickle", remote_uri=urlparse(remote_path)
     )
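
The `{feature_table.name}__` prefix stripped by prepare_expectations matches the column naming in historical retrieval output, so a suite built on training data can be applied to raw ingestion rows. A hedged usage sketch, where training_df and feature_table are placeholders for a training sample and a registered FeatureTable:

    from great_expectations.dataset import PandasDataset

    from feast.contrib.validation.ge import create_validation_udf

    # Build an expectation suite on a training sample (placeholder DataFrame);
    # column names carry the feature-table prefix, as in retrieval output.
    ds = PandasDataset(training_df)
    ds.expect_column_values_to_be_between(
        "driver_stats__avg_rating", min_value=0, max_value=5
    )

    # The new third argument lets the UDF strip the "driver_stats__" prefix
    # and tag emitted metrics with the feature table name.
    udf = create_validation_udf(
        "validate_rating", ds.get_expectation_suite(), feature_table
    )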

sdk/python/feast/feature_table.py

Lines changed: 3 additions & 0 deletions

@@ -406,3 +406,6 @@ def _update_from_feature_table(self, feature_table):
         self.stream_source = feature_table.stream_source
         self._created_timestamp = feature_table.created_timestamp
         self._last_updated_timestamp = feature_table.last_updated_timestamp
+
+    def __repr__(self):
+        return f"FeatureTable <{self.name}>"

spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala

Lines changed: 11 additions & 1 deletion

@@ -171,9 +171,19 @@ object StreamingPipeline extends BasePipeline with Serializable {
       val fileName = validationConfig.pickledCodePath.split("/").last
       val pickledCode = FileUtils.readFileToByteArray(new File(SparkFiles.get(fileName)))
 
+      val env = config.metrics match {
+        case Some(c: StatsDConfig) =>
+          Map(
+            "STATSD_HOST"                   -> c.host,
+            "STATSD_PORT"                   -> c.port.toString,
+            "FEAST_INGESTION_FEATURE_TABLE" -> config.featureTable.name
+          )
+        case _ => Map.empty[String, String]
+      }
+
       UserDefinedPythonFunction(
         validationConfig.name,
-        DynamicPythonFunction.create(pickledCode),
+        DynamicPythonFunction.create(pickledCode, env),
         BooleanType,
         pythonEvalType = 200, // SQL_SCALAR_PANDAS_UDF (original constant is in private object)
         udfDeterministic = true

spark/ingestion/src/main/scala/org/apache/spark/api/python/DynamicPythonFunction.scala

Lines changed: 6 additions & 2 deletions

@@ -61,8 +61,12 @@ object DynamicPythonFunction {
     )
   }
 
-  def create(pickledCode: Array[Byte], includePath: String = "libs/"): PythonFunction = {
-    val envVars = new JHashMap[String, String]()
+  def create(
+      pickledCode: Array[Byte],
+      env: Map[String, String] = Map.empty,
+      includePath: String = "libs/"
+  ): PythonFunction = {
+    val envVars = new JHashMap[String, String](env.asJava)
     val broadcasts = new JArrayList[Broadcast[PythonBroadcast]]()
 
     if (!sys.env.contains("SPARK_HOME")) {

tests/e2e/conftest.py

Lines changed: 8 additions & 1 deletion

@@ -28,6 +28,8 @@ def pytest_addoption(parser):
     parser.addoption("--feast-version", action="store")
     parser.addoption("--bq-project", action="store")
     parser.addoption("--feast-project", action="store", default="default")
+    parser.addoption("--statsd-url", action="store", default="localhost:8125")
+    parser.addoption("--prometheus-url", action="store", default="localhost:9102")
 
 
 def pytest_runtest_setup(item):
@@ -51,10 +53,15 @@ def pytest_runtest_setup(item):
         kafka_port,
         kafka_server,
         redis_server,
+        statsd_server,
         zookeeper_server,
     )
 else:
-    from .fixtures.external_services import kafka_server, redis_server  # noqa
+    from .fixtures.external_services import (  # type: ignore # noqa
+        kafka_server,
+        redis_server,
+        statsd_server,
+    )
 
 if not os.environ.get("DISABLE_FEAST_SERVICE_FIXTURES"):
     from .fixtures.feast_services import *  # type: ignore # noqa

tests/e2e/fixtures/client.py

Lines changed: 5 additions & 0 deletions

@@ -7,13 +7,15 @@
 from pytest_redis.executor import RedisExecutor
 
 from feast import Client
+from tests.e2e.fixtures.statsd_stub import StatsDServer
 
 
 @pytest.fixture
 def feast_client(
     pytestconfig,
     ingestion_job_jar,
     redis_server: RedisExecutor,
+    statsd_server: StatsDServer,
     feast_core: Tuple[str, int],
     feast_serving: Tuple[str, int],
     local_staging_path,
@@ -44,6 +46,9 @@ def feast_client(
             local_staging_path, "historical_output"
         ),
         ingestion_drop_invalid_rows=True,
+        statsd_enabled=True,
+        statsd_host=statsd_server.host,
+        statsd_port=statsd_server.port,
         **job_service_env,
     )
 
tests/e2e/fixtures/external_services.py

Lines changed: 12 additions & 0 deletions

@@ -1,13 +1,16 @@
 import pytest
 from pytest_redis.executor import NoopRedis
 
+from tests.e2e.fixtures.statsd_stub import PrometheusStatsDServer
+
 __all__ = (
     "feast_core",
     "feast_serving",
     "redis_server",
     "kafka_server",
     "enable_auth",
     "feast_jobservice",
+    "statsd_server",
 )
 
 
@@ -44,3 +47,12 @@ def enable_auth():
 def feast_jobservice(pytestconfig):
     host, port = pytestconfig.getoption("job_service_url").split(":")
     return host, port
+
+
+@pytest.fixture(scope="session")
+def statsd_server(pytestconfig):
+    host, port = pytestconfig.getoption("statsd_url").split(":")
+    prometheus_host, prometheus_port = pytestconfig.getoption("prometheus_url").split(
+        ":"
+    )
+    return PrometheusStatsDServer(host, port, prometheus_host, prometheus_port)
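
statsd_stub.py itself is among the 12 changed files but not shown above. Judging only from the constructor call in this fixture, PrometheusStatsDServer pairs the StatsD ingest address with the exporter's Prometheus scrape address; a hypothetical sketch of that shape:

    from dataclasses import dataclass

    # Hypothetical shape only; the real class lives in statsd_stub.py.
    # Ports stay strings because they come from "host:port" CLI options.
    @dataclass
    class PrometheusStatsDServer:
        host: str
        port: str
        prometheus_host: str
        prometheus_port: str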

tests/e2e/fixtures/services.py

Lines changed: 23 additions & 6 deletions

@@ -1,7 +1,8 @@
+import os
 import pathlib
 import shutil
-import tempfile
 
+import port_for
 import pytest
 import requests
 from pytest_kafka import make_kafka_server, make_zookeeper_process
@@ -14,16 +15,23 @@
     "zookeeper_server",
     "postgres_server",
     "redis_server",
+    "statsd_server",
 )
 
+from tests.e2e.fixtures.statsd_stub import StatsDStub
+
 
 def download_kafka(version="2.12-2.6.0"):
-    r = requests.get(f"https://downloads.apache.org/kafka/2.6.0/kafka_{version}.tgz")
-    temp_dir = pathlib.Path(tempfile.mkdtemp())
-    local_path = temp_dir / "kafka.tgz"
+    temp_dir = pathlib.Path("/tmp")
+    local_path = temp_dir / f"kafka_{version}.tgz"
+
+    if not os.path.isfile(local_path):
+        r = requests.get(
+            f"https://downloads.apache.org/kafka/2.6.0/kafka_{version}.tgz"
+        )
 
-    with open(local_path, "wb") as f:
-        f.write(r.content)
+        with open(local_path, "wb") as f:
+            f.write(r.content)
 
     shutil.unpack_archive(str(local_path), str(temp_dir))
     return temp_dir / f"kafka_{version}" / "bin"
@@ -35,6 +43,15 @@ def kafka_server(kafka_port):
     return "localhost", port
 
 
+@pytest.fixture
+def statsd_server():
+    port = port_for.select_random(None)
+    server = StatsDStub(port=port)
+    server.start()
+    yield server
+    server.stop()
+
+
 postgres_server = pg_factories.postgresql_proc(password="password")
 redis_server = redis_factories.redis_proc(
     executable=shutil.which("redis-server"), timeout=3600
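
The StatsDStub imported above is the in-process counterpart used when external services are disabled. A hypothetical sketch of the minimum such a stub needs (the real implementation in statsd_stub.py is not shown in this diff and may differ): a UDP listener that records raw DogStatsD packets for later assertions.

    import socket
    import threading

    # Hypothetical sketch of a StatsD stub; the host/port attributes match
    # how the feast_client fixture consumes statsd_server above.
    class StatsDStub:
        def __init__(self, port: int, host: str = "localhost"):
            self.host = host
            self.port = port
            self.packets = []  # raw DogStatsD lines, e.g. "metric:1|c|#tag:v"
            self._running = False
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._thread = threading.Thread(target=self._listen, daemon=True)

        def start(self):
            self._sock.bind((self.host, self.port))
            self._sock.settimeout(0.5)  # so the loop can notice stop()
            self._running = True
            self._thread.start()

        def _listen(self):
            while self._running:
                try:
                    data, _ = self._sock.recvfrom(4096)
                except socket.timeout:
                    continue
                self.packets.append(data.decode())

        def stop(self):
            self._running = False
            self._thread.join()
            self._sock.close()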
