Skip to content

Commit 41cb476

Browse files
chore: Use push source in templates (feast-dev#3164)
* Modify local templates to actually use the push source
  Signed-off-by: Felix Wang <[email protected]>
* Fix AWS template
  Signed-off-by: Felix Wang <[email protected]>
* Fix GCP template
  Signed-off-by: Felix Wang <[email protected]>
* Fix Postgres template
  Signed-off-by: Felix Wang <[email protected]>
* Fix Cassandra template
  Signed-off-by: Felix Wang <[email protected]>
* Fix HBase template
  Signed-off-by: Felix Wang <[email protected]>
* Fix Snowflake template
  Signed-off-by: Felix Wang <[email protected]>
* Fix bug with duplicate features requested
  Signed-off-by: Danny Chiao <[email protected]>

Signed-off-by: Felix Wang <[email protected]>
Signed-off-by: Danny Chiao <[email protected]>
Co-authored-by: Danny Chiao <[email protected]>
1 parent fafa3e3 commit 41cb476

File tree

16 files changed

+428
-108
lines changed

16 files changed

+428
-108
lines changed

sdk/python/feast/templates/aws/feature_repo/example_repo.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@
5959
tags={"team": "driver_performance"},
6060
)
6161

62-
# Defines a way to push data (to be available offline, online or both) into Feast.
63-
driver_stats_push_source = PushSource(
64-
name="driver_stats_push_source",
65-
batch_source=driver_stats_source,
66-
)
67-
6862
# Define a request data source which encodes features / information only
6963
# available at request time (e.g. part of the user initiated HTTP request)
7064
input_request = RequestSource(
@@ -103,3 +97,48 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
10397
driver_activity_v2 = FeatureService(
10498
name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
10599
)
100+
101+
# Defines a way to push data (to be available offline, online or both) into Feast.
102+
driver_stats_push_source = PushSource(
103+
name="driver_stats_push_source",
104+
batch_source=driver_stats_source,
105+
)
106+
107+
# Defines a slightly modified version of the feature view from above, where the source
108+
# has been changed to the push source. This allows fresh features to be directly pushed
109+
# to the online store for this feature view.
110+
driver_stats_fresh_fv = FeatureView(
111+
name="driver_hourly_stats_fresh",
112+
entities=[driver],
113+
ttl=timedelta(days=1),
114+
schema=[
115+
Field(name="conv_rate", dtype=Float32),
116+
Field(name="acc_rate", dtype=Float32),
117+
Field(name="avg_daily_trips", dtype=Int64),
118+
],
119+
online=True,
120+
source=driver_stats_push_source, # Changed from above
121+
tags={"team": "driver_performance"},
122+
)
123+
124+
125+
# Define an on demand feature view which can generate new features based on
126+
# existing feature views and RequestSource features
127+
@on_demand_feature_view(
128+
sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV
129+
schema=[
130+
Field(name="conv_rate_plus_val1", dtype=Float64),
131+
Field(name="conv_rate_plus_val2", dtype=Float64),
132+
],
133+
)
134+
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
135+
df = pd.DataFrame()
136+
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
137+
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
138+
return df
139+
140+
141+
driver_activity_v3 = FeatureService(
142+
name="driver_activity_v3",
143+
features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
144+
)

sdk/python/feast/templates/aws/feature_repo/test_workflow.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,25 @@ def run_demo():
2222
store.materialize_incremental(end_date=datetime.now())
2323

2424
print("\n--- Online features ---")
25-
fetch_online_features(store, use_feature_service=False)
25+
fetch_online_features(store)
2626

2727
print("\n--- Online features retrieved (instead) through a feature service---")
28-
fetch_online_features(store, use_feature_service=True)
28+
fetch_online_features(store, source="feature_service")
29+
30+
print(
31+
"\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source---"
32+
)
33+
fetch_online_features(store, source="push")
2934

3035
print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
3136
event_df = pd.DataFrame.from_dict(
3237
{
3338
"driver_id": [1001],
3439
"event_timestamp": [
35-
datetime(2021, 5, 13, 10, 59, 42),
40+
datetime.now(),
3641
],
3742
"created": [
38-
datetime(2021, 5, 13, 10, 59, 42),
43+
datetime.now(),
3944
],
4045
"conv_rate": [1.0],
4146
"acc_rate": [1.0],
@@ -46,7 +51,7 @@ def run_demo():
4651
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)
4752

4853
print("\n--- Online features again with updated values from a stream push---")
49-
fetch_online_features(store, use_feature_service=True)
54+
fetch_online_features(store, source="push")
5055

5156
print("\n--- Run feast teardown ---")
5257
subprocess.run(["feast", "teardown"])
@@ -89,7 +94,7 @@ def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring:
8994
print(training_df.head())
9095

9196

92-
def fetch_online_features(store, use_feature_service: bool):
97+
def fetch_online_features(store, source: str = ""):
9398
entity_rows = [
9499
# {join_key: entity_value}
95100
{
@@ -103,12 +108,13 @@ def fetch_online_features(store, use_feature_service: bool):
103108
"val_to_add_2": 2002,
104109
},
105110
]
106-
if use_feature_service:
111+
if source == "feature_service":
107112
features_to_fetch = store.get_feature_service("driver_activity_v1")
113+
elif source == "push":
114+
features_to_fetch = store.get_feature_service("driver_activity_v3")
108115
else:
109116
features_to_fetch = [
110117
"driver_hourly_stats:acc_rate",
111-
"driver_hourly_stats:avg_daily_trips",
112118
"transformed_conv_rate:conv_rate_plus_val1",
113119
"transformed_conv_rate:conv_rate_plus_val2",
114120
]

sdk/python/feast/templates/cassandra/feature_repo/example_repo.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,6 @@
5454
tags={"team": "driver_performance"},
5555
)
5656

57-
# Defines a way to push data (to be available offline, online or both) into Feast.
58-
driver_stats_push_source = PushSource(
59-
name="driver_stats_push_source",
60-
batch_source=driver_stats_source,
61-
)
62-
6357
# Define a request data source which encodes features / information only
6458
# available at request time (e.g. part of the user initiated HTTP request)
6559
input_request = RequestSource(
@@ -98,3 +92,48 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
9892
driver_activity_v2 = FeatureService(
9993
name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
10094
)
95+
96+
# Defines a way to push data (to be available offline, online or both) into Feast.
97+
driver_stats_push_source = PushSource(
98+
name="driver_stats_push_source",
99+
batch_source=driver_stats_source,
100+
)
101+
102+
# Defines a slightly modified version of the feature view from above, where the source
103+
# has been changed to the push source. This allows fresh features to be directly pushed
104+
# to the online store for this feature view.
105+
driver_stats_fresh_fv = FeatureView(
106+
name="driver_hourly_stats_fresh",
107+
entities=[driver],
108+
ttl=timedelta(days=1),
109+
schema=[
110+
Field(name="conv_rate", dtype=Float32),
111+
Field(name="acc_rate", dtype=Float32),
112+
Field(name="avg_daily_trips", dtype=Int64),
113+
],
114+
online=True,
115+
source=driver_stats_push_source, # Changed from above
116+
tags={"team": "driver_performance"},
117+
)
118+
119+
120+
# Define an on demand feature view which can generate new features based on
121+
# existing feature views and RequestSource features
122+
@on_demand_feature_view(
123+
sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV
124+
schema=[
125+
Field(name="conv_rate_plus_val1", dtype=Float64),
126+
Field(name="conv_rate_plus_val2", dtype=Float64),
127+
],
128+
)
129+
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
130+
df = pd.DataFrame()
131+
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
132+
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
133+
return df
134+
135+
136+
driver_activity_v3 = FeatureService(
137+
name="driver_activity_v3",
138+
features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
139+
)

sdk/python/feast/templates/cassandra/feature_repo/test_workflow.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,25 @@ def run_demo():
2222
store.materialize_incremental(end_date=datetime.now())
2323

2424
print("\n--- Online features ---")
25-
fetch_online_features(store, use_feature_service=False)
25+
fetch_online_features(store)
26+
27+
print("\n--- Online features retrieved (instead) through a feature service---")
28+
fetch_online_features(store, source="feature_service")
29+
30+
print(
31+
"\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source---"
32+
)
33+
fetch_online_features(store, source="push")
2634

2735
print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
2836
event_df = pd.DataFrame.from_dict(
2937
{
3038
"driver_id": [1001],
3139
"event_timestamp": [
32-
datetime(2021, 5, 13, 10, 59, 42),
40+
datetime.now(),
3341
],
3442
"created": [
35-
datetime(2021, 5, 13, 10, 59, 42),
43+
datetime.now(),
3644
],
3745
"conv_rate": [1.0],
3846
"acc_rate": [1.0],
@@ -43,10 +51,7 @@ def run_demo():
4351
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)
4452

4553
print("\n--- Online features again with updated values from a stream push---")
46-
fetch_online_features(store, use_feature_service=True)
47-
48-
print("\n--- Online features retrieved (instead) through a feature service---")
49-
fetch_online_features(store, use_feature_service=True)
54+
fetch_online_features(store, source="push")
5055

5156
print("\n--- Run feast teardown ---")
5257
subprocess.run(["feast", "teardown"])
@@ -89,7 +94,7 @@ def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring:
8994
print(training_df.head())
9095

9196

92-
def fetch_online_features(store, use_feature_service: bool):
97+
def fetch_online_features(store, source: str = ""):
9398
entity_rows = [
9499
# {join_key: entity_value}
95100
{
@@ -103,12 +108,13 @@ def fetch_online_features(store, use_feature_service: bool):
103108
"val_to_add_2": 2002,
104109
},
105110
]
106-
if use_feature_service:
111+
if source == "feature_service":
107112
features_to_fetch = store.get_feature_service("driver_activity_v1")
113+
elif source == "push":
114+
features_to_fetch = store.get_feature_service("driver_activity_v3")
108115
else:
109116
features_to_fetch = [
110117
"driver_hourly_stats:acc_rate",
111-
"driver_hourly_stats:avg_daily_trips",
112118
"transformed_conv_rate:conv_rate_plus_val1",
113119
"transformed_conv_rate:conv_rate_plus_val2",
114120
]

sdk/python/feast/templates/gcp/feature_repo/example_repo.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,6 @@
6363
tags={"team": "driver_performance"},
6464
)
6565

66-
# Defines a way to push data (to be available offline, online or both) into Feast.
67-
driver_stats_push_source = PushSource(
68-
name="driver_stats_push_source",
69-
batch_source=driver_stats_source,
70-
)
71-
7266
# Define a request data source which encodes features / information only
7367
# available at request time (e.g. part of the user initiated HTTP request)
7468
input_request = RequestSource(
@@ -107,3 +101,48 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
107101
driver_activity_v2 = FeatureService(
108102
name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
109103
)
104+
105+
# Defines a way to push data (to be available offline, online or both) into Feast.
106+
driver_stats_push_source = PushSource(
107+
name="driver_stats_push_source",
108+
batch_source=driver_stats_source,
109+
)
110+
111+
# Defines a slightly modified version of the feature view from above, where the source
112+
# has been changed to the push source. This allows fresh features to be directly pushed
113+
# to the online store for this feature view.
114+
driver_stats_fresh_fv = FeatureView(
115+
name="driver_hourly_stats_fresh",
116+
entities=[driver],
117+
ttl=timedelta(weeks=52 * 10), # Set to be very long for example purposes only
118+
schema=[
119+
Field(name="conv_rate", dtype=Float32),
120+
Field(name="acc_rate", dtype=Float32),
121+
Field(name="avg_daily_trips", dtype=Int64),
122+
],
123+
online=True,
124+
source=driver_stats_push_source, # Changed from above
125+
tags={"team": "driver_performance"},
126+
)
127+
128+
129+
# Define an on demand feature view which can generate new features based on
130+
# existing feature views and RequestSource features
131+
@on_demand_feature_view(
132+
sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV
133+
schema=[
134+
Field(name="conv_rate_plus_val1", dtype=Float64),
135+
Field(name="conv_rate_plus_val2", dtype=Float64),
136+
],
137+
)
138+
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
139+
df = pd.DataFrame()
140+
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
141+
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
142+
return df
143+
144+
145+
driver_activity_v3 = FeatureService(
146+
name="driver_activity_v3",
147+
features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
148+
)

sdk/python/feast/templates/gcp/feature_repo/test_workflow.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import random
12
import subprocess
23
from datetime import datetime
34

@@ -22,24 +23,29 @@ def run_demo():
2223
store.materialize_incremental(end_date=datetime.now())
2324

2425
print("\n--- Online features ---")
25-
fetch_online_features(store, use_feature_service=False)
26+
fetch_online_features(store)
2627

2728
print("\n--- Online features retrieved (instead) through a feature service---")
28-
fetch_online_features(store, use_feature_service=True)
29+
fetch_online_features(store, source="feature_service")
30+
31+
print(
32+
"\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source---"
33+
)
34+
fetch_online_features(store, source="push")
2935

3036
print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
3137
event_df = pd.DataFrame.from_dict(
3238
{
3339
"driver_id": [1001],
3440
"event_timestamp": [
35-
datetime(2021, 5, 13, 10, 59, 42),
41+
datetime.now(),
3642
],
3743
"created": [
38-
datetime(2021, 5, 13, 10, 59, 42),
44+
datetime.now(),
3945
],
4046
"conv_rate": [1.0],
41-
"acc_rate": [1.0],
42-
"avg_daily_trips": [1000],
47+
"acc_rate": [1.0 + random.random()],
48+
"avg_daily_trips": [int(1000 * random.random())],
4349
}
4450
)
4551
print(event_df)
@@ -48,7 +54,7 @@ def run_demo():
4854
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE)
4955

5056
print("\n--- Online features again with updated values from a stream push---")
51-
fetch_online_features(store, use_feature_service=True)
57+
fetch_online_features(store, source="push")
5258

5359
print("\n--- Run feast teardown ---")
5460
subprocess.run(["feast", "teardown"])
@@ -91,7 +97,7 @@ def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring:
9197
print(training_df.head())
9298

9399

94-
def fetch_online_features(store, use_feature_service: bool):
100+
def fetch_online_features(store, source: str = ""):
95101
entity_rows = [
96102
# {join_key: entity_value}
97103
{
@@ -105,12 +111,13 @@ def fetch_online_features(store, use_feature_service: bool):
105111
"val_to_add_2": 2002,
106112
},
107113
]
108-
if use_feature_service:
114+
if source == "feature_service":
109115
features_to_fetch = store.get_feature_service("driver_activity_v1")
116+
elif source == "push":
117+
features_to_fetch = store.get_feature_service("driver_activity_v3")
110118
else:
111119
features_to_fetch = [
112120
"driver_hourly_stats:acc_rate",
113-
"driver_hourly_stats:avg_daily_trips",
114121
"transformed_conv_rate:conv_rate_plus_val1",
115122
"transformed_conv_rate:conv_rate_plus_val2",
116123
]

0 commit comments

Comments
 (0)