Skip to content

Commit a233d3f

Browse files
authored
chore: Fix test asserts for offline store write and improve some error messages (feast-dev#2964)
* chore: Fix test asserts for offline store write and improve some error messages — Signed-off-by: Achal Shah <[email protected]>
* wait for write to finish — Signed-off-by: Achal Shah <[email protected]>
* wait for write to finish — Signed-off-by: Achal Shah <[email protected]>
* detailed error messages — Signed-off-by: Achal Shah <[email protected]>
* sort and reset index — Signed-off-by: Achal Shah <[email protected]>
* fix — Signed-off-by: Achal Shah <[email protected]>
1 parent a36a695 commit a233d3f

File tree

4 files changed

+37
-32
lines changed

4 files changed

+37
-32
lines changed

sdk/python/feast/feature_store.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,8 +1131,7 @@ def create_saved_dataset(
11311131

11321132
if not from_.metadata:
11331133
raise ValueError(
1134-
"RetrievalJob must contains metadata. "
1135-
"Use RetrievalJob produced by get_historical_features"
1134+
f"The RetrievalJob {type(from_)} must implement the metadata property."
11361135
)
11371136

11381137
dataset = SavedDataset(

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ def write_logged_features(
306306
file_obj=f,
307307
destination=destination.table,
308308
job_config=job_config,
309-
)
309+
).result()
310310

311311
return
312312

@@ -319,7 +319,7 @@ def write_logged_features(
319319
file_obj=parquet_temp_file,
320320
destination=destination.table,
321321
job_config=job_config,
322-
)
322+
).result()
323323

324324
@staticmethod
325325
def offline_write_batch(
@@ -373,7 +373,7 @@ def offline_write_batch(
373373
file_obj=parquet_temp_file,
374374
destination=feature_view.batch_source.table,
375375
job_config=job_config,
376-
)
376+
).result()
377377

378378

379379
class BigQueryRetrievalJob(RetrievalJob):

sdk/python/feast/usage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
USAGE_ENDPOINT = "https://usage.feast.dev"
3636

3737
_logger = logging.getLogger(__name__)
38-
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
38+
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
3939

4040
_is_enabled = os.getenv(FEAST_USAGE, default=DEFAULT_FEAST_USAGE_VALUE) == "True"
4141

sdk/python/tests/integration/offline_store/test_offline_write.py

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,12 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
126126
"created": [ts, ts],
127127
},
128128
)
129+
first_df = first_df.astype({"conv_rate": "float32", "acc_rate": "float32"})
129130
store.write_to_offline_store(
130131
driver_stats.name, first_df, allow_registry_cache=False
131132
)
132133

133-
after_write_df = store.get_historical_features(
134+
after_write_df: pd.DataFrame = store.get_historical_features(
134135
entity_df=entity_df,
135136
features=[
136137
"driver_stats:conv_rate",
@@ -139,21 +140,26 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
139140
],
140141
full_feature_names=False,
141142
).to_df()
142-
143-
assert len(after_write_df) == len(first_df)
144-
assert np.where(
145-
after_write_df["conv_rate"].reset_index(drop=True)
146-
== first_df["conv_rate"].reset_index(drop=True)
147-
)
148-
assert np.where(
149-
after_write_df["acc_rate"].reset_index(drop=True)
150-
== first_df["acc_rate"].reset_index(drop=True)
143+
after_write_df = after_write_df.sort_values("event_timestamp").reset_index(
144+
drop=True
151145
)
152-
assert np.where(
153-
after_write_df["avg_daily_trips"].reset_index(drop=True)
154-
== first_df["avg_daily_trips"].reset_index(drop=True)
146+
147+
print(f"After: {after_write_df}\nFirst: {first_df}")
148+
print(
149+
f"After: {after_write_df['conv_rate'].reset_index(drop=True)}\nFirst: {first_df['conv_rate'].reset_index(drop=True)}"
155150
)
156151

152+
assert len(after_write_df) == len(first_df)
153+
for field in ["conv_rate", "acc_rate", "avg_daily_trips"]:
154+
assert np.equal(
155+
after_write_df[field].reset_index(drop=True),
156+
first_df[field].reset_index(drop=True),
157+
).all(), (
158+
f"Field: {field}\n"
159+
f"After: {after_write_df[field].reset_index(drop=True)}\n"
160+
f"First: {first_df[field].reset_index(drop=True)}"
161+
)
162+
157163
second_df = pd.DataFrame.from_dict(
158164
{
159165
"event_timestamp": [ts + timedelta(hours=5), ts + timedelta(hours=6)],
@@ -164,6 +170,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
164170
"created": [ts, ts],
165171
},
166172
)
173+
second_df = second_df.astype({"conv_rate": "float32", "acc_rate": "float32"})
167174

168175
store.write_to_offline_store(
169176
driver_stats.name, second_df, allow_registry_cache=False
@@ -190,18 +197,17 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
190197
],
191198
full_feature_names=False,
192199
).to_df()
193-
200+
after_write_df = after_write_df.sort_values("event_timestamp").reset_index(
201+
drop=True
202+
)
194203
expected_df = pd.concat([first_df, second_df])
195204
assert len(after_write_df) == len(expected_df)
196-
assert np.where(
197-
after_write_df["conv_rate"].reset_index(drop=True)
198-
== expected_df["conv_rate"].reset_index(drop=True)
199-
)
200-
assert np.where(
201-
after_write_df["acc_rate"].reset_index(drop=True)
202-
== expected_df["acc_rate"].reset_index(drop=True)
203-
)
204-
assert np.where(
205-
after_write_df["avg_daily_trips"].reset_index(drop=True)
206-
== expected_df["avg_daily_trips"].reset_index(drop=True)
207-
)
205+
for field in ["conv_rate", "acc_rate", "avg_daily_trips"]:
206+
assert np.equal(
207+
after_write_df[field].reset_index(drop=True),
208+
expected_df[field].reset_index(drop=True),
209+
).all(), (
210+
f"Field: {field}\n"
211+
f"After: {after_write_df[field].reset_index(drop=True)}\n"
212+
f"First: {expected_df[field].reset_index(drop=True)}"
213+
)

0 commit comments

Comments (0)