Skip to content

Commit 3364bad

Browse files
committed
fix: BatchFeatureView transformation should persist in Registry Ser/Deserialization
Signed-off-by: ntkathole <[email protected]>
1 parent 99a321a commit 3364bad

File tree

7 files changed

+208
-35
lines changed

7 files changed

+208
-35
lines changed

protos/feast/core/FeatureView.proto

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import "google/protobuf/duration.proto";
2626
import "google/protobuf/timestamp.proto";
2727
import "feast/core/DataSource.proto";
2828
import "feast/core/Feature.proto";
29+
import "feast/core/Transformation.proto";
2930

3031
message FeatureView {
3132
// User-specified specifications of this feature view.
@@ -35,7 +36,7 @@ message FeatureView {
3536
FeatureViewMeta meta = 2;
3637
}
3738

38-
// Next available id: 13
39+
// Next available id: 16
3940
// TODO(adchia): refactor common fields from this and ODFV into separate metadata proto
4041
message FeatureViewSpec {
4142
// Name of the feature view. Must be unique. Not updated.
@@ -81,6 +82,9 @@ message FeatureViewSpec {
8182
bool offline = 13;
8283

8384
repeated FeatureViewSpec source_views = 14;
85+
86+
// Feature transformation for batch feature views
87+
FeatureTransformationV2 feature_transformation = 15;
8488
}
8589

8690
message FeatureViewMeta {

sdk/python/feast/feature_view.py

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
from feast.protos.feast.core.FeatureView_pb2 import (
3737
MaterializationInterval as MaterializationIntervalProto,
3838
)
39+
from feast.protos.feast.core.Transformation_pb2 import (
40+
FeatureTransformationV2 as FeatureTransformationProto,
41+
)
3942
from feast.types import from_value_type
4043
from feast.value_type import ValueType
4144

@@ -415,6 +418,27 @@ def to_proto_spec(
415418
source_view_protos = [
416419
view._to_proto_internal(seen).spec for view in self.source_views
417420
]
421+
422+
feature_transformation_proto = None
423+
if hasattr(self, "feature_transformation") and self.feature_transformation:
424+
from feast.protos.feast.core.Transformation_pb2 import (
425+
SubstraitTransformationV2 as SubstraitTransformationProto,
426+
)
427+
from feast.protos.feast.core.Transformation_pb2 import (
428+
UserDefinedFunctionV2 as UserDefinedFunctionProto,
429+
)
430+
431+
transformation_proto = self.feature_transformation.to_proto()
432+
433+
if isinstance(transformation_proto, UserDefinedFunctionProto):
434+
feature_transformation_proto = FeatureTransformationProto(
435+
user_defined_function=transformation_proto,
436+
)
437+
elif isinstance(transformation_proto, SubstraitTransformationProto):
438+
feature_transformation_proto = FeatureTransformationProto(
439+
substrait_transformation=transformation_proto,
440+
)
441+
418442
return FeatureViewSpecProto(
419443
name=self.name,
420444
entities=self.entities,
@@ -429,6 +453,7 @@ def to_proto_spec(
429453
batch_source=batch_source_proto,
430454
stream_source=stream_source_proto,
431455
source_views=source_view_protos,
456+
feature_transformation=feature_transformation_proto,
432457
)
433458

434459
def to_proto_meta(self):
@@ -498,21 +523,61 @@ def _from_proto_internal(
498523
for view_spec in feature_view_proto.spec.source_views
499524
]
500525

501-
feature_view = cls(
502-
name=feature_view_proto.spec.name,
503-
description=feature_view_proto.spec.description,
504-
tags=dict(feature_view_proto.spec.tags),
505-
owner=feature_view_proto.spec.owner,
506-
online=feature_view_proto.spec.online,
507-
offline=feature_view_proto.spec.offline,
508-
ttl=(
509-
timedelta(days=0)
510-
if feature_view_proto.spec.ttl.ToNanoseconds() == 0
511-
else feature_view_proto.spec.ttl.ToTimedelta()
512-
),
513-
source=source_views if source_views else batch_source,
514-
sink_source=batch_source if source_views else None,
515-
)
526+
has_transformation = feature_view_proto.spec.HasField("feature_transformation")
527+
528+
if has_transformation and cls == FeatureView:
529+
from feast.batch_feature_view import BatchFeatureView
530+
from feast.transformation.python_transformation import PythonTransformation
531+
from feast.transformation.substrait_transformation import (
532+
SubstraitTransformation,
533+
)
534+
535+
feature_transformation_proto = (
536+
feature_view_proto.spec.feature_transformation
537+
)
538+
transformation = None
539+
540+
if feature_transformation_proto.HasField("user_defined_function"):
541+
transformation = PythonTransformation.from_proto(
542+
feature_transformation_proto.user_defined_function
543+
)
544+
elif feature_transformation_proto.HasField("substrait_transformation"):
545+
transformation = SubstraitTransformation.from_proto(
546+
feature_transformation_proto.substrait_transformation
547+
)
548+
549+
feature_view: FeatureView = BatchFeatureView( # type: ignore[assignment]
550+
name=feature_view_proto.spec.name,
551+
description=feature_view_proto.spec.description,
552+
tags=dict(feature_view_proto.spec.tags),
553+
owner=feature_view_proto.spec.owner,
554+
online=feature_view_proto.spec.online,
555+
offline=feature_view_proto.spec.offline,
556+
ttl=(
557+
timedelta(days=0)
558+
if feature_view_proto.spec.ttl.ToNanoseconds() == 0
559+
else feature_view_proto.spec.ttl.ToTimedelta()
560+
),
561+
source=source_views if source_views else batch_source, # type: ignore[arg-type]
562+
sink_source=batch_source if source_views else None,
563+
feature_transformation=transformation,
564+
)
565+
else:
566+
feature_view = cls( # type: ignore[assignment]
567+
name=feature_view_proto.spec.name,
568+
description=feature_view_proto.spec.description,
569+
tags=dict(feature_view_proto.spec.tags),
570+
owner=feature_view_proto.spec.owner,
571+
online=feature_view_proto.spec.online,
572+
offline=feature_view_proto.spec.offline,
573+
ttl=(
574+
timedelta(days=0)
575+
if feature_view_proto.spec.ttl.ToNanoseconds() == 0
576+
else feature_view_proto.spec.ttl.ToTimedelta()
577+
),
578+
source=source_views if source_views else batch_source,
579+
sink_source=batch_source if source_views else None,
580+
)
516581
if stream_source:
517582
feature_view.stream_source = stream_source
518583

sdk/python/feast/infra/compute_engines/local/feature_builder.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,13 @@ def build_dedup_node(self, view, input_node):
7070

7171
def build_transformation_node(self, view, input_nodes):
7272
transform_config = view.feature_transformation
73+
transformation_fn = (
74+
transform_config.udf
75+
if hasattr(transform_config, "udf")
76+
else transform_config
77+
)
7378
node = LocalTransformationNode(
74-
"transform", transform_config, self.backend, inputs=input_nodes
79+
"transform", transformation_fn, self.backend, inputs=input_nodes
7580
)
7681
self.nodes.append(node)
7782
return node

sdk/python/feast/protos/feast/core/FeatureService_pb2.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class FeatureServiceSpec(google.protobuf.message.Message):
7171
"""Name of Feast project that this Feature Service belongs to."""
7272
@property
7373
def features(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[feast.core.FeatureViewProjection_pb2.FeatureViewProjection]:
74-
"""Represents a projection that's to be applied on top of the FeatureView.
74+
"""Represents a projection that's to be applied on top of the FeatureView.
7575
Contains data such as the features to use from a FeatureView.
7676
"""
7777
@property

sdk/python/feast/protos/feast/core/FeatureView_pb2.py

Lines changed: 14 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk/python/feast/protos/feast/core/FeatureView_pb2.pyi

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import builtins
2020
import collections.abc
2121
import feast.core.DataSource_pb2
2222
import feast.core.Feature_pb2
23+
import feast.core.Transformation_pb2
2324
import google.protobuf.descriptor
2425
import google.protobuf.duration_pb2
2526
import google.protobuf.internal.containers
@@ -57,7 +58,7 @@ class FeatureView(google.protobuf.message.Message):
5758
global___FeatureView = FeatureView
5859

5960
class FeatureViewSpec(google.protobuf.message.Message):
60-
"""Next available id: 13
61+
"""Next available id: 16
6162
TODO(adchia): refactor common fields from this and ODFV into separate metadata proto
6263
"""
6364

@@ -92,6 +93,7 @@ class FeatureViewSpec(google.protobuf.message.Message):
9293
ONLINE_FIELD_NUMBER: builtins.int
9394
OFFLINE_FIELD_NUMBER: builtins.int
9495
SOURCE_VIEWS_FIELD_NUMBER: builtins.int
96+
FEATURE_TRANSFORMATION_FIELD_NUMBER: builtins.int
9597
name: builtins.str
9698
"""Name of the feature view. Must be unique. Not updated."""
9799
project: builtins.str
@@ -133,6 +135,9 @@ class FeatureViewSpec(google.protobuf.message.Message):
133135
"""Whether these features should be written to the offline store"""
134136
@property
135137
def source_views(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___FeatureViewSpec]: ...
138+
@property
139+
def feature_transformation(self) -> feast.core.Transformation_pb2.FeatureTransformationV2:
140+
"""Feature transformation (UDF or Substrait) for batch feature views"""
136141
def __init__(
137142
self,
138143
*,
@@ -150,9 +155,10 @@ class FeatureViewSpec(google.protobuf.message.Message):
150155
online: builtins.bool = ...,
151156
offline: builtins.bool = ...,
152157
source_views: collections.abc.Iterable[global___FeatureViewSpec] | None = ...,
158+
feature_transformation: feast.core.Transformation_pb2.FeatureTransformationV2 | None = ...,
153159
) -> None: ...
154-
def HasField(self, field_name: typing_extensions.Literal["batch_source", b"batch_source", "stream_source", b"stream_source", "ttl", b"ttl"]) -> builtins.bool: ...
155-
def ClearField(self, field_name: typing_extensions.Literal["batch_source", b"batch_source", "description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "features", b"features", "name", b"name", "offline", b"offline", "online", b"online", "owner", b"owner", "project", b"project", "source_views", b"source_views", "stream_source", b"stream_source", "tags", b"tags", "ttl", b"ttl"]) -> None: ...
160+
def HasField(self, field_name: typing_extensions.Literal["batch_source", b"batch_source", "feature_transformation", b"feature_transformation", "stream_source", b"stream_source", "ttl", b"ttl"]) -> builtins.bool: ...
161+
def ClearField(self, field_name: typing_extensions.Literal["batch_source", b"batch_source", "description", b"description", "entities", b"entities", "entity_columns", b"entity_columns", "feature_transformation", b"feature_transformation", "features", b"features", "name", b"name", "offline", b"offline", "online", b"online", "owner", b"owner", "project", b"project", "source_views", b"source_views", "stream_source", b"stream_source", "tags", b"tags", "ttl", b"ttl"]) -> None: ...
156162

157163
global___FeatureViewSpec = FeatureViewSpec
158164

0 commit comments

Comments
 (0)