
Commit f506207

E2E tests support for jobservice's control loop (feast-dev#1267)

Signed-off-by: Oleksii Moskalenko <[email protected]>

1 parent eaaf233 · commit f506207

File tree

3 files changed (+43 −22 lines):

tests/e2e/conftest.py
tests/e2e/test_online_features.py
tests/e2e/test_validation.py


tests/e2e/conftest.py

Lines changed: 6 additions & 0 deletions
@@ -30,6 +30,12 @@ def pytest_addoption(parser):
     parser.addoption("--feast-project", action="store", default="default")
     parser.addoption("--statsd-url", action="store", default="localhost:8125")
     parser.addoption("--prometheus-url", action="store", default="localhost:9102")
+    parser.addoption(
+        "--scheduled-streaming-job",
+        action="store_true",
+        help="When set tests won't manually start streaming jobs,"
+        " instead jobservice's loop is responsible for that",
+    )


 def pytest_runtest_setup(item):
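
For context, a minimal sketch of how this option is consumed (the getoption call mirrors the test code below; the pytest invocation in the comment is illustrative, not from the commit). pytest exposes a dashed store_true flag under an underscored key:

# Illustrative only, not part of the commit.
# Run the e2e suite with the jobservice control loop owning streaming jobs:
#   pytest tests/e2e --scheduled-streaming-job
def uses_jobservice_loop(pytestconfig) -> bool:
    # "--scheduled-streaming-job" is read back as "scheduled_streaming_job"
    return pytestconfig.getoption("scheduled_streaming_job")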

tests/e2e/test_online_features.py

Lines changed: 13 additions & 8 deletions
@@ -97,7 +97,7 @@ def test_offline_ingestion_from_bq_view(pytestconfig, bq_dataset, feast_client:


 def test_streaming_ingestion(
-    feast_client: Client, local_staging_path: str, kafka_server
+    feast_client: Client, local_staging_path: str, kafka_server, pytestconfig
 ):
     entity = Entity(name="s2id", description="S2id", value_type=ValueType.INT64,)
     kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
@@ -124,12 +124,14 @@ def test_streaming_ingestion(
     feast_client.apply(entity)
     feast_client.apply(feature_table)

-    job = feast_client.start_stream_to_online_ingestion(feature_table)
-    assert job.get_feature_table() == feature_table.name
-
-    wait_retry_backoff(
-        lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
-    )
+    if not pytestconfig.getoption("scheduled_streaming_job"):
+        job = feast_client.start_stream_to_online_ingestion(feature_table)
+        assert job.get_feature_table() == feature_table.name
+        wait_retry_backoff(
+            lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
+        )
+    else:
+        job = None

     wait_retry_backoff(
         lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
@@ -148,7 +150,10 @@ def test_streaming_ingestion(
             feature_names=["drivers_stream:unique_drivers"],
         )
     finally:
-        job.cancel()
+        if job:
+            job.cancel()
+        else:
+            feast_client.delete_feature_table(feature_table.name)

     pd.testing.assert_frame_equal(
         ingested[["s2id", "drivers_stream:unique_drivers"]],
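
The pattern above (start the job manually only when the control loop is not scheduled to do it, then in teardown either cancel the job or delete the feature table so the loop stops its ingestion) could also be captured in a context manager. A hypothetical sketch, not part of the commit, using only calls that appear in the diff:

from contextlib import contextmanager


@contextmanager
def managed_streaming_job(feast_client, feature_table, pytestconfig):
    # Hypothetical helper, not in the commit: start the ingestion job
    # manually unless the jobservice control loop is responsible for it.
    job = None
    if not pytestconfig.getoption("scheduled_streaming_job"):
        job = feast_client.start_stream_to_online_ingestion(feature_table)
    try:
        yield job
    finally:
        if job:
            job.cancel()
        else:
            # With no job handle, deleting the feature table is the signal
            # for the jobservice loop to stop the scheduled ingestion.
            feast_client.delete_feature_table(feature_table.name)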

tests/e2e/test_validation.py

Lines changed: 24 additions & 14 deletions
@@ -65,7 +65,25 @@ def create_schema(kafka_broker, topic_name, feature_table_name):
     return entity, feature_table


-def test_validation_with_ge(feast_client: Client, kafka_server):
+def start_job(feast_client: Client, feature_table: FeatureTable, pytestconfig):
+    if pytestconfig.getoption("scheduled_streaming_job"):
+        return
+
+    job = feast_client.start_stream_to_online_ingestion(feature_table)
+    wait_retry_backoff(
+        lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
+    )
+    return job
+
+
+def stop_job(job, feast_client: Client, feature_table: FeatureTable):
+    if job:
+        job.cancel()
+    else:
+        feast_client.delete_feature_table(feature_table.name)
+
+
+def test_validation_with_ge(feast_client: Client, kafka_server, pytestconfig):
     kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
     topic_name = f"avro-{uuid.uuid4()}"

@@ -82,11 +100,7 @@ def test_validation_with_ge(feast_client: Client, kafka_server):
     udf = create_validation_udf("testUDF", expectations, feature_table)
     apply_validation(feast_client, feature_table, udf, validation_window_secs=1)

-    job = feast_client.start_stream_to_online_ingestion(feature_table)
-
-    wait_retry_backoff(
-        lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
-    )
+    job = start_job(feast_client, feature_table, pytestconfig)

     wait_retry_backoff(
         lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
@@ -117,7 +131,7 @@ def test_validation_with_ge(feast_client: Client, kafka_server):
             expected_ingested_count=test_data.shape[0] - len(invalid_idx),
         )
     finally:
-        job.cancel()
+        stop_job(job, feast_client, feature_table)

     test_data["num"] = test_data["num"].astype(np.float64)
     test_data["num"].iloc[invalid_idx] = np.nan
@@ -133,7 +147,7 @@ def test_validation_with_ge(feast_client: Client, kafka_server):

 @pytest.mark.env("local")
 def test_validation_reports_metrics(
-    feast_client: Client, kafka_server, statsd_server: StatsDServer
+    feast_client: Client, kafka_server, statsd_server: StatsDServer, pytestconfig
 ):
     kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
     topic_name = f"avro-{uuid.uuid4()}"
@@ -153,11 +167,7 @@ def test_validation_reports_metrics(
     udf = create_validation_udf("testUDF", expectations, feature_table)
     apply_validation(feast_client, feature_table, udf, validation_window_secs=10)

-    job = feast_client.start_stream_to_online_ingestion(feature_table)
-
-    wait_retry_backoff(
-        lambda: (None, job.get_status() == SparkJobStatus.IN_PROGRESS), 120
-    )
+    job = start_job(feast_client, feature_table, pytestconfig)

     wait_retry_backoff(
         lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
@@ -196,7 +206,7 @@ def test_validation_reports_metrics(
             expected_ingested_count=test_data.shape[0] - len(invalid_idx),
         )
     finally:
-        job.cancel()
+        stop_job(job, feast_client, feature_table)

     expected_metrics = [
         (
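
With start_job and stop_job in place, a new streaming test only needs to treat the returned job as optional (start_job returns None in scheduled mode). A hypothetical follow-on test, not part of the commit, reusing the helpers and fixtures named in the diffs:

import uuid


def test_streaming_with_helpers(feast_client, kafka_server, pytestconfig):
    # Hypothetical test, not in the commit; create_schema, wait_retry_backoff
    # and check_consumer_exist are the helpers already used above.
    kafka_broker = f"{kafka_server[0]}:{kafka_server[1]}"
    topic_name = f"avro-{uuid.uuid4()}"
    entity, feature_table = create_schema(kafka_broker, topic_name, "my_features")

    feast_client.apply(entity)
    feast_client.apply(feature_table)

    job = start_job(feast_client, feature_table, pytestconfig)
    try:
        wait_retry_backoff(
            lambda: (None, check_consumer_exist(kafka_broker, topic_name)), 120
        )
        # ... publish events and assert on retrieved online features ...
    finally:
        stop_job(job, feast_client, feature_table)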
