Skip to content
This repository was archived by the owner on Nov 10, 2022. It is now read-only.

Commit 680a7af

Browse files
authored
Convert python values into proto values in bulk (feast-dev#2172)
* Convert python values into proto values in bulk Signed-off-by: pyalex <[email protected]> * revert to CopyFrom Signed-off-by: pyalex <[email protected]> * better sampling Signed-off-by: pyalex <[email protected]> * better sampling Signed-off-by: pyalex <[email protected]> * even better sampling Signed-off-by: pyalex <[email protected]>
1 parent ff86e3e commit 680a7af

File tree

4 files changed

+107
-62
lines changed

4 files changed

+107
-62
lines changed

sdk/python/feast/feature_store.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@
7272
from feast.registry import Registry
7373
from feast.repo_config import RepoConfig, load_repo_config
7474
from feast.request_feature_view import RequestFeatureView
75-
from feast.type_map import python_value_to_proto_value
75+
from feast.type_map import python_values_to_proto_values
7676
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
77+
from feast.value_type import ValueType
7778
from feast.version import get_version
7879

7980
warnings.simplefilter("once", DeprecationWarning)
@@ -1207,11 +1208,13 @@ def _populate_odfv_dependencies(
12071208
):
12081209
# Add more feature values to the existing result rows for the request data features
12091210
for feature_name, feature_values in request_data_features.items():
1210-
for row_idx, feature_value in enumerate(feature_values):
1211+
proto_values = python_values_to_proto_values(
1212+
feature_values, ValueType.UNKNOWN
1213+
)
1214+
1215+
for row_idx, proto_value in enumerate(proto_values):
12111216
result_row = result_rows[row_idx]
1212-
result_row.fields[feature_name].CopyFrom(
1213-
python_value_to_proto_value(feature_value)
1214-
)
1217+
result_row.fields[feature_name].CopyFrom(proto_value)
12151218
result_row.statuses[
12161219
feature_name
12171220
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
@@ -1372,19 +1375,25 @@ def _augment_response_with_on_demand_transforms(
13721375
transformed_features_df = odfv.get_transformed_features_df(
13731376
initial_response_df, full_feature_names,
13741377
)
1378+
selected_subset = [
1379+
f for f in transformed_features_df.columns if f in _feature_refs
1380+
]
1381+
1382+
proto_values_by_column = {
1383+
feature: python_values_to_proto_values(
1384+
transformed_features_df[feature].values, ValueType.UNKNOWN
1385+
)
1386+
for feature in selected_subset
1387+
}
1388+
13751389
for row_idx in range(len(result_rows)):
13761390
result_row = result_rows[row_idx]
13771391

1378-
selected_subset = [
1379-
f for f in transformed_features_df.columns if f in _feature_refs
1380-
]
1381-
13821392
for transformed_feature in selected_subset:
13831393
odfv_result_names.add(transformed_feature)
1384-
proto_value = python_value_to_proto_value(
1385-
transformed_features_df[transformed_feature].values[row_idx]
1394+
result_row.fields[transformed_feature].CopyFrom(
1395+
proto_values_by_column[transformed_feature][row_idx]
13861396
)
1387-
result_row.fields[transformed_feature].CopyFrom(proto_value)
13881397
result_row.statuses[
13891398
transformed_feature
13901399
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT

sdk/python/feast/infra/provider.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
1818
from feast.registry import Registry
1919
from feast.repo_config import RepoConfig
20-
from feast.type_map import python_value_to_proto_value
20+
from feast.type_map import python_values_to_proto_values
21+
from feast.value_type import ValueType
2122

2223
PROVIDERS_CLASS_FOR_TYPE = {
2324
"gcp": "feast.infra.gcp.GcpProvider",
@@ -305,26 +306,28 @@ def _convert_arrow_to_proto(
305306
if isinstance(table, pyarrow.Table):
306307
table = table.to_batches()[0]
307308

308-
# Handle join keys
309-
join_key_values = {
310-
k: table.column(k).to_numpy(zero_copy_only=False) for k in join_keys
309+
columns = [(f.name, f.dtype) for f in feature_view.features] + [
310+
(key, ValueType.UNKNOWN) for key in join_keys
311+
]
312+
313+
proto_values_by_column = {
314+
column: python_values_to_proto_values(
315+
table.column(column).to_numpy(zero_copy_only=False), dtype
316+
)
317+
for column, dtype in columns
311318
}
319+
312320
entity_keys = [
313321
EntityKeyProto(
314322
join_keys=join_keys,
315-
entity_values=[
316-
python_value_to_proto_value(join_key_values[k][idx]) for k in join_keys
317-
],
323+
entity_values=[proto_values_by_column[k][idx] for k in join_keys],
318324
)
319325
for idx in range(table.num_rows)
320326
]
321327

322328
# Serialize the features per row
323329
feature_dict = {
324-
feature.name: [
325-
python_value_to_proto_value(val, feature.dtype)
326-
for val in table.column(feature.name).to_numpy(zero_copy_only=False)
327-
]
330+
feature.name: proto_values_by_column[feature.name]
328331
for feature in feature_view.features
329332
}
330333
features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())]

sdk/python/feast/online_response.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ def _infer_online_entity_rows(
136136
if isinstance(value, Value):
137137
proto_value = value
138138
else:
139-
proto_value = _python_value_to_proto_value(entity_type_map[key], value)
139+
proto_value = _python_value_to_proto_value(
140+
entity_type_map[key], [value]
141+
)[0]
140142
fields[key] = proto_value
141143
entity_row_list.append(GetOnlineFeaturesRequestV2.EntityRow(fields=fields))
142144
return entity_row_list

sdk/python/feast/type_map.py

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import re
1616
from datetime import datetime
17-
from typing import Any, Dict, List, Optional, Set, Tuple, Type
17+
from typing import Any, Dict, List, Optional, Set, Sized, Tuple, Type
1818

1919
import numpy as np
2020
import pandas as pd
@@ -238,50 +238,59 @@ def _type_err(item, dtype):
238238
}
239239

240240

241-
def _python_value_to_proto_value(feast_value_type: ValueType, value: Any) -> ProtoValue:
241+
def _python_value_to_proto_value(
242+
feast_value_type: ValueType, values: List[Any]
243+
) -> List[ProtoValue]:
242244
"""
243245
Converts a Python (native, pandas) value to a Feast Proto Value based
244246
on a provided value type
245247
246248
Args:
247249
feast_value_type: The target value type
248-
value: Value that will be converted
250+
values: List of Values that will be converted
249251
250252
Returns:
251-
Feast Value Proto
253+
List of Feast Value Proto
252254
"""
255+
# ToDo: make a better sample for type checks (more than one element)
256+
sample = next(filter(_non_empty_value, values), None) # first not empty value
257+
if sample is None:
258+
# all input values are None or empty lists
259+
return [ProtoValue()] * len(values)
260+
253261
# Detect list type and handle separately
254262
if "list" in feast_value_type.name.lower():
255263
# Feature can be list but None is still valid
256-
if value is None:
257-
return ProtoValue()
258-
259264
if feast_value_type in PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE:
260265
proto_type, field_name, valid_types = PYTHON_LIST_VALUE_TYPE_TO_PROTO_VALUE[
261266
feast_value_type
262267
]
263-
f = {
264-
field_name: proto_type(
265-
val=[
266-
item
267-
if type(item) in valid_types
268-
else _type_err(item, valid_types[0])
269-
for item in value
270-
]
268+
269+
if not all(type(item) in valid_types for item in sample):
270+
first_invalid = next(
271+
item for item in sample if type(item) not in valid_types
271272
)
272-
}
273-
return ProtoValue(**f)
273+
raise _type_err(first_invalid, valid_types[0])
274+
275+
return [
276+
ProtoValue(**{field_name: proto_type(val=value)})
277+
if value is not None
278+
else ProtoValue()
279+
for value in values
280+
]
281+
274282
# Handle scalar types below
275283
else:
276-
if pd.isnull(value):
277-
return ProtoValue()
278-
279284
if feast_value_type == ValueType.UNIX_TIMESTAMP:
280-
if isinstance(value, datetime):
281-
return ProtoValue(int64_val=int(value.timestamp()))
282-
elif isinstance(value, Timestamp):
283-
return ProtoValue(int64_val=int(value.ToSeconds()))
284-
return ProtoValue(int64_val=int(value))
285+
if isinstance(sample, datetime):
286+
return [
287+
ProtoValue(int64_val=int(value.timestamp())) for value in values
288+
]
289+
elif isinstance(sample, Timestamp):
290+
return [
291+
ProtoValue(int64_val=int(value.ToSeconds())) for value in values
292+
]
293+
return [ProtoValue(int64_val=int(value)) for value in values]
285294

286295
if feast_value_type in PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE:
287296
(
@@ -290,27 +299,37 @@ def _python_value_to_proto_value(feast_value_type: ValueType, value: Any) -> Pro
290299
valid_scalar_types,
291300
) = PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE[feast_value_type]
292301
if valid_scalar_types:
293-
assert type(value) in valid_scalar_types
294-
kwargs = {field_name: func(value)}
295-
return ProtoValue(**kwargs)
302+
assert type(sample) in valid_scalar_types
296303

297-
raise Exception(f"Unsupported data type: ${str(type(value))}")
304+
return [
305+
ProtoValue(**{field_name: func(value)})
306+
if not pd.isnull(value)
307+
else ProtoValue()
308+
for value in values
309+
]
298310

311+
raise Exception(f"Unsupported data type: ${str(type(values[0]))}")
299312

300-
def python_value_to_proto_value(
301-
value: Any, feature_type: ValueType = ValueType.UNKNOWN
302-
) -> ProtoValue:
313+
314+
def python_values_to_proto_values(
315+
values: List[Any], feature_type: ValueType = ValueType.UNKNOWN
316+
) -> List[ProtoValue]:
303317
value_type = feature_type
304-
if value is not None and feature_type == ValueType.UNKNOWN:
305-
if isinstance(value, (list, np.ndarray)):
318+
sample = next(filter(_non_empty_value, values), None) # first not empty value
319+
if sample is not None and feature_type == ValueType.UNKNOWN:
320+
if isinstance(sample, (list, np.ndarray)):
306321
value_type = (
307322
feature_type
308-
if len(value) == 0
309-
else python_type_to_feast_value_type("", value)
323+
if len(sample) == 0
324+
else python_type_to_feast_value_type("", sample)
310325
)
311326
else:
312-
value_type = python_type_to_feast_value_type("", value)
313-
return _python_value_to_proto_value(value_type, value)
327+
value_type = python_type_to_feast_value_type("", sample)
328+
329+
if value_type == ValueType.UNKNOWN:
330+
raise TypeError("Couldn't infer value type from empty value")
331+
332+
return _python_value_to_proto_value(value_type, values)
314333

315334

316335
def _proto_value_to_value_type(proto_value: ProtoValue) -> ValueType:
@@ -453,3 +472,15 @@ def pa_to_redshift_value_type(pa_type: pyarrow.DataType) -> str:
453472
}
454473

455474
return type_map[pa_type_as_str]
475+
476+
477+
def _non_empty_value(value: Any) -> bool:
478+
"""
479+
Check that there's enough data we can use for type inference.
480+
If primitive type - just checking that it's not None
481+
If iterable - checking that there's some elements (len > 0)
482+
String is special case: "" - empty string is considered non empty
483+
"""
484+
return value is not None and (
485+
not isinstance(value, Sized) or len(value) > 0 or isinstance(value, str)
486+
)

0 commit comments

Comments
 (0)