
Commit a16582a

feat: Offline Store historical features retrieval based on datetime range in dask (#5717)
* RHOAIENG-37451: Offline Store historical features retrieval based on datetime range for Dask
  Signed-off-by: Aniket Paluskar <[email protected]>

* Removal of unnecessary checks; made both dates timezone-aware
  Signed-off-by: Aniket Paluskar <[email protected]>

* Added elaborated test cases to retrieve historical features without using datetime
  Signed-off-by: Aniket Paluskar <[email protected]>

---------

Signed-off-by: Aniket Paluskar <[email protected]>
1 parent b70eda0 commit a16582a
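
For orientation, a sketch of how this retrieval mode might be exercised from a feature store. This assumes the `start_date`/`end_date` keyword arguments are forwarded by the caller down to the offline store, and uses a hypothetical `driver_hourly_stats` feature view; neither that plumbing nor that view is shown in this diff.

```python
from datetime import datetime, timedelta, timezone

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Non-entity mode: no entity_df; the scan is bounded by a datetime range instead.
# Assumes the caller forwards start_date/end_date through to DaskOfflineStore.
job = store.get_historical_features(
    entity_df=None,
    features=["driver_hourly_stats:conv_rate"],  # hypothetical feature view
    start_date=datetime.now(timezone.utc) - timedelta(days=7),
    end_date=datetime.now(timezone.utc),
)
df = job.to_df()
```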

File tree

2 files changed: +614 −11 lines


sdk/python/feast/infra/offline_stores/dask.py

Lines changed: 59 additions & 11 deletions
```diff
@@ -1,6 +1,6 @@
 import os
 import uuid
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union
 
```
```diff
@@ -37,7 +37,7 @@
 from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
-from feast.utils import _get_requested_feature_views_to_features_dict
+from feast.utils import _get_requested_feature_views_to_features_dict, make_tzaware
 
 # DaskRetrievalJob will cast string objects to string[pyarrow] from dask version 2023.7.1
 # This is not the desired behavior for our use case, so we set the convert-string option to False
```
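
The newly imported `make_tzaware` is used below to normalize user-supplied dates. A rough sketch of the behavior this code relies on, under the assumption that `feast.utils.make_tzaware` treats naive datetimes as UTC and passes tz-aware values through:

```python
from datetime import datetime, timezone

def make_tzaware_sketch(t: datetime) -> datetime:
    # Naive datetimes are assumed to be in UTC; tz-aware inputs pass through unchanged.
    # (Illustrative stand-in for feast.utils.make_tzaware, not its actual source.)
    return t.replace(tzinfo=timezone.utc) if t.tzinfo is None else t

assert make_tzaware_sketch(datetime(2024, 6, 1)).tzinfo is timezone.utc
```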
```diff
@@ -133,21 +133,56 @@ def get_historical_features(
         config: RepoConfig,
         feature_views: List[FeatureView],
         feature_refs: List[str],
-        entity_df: Union[pd.DataFrame, str],
+        entity_df: Optional[Union[pd.DataFrame, dd.DataFrame, str]],
         registry: BaseRegistry,
         project: str,
         full_feature_names: bool = False,
+        **kwargs,
     ) -> RetrievalJob:
         assert isinstance(config.offline_store, DaskOfflineStoreConfig)
         for fv in feature_views:
             assert isinstance(fv.batch_source, FileSource)
 
-        if not isinstance(entity_df, pd.DataFrame) and not isinstance(
-            entity_df, dd.DataFrame
-        ):
-            raise ValueError(
-                f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}"
+        # Allow non-entity mode using start/end timestamps to enable bounded retrievals without an input entity_df.
+        # This synthesizes a minimal entity_df solely to drive the existing join and metadata plumbing without
+        # incurring source scans here; actual pushdowns can be layered in follow-ups if needed.
+        start_date: Optional[datetime] = kwargs.get("start_date", None)
+        end_date: Optional[datetime] = kwargs.get("end_date", None)
+        non_entity_mode = entity_df is None
+
+        if non_entity_mode:
+            # Default end_date to current time (UTC) to keep behavior predictable without extra parameters.
+            end_date = (
+                make_tzaware(end_date) if end_date else datetime.now(timezone.utc)
+            )
+
+            # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back.
+            if start_date is None:
+                max_ttl_seconds = 0
+                for fv in feature_views:
+                    if fv.ttl and isinstance(fv.ttl, timedelta):
+                        max_ttl_seconds = max(
+                            max_ttl_seconds, int(fv.ttl.total_seconds())
+                        )
+                if max_ttl_seconds > 0:
+                    start_date = end_date - timedelta(seconds=max_ttl_seconds)
+                else:
+                    # Keep default window bounded to avoid unbounded scans by default.
+                    start_date = end_date - timedelta(days=30)
+            start_date = make_tzaware(start_date)
+
+            # Minimal synthetic entity_df: one timestamp row; join keys are not materialized here on purpose to avoid
+            # accidental dependence on specific feature view schemas at this layer.
+            entity_df = pd.DataFrame(
+                {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]}
             )
+        else:
+            if not isinstance(entity_df, pd.DataFrame) and not isinstance(
+                entity_df, dd.DataFrame
+            ):
+                raise ValueError(
+                    f"Please provide an entity_df of type pd.DataFrame or dask.dataframe.DataFrame instead of type {type(entity_df)}"
+                )
         entity_df_event_timestamp_col = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL  # local modifiable copy of global variable
         if entity_df_event_timestamp_col not in entity_df.columns:
             datetime_columns = entity_df.select_dtypes(
```
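
The TTL-driven defaulting above is self-contained enough to test in isolation. A minimal sketch with the `FeatureView` objects reduced to bare `timedelta` TTLs; `default_retrieval_window` is a hypothetical helper mirroring the hunk, not part of the diff:

```python
from datetime import datetime, timedelta, timezone

def default_retrieval_window(ttls, end_date=None):
    # Mirrors the hunk above: end defaults to "now" (UTC); start reaches back by the
    # largest TTL across feature views, or 30 days when no TTL is set.
    end = end_date or datetime.now(timezone.utc)
    max_ttl_seconds = max(
        (int(ttl.total_seconds()) for ttl in ttls if isinstance(ttl, timedelta)),
        default=0,
    )
    lookback = (
        timedelta(seconds=max_ttl_seconds) if max_ttl_seconds > 0 else timedelta(days=30)
    )
    return end - lookback, end

# Example: with TTLs of 1 day and 7 days, the window spans the last 7 days.
start, end = default_retrieval_window([timedelta(days=1), timedelta(days=7)])
assert end - start == timedelta(days=7)
```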
```diff
@@ -171,8 +206,12 @@ def get_historical_features(
             registry.list_on_demand_feature_views(config.project),
         )
 
-        entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
-            entity_df, entity_df_event_timestamp_col
+        entity_df_event_timestamp_range = (
+            (start_date, end_date)
+            if non_entity_mode
+            else _get_entity_df_event_timestamp_range(
+                entity_df, entity_df_event_timestamp_col
+            )
         )
 
         # Create lazy function that is only called from the RetrievalJob object
```
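
For contrast with the non-entity branch, the entity-df path derives the window from the data itself. A sketch of what a min/max derivation over the timestamp column looks like; `timestamp_range_sketch` is illustrative, not the actual `_get_entity_df_event_timestamp_range` implementation:

```python
import pandas as pd

def timestamp_range_sketch(entity_df: pd.DataFrame, ts_col: str):
    # Entity-df mode: the retrieval window is the min/max of the event timestamps.
    ts = pd.to_datetime(entity_df[ts_col], utc=True)
    return ts.min(), ts.max()
```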
```diff
@@ -260,7 +299,16 @@ def evaluate_historical_retrieval():
                     full_feature_names,
                 )
 
-                df_to_join = _merge(entity_df_with_features, df_to_join, join_keys)
+                # In non-entity mode, if the synthetic entity_df lacks join keys, cross join to build a snapshot
+                # of all entities as-of the requested timestamp, then rely on TTL and deduplication to select
+                # the appropriate latest rows per entity.
+                current_join_keys = join_keys
+                if non_entity_mode:
+                    current_join_keys = []
+
+                df_to_join = _merge(
+                    entity_df_with_features, df_to_join, current_join_keys
+                )
 
                 df_to_join = _normalize_timestamp(
                     df_to_join, timestamp_field, created_timestamp_column
```
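
The comment in this hunk describes the non-entity merge as a cross join followed by point-in-time filtering and deduplication. A plain-pandas sketch of that idea, with made-up data; `_merge` and the surrounding dedup logic in dask.py are not reproduced here:

```python
import pandas as pd

as_of = pd.Timestamp("2024-06-01", tz="UTC")
entity_df = pd.DataFrame({"event_timestamp": [as_of]})  # synthetic single-row frame
source = pd.DataFrame(
    {
        "driver_id": [1, 1, 2],
        "feature_timestamp": pd.to_datetime(
            ["2024-05-30", "2024-05-31", "2024-05-29"], utc=True
        ),
        "conv_rate": [0.1, 0.2, 0.3],
    }
)

# Cross join: every source row is paired with the single as-of timestamp.
snapshot = entity_df.merge(source, how="cross")
# Point-in-time filter, then keep the latest row per entity.
snapshot = snapshot[snapshot["feature_timestamp"] <= snapshot["event_timestamp"]]
latest = snapshot.sort_values("feature_timestamp").drop_duplicates(
    "driver_id", keep="last"
)
print(latest[["driver_id", "conv_rate"]])  # driver 1 -> 0.2, driver 2 -> 0.3
```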
