Skip to content

Commit 59b4853

Browse files
authored
chore: Add example of getting historical features with entity SQL (feast-dev#3224)
* chore: Add example of getting historical features with entity SQL
  Signed-off-by: Danny Chiao <[email protected]>
* chore: Add example of getting historical features with entity SQL
  Signed-off-by: Danny Chiao <[email protected]>
* Call this out in running feast in production guide
  Signed-off-by: Danny Chiao <[email protected]>
* Call this out in running feast in production guide
  Signed-off-by: Danny Chiao <[email protected]>
* Call this out in running feast in production guide
  Signed-off-by: Danny Chiao <[email protected]>
* lint
  Signed-off-by: Danny Chiao <[email protected]>
* lint
  Signed-off-by: Danny Chiao <[email protected]>

Signed-off-by: Danny Chiao <[email protected]>
1 parent a59c33a commit 59b4853

File tree

8 files changed

+371
-156
lines changed

8 files changed

+371
-156
lines changed

docs/getting-started/concepts/feature-retrieval.md

Lines changed: 181 additions & 149 deletions
Large diffs are not rendered by default.

docs/how-to-guides/running-feast-in-production.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ This supports pushing feature values into Feast to both online or offline stores
132132
## 3. How to use Feast for model training
133133

134134
### 3.1. Generating training data
135+
> For more details, see [feature retrieval](../getting-started/concepts/feature-retrieval.md#retrieving-historical-features-for-training-data-or-batch-scoring)
136+
135137
After we've defined our features and data sources in the repository, we can generate training datasets. We highly recommend you use a `FeatureService` to version the features that go into a specific model version.
136138

137139
1. The first thing we need to do in our training code is to create a `FeatureStore` object with a path to the registry.

sdk/python/feast/errors.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,17 @@ def __init__(self, features: Any):
384384
super().__init__(
385385
f"Invalid `features` parameter type {type(features)}. Expected one of List[str] and FeatureService."
386386
)
387+
388+
389+
class EntitySQLEmptyResults(Exception):
    """Raised when the entity SQL query returns no rows, so no entity
    dataframe can be generated from it."""

    def __init__(self, entity_sql: str):
        message = (
            "No entity values found from the specified SQL query to "
            f"generate the entity dataframe: {entity_sql}."
        )
        super().__init__(message)
394+
395+
396+
class EntityDFNotDateTime(Exception):
    """Raised when the entity dataframe's timestamp field is not a
    datetime type."""

    def __init__(self):
        message = (
            "The entity dataframe specified does not have the timestamp "
            "field as a datetime."
        )
        super().__init__(message)

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
from feast.errors import (
2929
BigQueryJobCancelled,
3030
BigQueryJobStillRunning,
31+
EntityDFNotDateTime,
32+
EntitySQLEmptyResults,
3133
FeastProviderLoginError,
3234
InvalidEntityType,
3335
)
@@ -665,6 +667,13 @@ def _get_entity_df_event_timestamp_range(
665667
res.get("min"),
666668
res.get("max"),
667669
)
670+
if (
671+
entity_df_event_timestamp_range[0] is None
672+
or entity_df_event_timestamp_range[1] is None
673+
):
674+
raise EntitySQLEmptyResults(entity_df)
675+
if type(entity_df_event_timestamp_range[0]) != datetime:
676+
raise EntityDFNotDateTime()
668677
elif isinstance(entity_df, pd.DataFrame):
669678
entity_df_event_timestamp = entity_df.loc[
670679
:, entity_df_event_timestamp_col

sdk/python/feast/infra/offline_stores/snowflake.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
from feast import OnDemandFeatureView
2727
from feast.data_source import DataSource
28-
from feast.errors import InvalidEntityType
28+
from feast.errors import EntitySQLEmptyResults, InvalidEntityType
2929
from feast.feature_logging import LoggingConfig, LoggingSource
3030
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
3131
from feast.infra.offline_stores import offline_utils
@@ -574,6 +574,11 @@ def _get_entity_df_event_timestamp_range(
574574
results = execute_snowflake_statement(snowflake_conn, query).fetchall()
575575

576576
entity_df_event_timestamp_range = cast(Tuple[datetime, datetime], results[0])
577+
if (
578+
entity_df_event_timestamp_range[0] is None
579+
or entity_df_event_timestamp_range[1] is None
580+
):
581+
raise EntitySQLEmptyResults(entity_df)
577582
else:
578583
raise InvalidEntityType(type(entity_df))
579584

sdk/python/feast/templates/aws/feature_repo/test_workflow.py

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import random
12
import subprocess
2-
from datetime import datetime
3+
from datetime import datetime, timedelta
34

45
import pandas as pd
6+
from pytz import utc
57

68
from feast import FeatureStore
79
from feast.data_source import PushMode
@@ -18,6 +20,16 @@ def run_demo():
1820
print("\n--- Historical features for batch scoring ---")
1921
fetch_historical_features_entity_df(store, for_batch_scoring=True)
2022

23+
print(
24+
"\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---"
25+
)
26+
fetch_historical_features_entity_sql(store, for_batch_scoring=False)
27+
28+
print(
29+
"\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---"
30+
)
31+
fetch_historical_features_entity_sql(store, for_batch_scoring=True)
32+
2133
print("\n--- Load features into online store ---")
2234
store.materialize_incremental(end_date=datetime.now())
2335

@@ -43,8 +55,8 @@ def run_demo():
4355
datetime.now(),
4456
],
4557
"conv_rate": [1.0],
46-
"acc_rate": [1.0],
47-
"avg_daily_trips": [1000],
58+
"acc_rate": [1.0 + random.random()],
59+
"avg_daily_trips": [int(1000 * random.random())],
4860
}
4961
)
5062
print(event_df)
@@ -57,6 +69,47 @@ def run_demo():
5769
subprocess.run(["feast", "teardown"])
5870

5971

72+
def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring):
    """Fetch historical features using a SQL query as the entity dataframe.

    Selects entities from the trailing 60-day window. With
    ``for_batch_scoring=True`` the query collapses to one row per distinct
    ``driver_id`` stamped with the current time (``GETDATE()``), so the
    point-in-time join returns each driver's latest feature values.
    """
    # Truncate to the hour and normalize to UTC so the BETWEEN bounds are stable.
    end_date = (
        datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc)
    )
    start_date = (end_date - timedelta(days=60)).astimezone(tz=utc)

    table = store.get_data_source("feast_driver_hourly_stats").get_table_query_string()
    # For batch scoring, we want the latest timestamps
    if for_batch_scoring:
        print(
            "Generating a list of all unique entities in a time window for batch scoring"
        )
        # We use a group by since we want all distinct driver_ids.
        entity_sql = f"""
            SELECT
                driver_id,
                GETDATE() as event_timestamp
            FROM {table}
            WHERE event_timestamp BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
            GROUP BY driver_id
        """
    else:
        print("Generating training data for all entities in a time window")
        # We don't need a group by if we want to generate training data
        entity_sql = f"""
            SELECT
                driver_id,
                event_timestamp
            FROM {table}
            WHERE event_timestamp BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
        """

    training_df = store.get_historical_features(
        entity_df=entity_sql,
        features=[
            "driver_hourly_stats:conv_rate",
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
        ],
    ).to_df()
    print(training_df.head())
60113
def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
61114
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
62115
# for all entities in the offline store instead

sdk/python/feast/templates/gcp/feature_repo/test_workflow.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,16 @@ def run_demo():
1919
print("\n--- Historical features for batch scoring ---")
2020
fetch_historical_features_entity_df(store, for_batch_scoring=True)
2121

22+
print(
23+
"\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---"
24+
)
25+
fetch_historical_features_entity_sql(store, for_batch_scoring=False)
26+
27+
print(
28+
"\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---"
29+
)
30+
fetch_historical_features_entity_sql(store, for_batch_scoring=True)
31+
2232
print("\n--- Load features into online store ---")
2333
store.materialize_incremental(end_date=datetime.now())
2434

@@ -60,6 +70,43 @@ def run_demo():
6070
subprocess.run(["feast", "teardown"])
6171

6272

73+
def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring):
    """Fetch historical features using a SQL query as the entity dataframe.

    NOTE(review): the time window here is hard-coded to calendar year 2021,
    unlike the AWS/Snowflake templates which compute a trailing 60-day
    window — confirm this matches the demo data's timestamps.
    """
    table = store.get_data_source(
        "driver_hourly_stats_source"
    ).get_table_query_string()
    # For batch scoring, we want the latest timestamps
    if for_batch_scoring:
        print(
            "Generating a list of all unique entities in a time window for batch scoring"
        )
        # We use a group by since we want all distinct driver_ids.
        entity_sql = f"""
            SELECT
                driver_id,
                CURRENT_TIMESTAMP() as event_timestamp
            FROM {table}
            WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31'
            GROUP BY driver_id
        """
    else:
        print("Generating training data for all entities in a time window")
        # We don't need a group by if we want to generate training data
        entity_sql = f"""
            SELECT
                driver_id,
                event_timestamp
            FROM {table}
            WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31'
        """

    training_df = store.get_historical_features(
        entity_df=entity_sql,
        features=[
            "driver_hourly_stats:conv_rate",
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
        ],
    ).to_df()
    print(training_df.head())
63110
def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
64111
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
65112
# for all entities in the offline store instead

sdk/python/feast/templates/snowflake/test_workflow.py

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import random
12
import subprocess
2-
from datetime import datetime
3+
from datetime import datetime, timedelta
34

45
import pandas as pd
6+
from pytz import utc
57

68
from feast import FeatureStore
79
from feast.data_source import PushMode
@@ -19,6 +21,16 @@ def run_demo():
1921
print("\n--- Historical features for batch scoring ---")
2022
fetch_historical_features_entity_df(store, for_batch_scoring=True)
2123

24+
print(
25+
"\n--- Historical features for training (all entities in a window using SQL entity dataframe) ---"
26+
)
27+
fetch_historical_features_entity_sql(store, for_batch_scoring=False)
28+
29+
print(
30+
"\n--- Historical features for batch scoring (all entities in a window using SQL entity dataframe) ---"
31+
)
32+
fetch_historical_features_entity_sql(store, for_batch_scoring=True)
33+
2234
print("\n--- Load features into online store ---")
2335
store.materialize_incremental(end_date=datetime.now())
2436

@@ -44,8 +56,8 @@ def run_demo():
4456
datetime.now(),
4557
],
4658
"conv_rate": [1.0],
47-
"acc_rate": [1.0],
48-
"avg_daily_trips": [1000],
59+
"acc_rate": [1.0 + random.random()],
60+
"avg_daily_trips": [int(1000 * random.random())],
4961
}
5062
)
5163
print(event_df)
@@ -59,6 +71,47 @@ def run_demo():
5971
subprocess.run(command, shell=True)
6072

6173

74+
def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring):
    """Fetch historical features using a SQL query as the entity dataframe.

    Selects entities from the trailing 60-day window. With
    ``for_batch_scoring=True`` the query collapses to one row per distinct
    driver stamped with ``CURRENT_TIMESTAMP()``, so the point-in-time join
    returns each driver's latest feature values.
    """
    # Truncate to the hour and normalize to UTC so the BETWEEN bounds are stable.
    end_date = (
        datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc)
    )
    start_date = (end_date - timedelta(days=60)).astimezone(tz=utc)

    # NOTE(review): list_data_sources()[-1] depends on registry ordering to
    # pick the driver stats source — confirm, or look the source up by name.
    table = store.list_data_sources()[-1].get_table_query_string()
    # For batch scoring, we want the latest timestamps
    if for_batch_scoring:
        print(
            "Generating a list of all unique entities in a time window for batch scoring"
        )
        # We use a group by since we want all distinct driver_ids.
        entity_sql = f"""
            SELECT
                "driver_id",
                CURRENT_TIMESTAMP() as "event_timestamp"
            FROM {table}
            WHERE "event_timestamp" BETWEEN '{start_date}' AND '{end_date}'
            GROUP BY "driver_id"
        """
    else:
        print("Generating training data for all entities in a time window")
        # We don't need a group by if we want to generate training data
        entity_sql = f"""
            SELECT
                "driver_id",
                "event_timestamp"
            FROM {table}
            WHERE "event_timestamp" BETWEEN '{start_date}' AND '{end_date}'
        """

    training_df = store.get_historical_features(
        entity_df=entity_sql,
        features=[
            "driver_hourly_stats:conv_rate",
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
        ],
    ).to_df()
    print(training_df.head())
62115
def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
63116
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
64117
# for all entities in the offline store instead

0 commit comments

Comments (0)