
Commit cbfc72a

Add final_output_feature_names in Query context to avoid SELECT * EXCEPT (feast-dev#1911)
* Add final_output_feature_names in Query context to avoid SELECT * EXCEPT at the end
* Remove the drop_columns concept for AWS Redshift
* Format files
* Re-add integration tests about backfill rows
* Add teardown to datasource creator
* Remove teardown logic in tests as it's part of conftest
* Fix linter
* Add pytest.mark.universal to new test

Signed-off-by: Matt Delacour <[email protected]>
1 parent 6b10a82 commit cbfc72a

File tree

5 files changed: +146 -50 lines changed


sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 7 additions & 3 deletions
@@ -164,6 +164,7 @@ def query_generator() -> Iterator[str]:
     query_context,
     left_table_query_string=table_reference,
     entity_df_event_timestamp_col=entity_df_event_timestamp_col,
+    entity_df_columns=entity_schema.keys(),
     query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN,
     full_feature_names=full_feature_names,
 )
@@ -517,14 +518,17 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
     Thus we only need to compute the latest timestamp of each feature.
     */
     {{ featureview.name }}__latest AS (
-        SELECT * EXCEPT(row_number)
+        SELECT
+            event_timestamp,
+            {% if featureview.created_timestamp_column %}created_timestamp,{% endif %}
+            {{featureview.name}}__entity_row_unique_id
         FROM
         (
             SELECT *,
                 ROW_NUMBER() OVER(
                     PARTITION BY {{featureview.name}}__entity_row_unique_id
                     ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %}
-                ) AS row_number,
+                ) AS row_number
             FROM {{ featureview.name }}__base
             {% if featureview.created_timestamp_column %}
                 INNER JOIN {{ featureview.name }}__dedup
@@ -558,7 +562,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
     The entity_dataframe dataset being our source of truth here.
     */

-    SELECT * EXCEPT(entity_timestamp, {% for featureview in featureviews %} {{featureview.name}}__entity_row_unique_id{% if loop.last %}{% else %},{% endif %}{% endfor %})
+    SELECT {{ final_output_feature_names | join(', ')}}
     FROM entity_dataframe
     {% for featureview in featureviews %}
     LEFT JOIN (
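The final SELECT of the BigQuery template no longer relies on SELECT * EXCEPT; it renders the new final_output_feature_names context variable directly. As a minimal sketch (not part of this commit, and with invented column names), the Jinja expression expands like this when the template is rendered:

    # Sketch only: shows how "{{ final_output_feature_names | join(', ') }}"
    # expands at render time. Column names are placeholders.
    from jinja2 import BaseLoader, Environment

    template = Environment(loader=BaseLoader()).from_string(
        "SELECT {{ final_output_feature_names | join(', ') }} FROM entity_dataframe"
    )
    print(
        template.render(
            final_output_feature_names=["driver_id", "event_timestamp", "conv_rate"]
        )
    )
    # -> SELECT driver_id, event_timestamp, conv_rate FROM entity_dataframe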

sdk/python/feast/infra/offline_stores/offline_utils.py

Lines changed: 12 additions & 1 deletion
@@ -2,7 +2,7 @@
 import uuid
 from dataclasses import asdict, dataclass
 from datetime import timedelta
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Dict, KeysView, List, Optional, Set, Tuple

 import numpy as np
 import pandas as pd
@@ -153,12 +153,22 @@ def build_point_in_time_query(
     feature_view_query_contexts: List[FeatureViewQueryContext],
     left_table_query_string: str,
     entity_df_event_timestamp_col: str,
+    entity_df_columns: KeysView[str],
     query_template: str,
     full_feature_names: bool = False,
 ) -> str:
     """Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift"""
     template = Environment(loader=BaseLoader()).from_string(source=query_template)

+    final_output_feature_names = list(entity_df_columns)
+    final_output_feature_names.extend(
+        [
+            (f"{fv.name}__{feature}" if full_feature_names else feature)
+            for fv in feature_view_query_contexts
+            for feature in fv.features
+        ]
+    )
+
     # Add additional fields to dict
     template_context = {
         "left_table_query_string": left_table_query_string,
@@ -168,6 +178,7 @@ def build_point_in_time_query(
         ),
         "featureviews": [asdict(context) for context in feature_view_query_contexts],
         "full_feature_names": full_feature_names,
+        "final_output_feature_names": final_output_feature_names,
     }

     query = template.render(template_context)
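To make the new context variable concrete, here is a self-contained sketch (feature view and column names are invented for the example) of the list that build_point_in_time_query now assembles: the entity dataframe columns, followed by every requested feature, prefixed with its feature view name when full_feature_names is enabled.

    # Sketch only: mirrors the list construction added above, with made-up names.
    entity_df_columns = ["driver_id", "event_timestamp"]
    features_by_view = {"driver_hourly_stats": ["conv_rate", "acc_rate"]}
    full_feature_names = True

    final_output_feature_names = list(entity_df_columns)
    final_output_feature_names.extend(
        (f"{view}__{feature}" if full_feature_names else feature)
        for view, features in features_by_view.items()
        for feature in features
    )
    print(final_output_feature_names)
    # ['driver_id', 'event_timestamp',
    #  'driver_hourly_stats__conv_rate', 'driver_hourly_stats__acc_rate']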

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 17 additions & 27 deletions
@@ -149,6 +149,7 @@ def query_generator() -> Iterator[str]:
     query_context,
     left_table_query_string=table_name,
     entity_df_event_timestamp_col=entity_df_event_timestamp_col,
+    entity_df_columns=entity_schema.keys(),
     query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN,
     full_feature_names=full_feature_names,
 )
@@ -174,11 +175,6 @@ def query_generator() -> Iterator[str]:
     on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
         feature_refs, project, registry
     ),
-    drop_columns=["entity_timestamp"]
-    + [
-        f"{feature_view.projection.name_to_use()}__entity_row_unique_id"
-        for feature_view in feature_views
-    ],
 )


@@ -191,7 +187,6 @@ def __init__(
     config: RepoConfig,
     full_feature_names: bool,
     on_demand_feature_views: Optional[List[OnDemandFeatureView]],
-    drop_columns: Optional[List[str]] = None,
 ):
     """Initialize RedshiftRetrievalJob object.

@@ -202,8 +197,6 @@ def __init__(
         config: Feast repo config
         full_feature_names: Whether to add the feature view prefixes to the feature names
         on_demand_feature_views: A list of on demand transforms to apply at retrieval time
-        drop_columns: Optionally a list of columns to drop before unloading to S3.
-            This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift.
     """
     if not isinstance(query, str):
         self._query_generator = query
@@ -225,7 +218,6 @@ def query_generator() -> Iterator[str]:
         )
         self._full_feature_names = full_feature_names
         self._on_demand_feature_views = on_demand_feature_views
-        self._drop_columns = drop_columns

     @property
     def full_feature_names(self) -> bool:
@@ -246,7 +238,6 @@ def _to_df_internal(self) -> pd.DataFrame:
             self._s3_path,
             self._config.offline_store.iam_role,
             query,
-            self._drop_columns,
         )

     def _to_arrow_internal(self) -> pa.Table:
@@ -260,7 +251,6 @@ def _to_arrow_internal(self) -> pa.Table:
             self._s3_path,
             self._config.offline_store.iam_role,
             query,
-            self._drop_columns,
         )

     def to_s3(self) -> str:
@@ -279,7 +269,6 @@ def to_s3(self) -> str:
             self._s3_path,
             self._config.offline_store.iam_role,
             query,
-            self._drop_columns,
         )
         return self._s3_path

@@ -302,9 +291,6 @@ def to_redshift(self, table_name: str) -> None:

         with self._query_generator() as query:
             query = f'CREATE TABLE "{table_name}" AS ({query});\n'
-            if self._drop_columns is not None:
-                for column in self._drop_columns:
-                    query += f"ALTER TABLE {table_name} DROP COLUMN {column};\n"

             aws_utils.execute_redshift_statement(
                 self._redshift_client,
@@ -479,19 +465,23 @@ def _upload_entity_df_and_get_entity_schema(
     */
     {{ featureview.name }}__latest AS (
         SELECT
-            {{featureview.name}}__entity_row_unique_id,
-            MAX(event_timestamp) AS event_timestamp
+            event_timestamp,
+            {% if featureview.created_timestamp_column %}created_timestamp,{% endif %}
+            {{featureview.name}}__entity_row_unique_id
+        FROM
+        (
+            SELECT *,
+                ROW_NUMBER() OVER(
+                    PARTITION BY {{featureview.name}}__entity_row_unique_id
+                    ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %}
+                ) AS row_number
+            FROM {{ featureview.name }}__base
             {% if featureview.created_timestamp_column %}
-                ,MAX(created_timestamp) AS created_timestamp
+                INNER JOIN {{ featureview.name }}__dedup
+                USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
             {% endif %}
-
-            FROM {{ featureview.name }}__base
-            {% if featureview.created_timestamp_column %}
-                INNER JOIN {{ featureview.name }}__dedup
-                USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
-            {% endif %}
-
-            GROUP BY {{featureview.name}}__entity_row_unique_id
+        )
+        WHERE row_number = 1
     ),

     /*
@@ -518,7 +508,7 @@ def _upload_entity_df_and_get_entity_schema(
     The entity_dataframe dataset being our source of truth here.
     */

-    SELECT *
+    SELECT {{ final_output_feature_names | join(', ')}}
     FROM entity_dataframe
     {% for featureview in featureviews %}
     LEFT JOIN (
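Redshift has no SELECT * EXCEPT, which is why the old code carried a drop_columns list and issued ALTER TABLE ... DROP COLUMN statements after materializing the result. The template now sidesteps that entirely: a ROW_NUMBER() window keeps the single latest row per entity row key (instead of the previous GROUP BY / MAX aggregation), and the final SELECT enumerates final_output_feature_names. A rough pandas analogue of the dedup step, with invented data and for illustration only:

    # Sketch only: the SQL window keeps the row with the newest event_timestamp
    # (and created_timestamp, when present) per entity row key.
    import pandas as pd

    base = pd.DataFrame(
        {
            "fv__entity_row_unique_id": ["a", "a", "b"],
            "event_timestamp": pd.to_datetime(["2021-09-01", "2021-09-02", "2021-09-01"]),
            "conv_rate": [0.1, 0.2, 0.3],
        }
    )
    latest = (
        base.sort_values("event_timestamp", ascending=False)  # ORDER BY event_timestamp DESC
        .groupby("fv__entity_row_unique_id")                  # PARTITION BY entity row key
        .head(1)                                              # WHERE row_number = 1
    )
    print(latest)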

sdk/python/feast/infra/utils/aws_utils.py

Lines changed: 2 additions & 18 deletions
@@ -2,7 +2,7 @@
 import os
 import tempfile
 import uuid
-from typing import Dict, Iterator, List, Optional, Tuple
+from typing import Dict, Iterator, Optional, Tuple

 import pandas as pd
 import pyarrow as pa
@@ -324,7 +324,6 @@ def execute_redshift_query_and_unload_to_s3(
     s3_path: str,
     iam_role: str,
     query: str,
-    drop_columns: Optional[List[str]] = None,
 ) -> None:
     """Unload Redshift Query results to S3

@@ -337,16 +336,11 @@ def execute_redshift_query_and_unload_to_s3(
         iam_role: IAM Role for Redshift to assume during the UNLOAD command.
             The role must grant permission to write to the S3 location.
         query: The SQL query to execute
-        drop_columns: Optionally a list of columns to drop before unloading to S3.
-            This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift.

     """
     # Run the query, unload the results to S3
     unique_table_name = "_" + str(uuid.uuid4()).replace("-", "")
     query = f"CREATE TEMPORARY TABLE {unique_table_name} AS ({query});\n"
-    if drop_columns is not None:
-        for column in drop_columns:
-            query += f"ALTER TABLE {unique_table_name} DROP COLUMN {column};\n"
     query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' PARQUET"
     execute_redshift_statement(redshift_data_client, cluster_id, database, user, query)

@@ -360,20 +354,12 @@ def unload_redshift_query_to_pa(
     s3_path: str,
     iam_role: str,
     query: str,
-    drop_columns: Optional[List[str]] = None,
 ) -> pa.Table:
     """ Unload Redshift Query results to S3 and get the results in PyArrow Table format """
     bucket, key = get_bucket_and_key(s3_path)

     execute_redshift_query_and_unload_to_s3(
-        redshift_data_client,
-        cluster_id,
-        database,
-        user,
-        s3_path,
-        iam_role,
-        query,
-        drop_columns,
+        redshift_data_client, cluster_id, database, user, s3_path, iam_role, query,
     )

     with tempfile.TemporaryDirectory() as temp_dir:
@@ -391,7 +377,6 @@ def unload_redshift_query_to_df(
     s3_path: str,
     iam_role: str,
     query: str,
-    drop_columns: Optional[List[str]] = None,
 ) -> pd.DataFrame:
     """ Unload Redshift Query results to S3 and get the results in Pandas DataFrame format """
     table = unload_redshift_query_to_pa(
@@ -403,7 +388,6 @@ def unload_redshift_query_to_df(
         s3_path,
         iam_role,
         query,
-        drop_columns,
     )
     return table.to_pandas()

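With drop_columns gone from the whole call chain, callers of the unload helpers pass only the connection details, the S3 staging path, the IAM role, and the query. A hedged usage sketch follows; the argument order mirrors the parameters shown in this diff, and the client, cluster, bucket, and role values are placeholders rather than anything from the commit:

    # Sketch only: illustrative call to the helper after the change.
    import boto3

    from feast.infra.utils import aws_utils

    redshift_data_client = boto3.client("redshift-data", region_name="us-west-2")
    df = aws_utils.unload_redshift_query_to_df(
        redshift_data_client,
        "my-redshift-cluster",                          # cluster_id (placeholder)
        "dev",                                          # database (placeholder)
        "awsuser",                                      # user (placeholder)
        "s3://my-bucket/feast-staging",                 # s3_path (placeholder)
        "arn:aws:iam::123456789012:role/redshift-s3",   # iam_role (placeholder)
        "SELECT * FROM driver_hourly_stats",            # query (placeholder)
    )
    print(df.head())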
