Skip to content

Commit d241a84

Browse files
Implement diff_infra_protos method for feast plan (feast-dev#2204)
* Rename proto methods for SqliteTable, DynamoDBTable, DatastoreTable
  Signed-off-by: Felix Wang <[email protected]>
* Implement diff_infra
  Signed-off-by: Felix Wang <[email protected]>
* Add tests for diff_infra logic
  Signed-off-by: Felix Wang <[email protected]>
* Fix Datastore infra object
  Signed-off-by: Felix Wang <[email protected]>
1 parent 36c1d46 commit d241a84

File tree

6 files changed

+347
-47
lines changed

6 files changed

+347
-47
lines changed

sdk/python/feast/diff/infra_diff.py

Lines changed: 132 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,30 @@
11
from dataclasses import dataclass
2-
from typing import Any, List
2+
from typing import Any, Iterable, List, Tuple, TypeVar
33

44
from feast.diff.property_diff import PropertyDiff, TransitionType
5+
from feast.infra.infra_object import (
6+
DATASTORE_INFRA_OBJECT_CLASS_TYPE,
7+
DYNAMODB_INFRA_OBJECT_CLASS_TYPE,
8+
SQLITE_INFRA_OBJECT_CLASS_TYPE,
9+
InfraObject,
10+
)
11+
from feast.protos.feast.core.DatastoreTable_pb2 import (
12+
DatastoreTable as DatastoreTableProto,
13+
)
14+
from feast.protos.feast.core.DynamoDBTable_pb2 import (
15+
DynamoDBTable as DynamoDBTableProto,
16+
)
17+
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
18+
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto
519

620

721
@dataclass
822
class InfraObjectDiff:
923
name: str
1024
infra_object_type: str
11-
current_fco: Any
12-
new_fco: Any
13-
fco_property_diffs: List[PropertyDiff]
25+
current_infra_object: Any
26+
new_infra_object: Any
27+
infra_object_property_diffs: List[PropertyDiff]
1428
transition_type: TransitionType
1529

1630

@@ -26,3 +40,117 @@ def update(self):
2640

2741
def to_string(self):
2842
pass
43+
44+
45+
# Type variable constrained to the concrete online-store table proto messages
# handled by this module (Datastore, DynamoDB and SQLite tables).
U = TypeVar("U", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto)
46+
47+
48+
def tag_infra_proto_objects_for_keep_delete_add(
    existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
    """
    Partition infra object protos into those to keep, delete and add.

    Objects are matched purely by their ``name`` attribute.

    Args:
        existing_objs: The objects that currently exist.
        desired_objs: The objects that should exist afterwards.

    Returns:
        A tuple ``(objs_to_keep, objs_to_delete, objs_to_add)`` of lists:
        ``objs_to_keep`` holds the desired objects whose name already exists,
        ``objs_to_delete`` holds the existing objects whose name is no longer
        desired, and ``objs_to_add`` holds the desired objects whose name does
        not exist yet.
    """
    # Both inputs are traversed several times below. Materialise them once so
    # that one-shot iterables (e.g. generators) are not silently exhausted
    # after the first pass, which would yield empty results.
    existing = list(existing_objs)
    desired = list(desired_objs)

    existing_obj_names = {e.name for e in existing}
    desired_obj_names = {e.name for e in desired}

    objs_to_add = [e for e in desired if e.name not in existing_obj_names]
    objs_to_keep = [e for e in desired if e.name in existing_obj_names]
    objs_to_delete = [e for e in existing if e.name not in desired_obj_names]

    return objs_to_keep, objs_to_delete, objs_to_add
59+
60+
61+
def diff_infra_protos(
    current_infra_proto: InfraProto, new_infra_proto: InfraProto
) -> InfraDiff:
    """
    Compute the diff between two infra protos.

    For each supported infra object class type (Datastore, DynamoDB and SQLite
    tables) the objects of both protos are matched by name. Objects present
    only in ``new_infra_proto`` are recorded as ``TransitionType.CREATE``,
    objects present only in ``current_infra_proto`` as
    ``TransitionType.DELETE``, and objects present in both are compared
    field by field with ``diff_between``.

    Args:
        current_infra_proto: The infra proto describing the current state.
        new_infra_proto: The infra proto describing the desired state.

    Returns:
        An InfraDiff whose ``infra_object_diffs`` holds one InfraObjectDiff
        per infra object found in either proto.
    """
    infra_diff = InfraDiff()

    # Maps each supported class type to the label stored on the resulting diffs.
    infra_object_class_types_to_str = {
        DATASTORE_INFRA_OBJECT_CLASS_TYPE: "datastore table",
        DYNAMODB_INFRA_OBJECT_CLASS_TYPE: "dynamodb table",
        SQLITE_INFRA_OBJECT_CLASS_TYPE: "sqlite table",
    }

    for (
        infra_object_class_type,
        infra_object_type,
    ) in infra_object_class_types_to_str.items():
        current_infra_objects = get_infra_object_protos_by_type(
            current_infra_proto, infra_object_class_type
        )
        new_infra_objects = get_infra_object_protos_by_type(
            new_infra_proto, infra_object_class_type
        )
        (
            infra_objects_to_keep,
            infra_objects_to_delete,
            infra_objects_to_add,
        ) = tag_infra_proto_objects_for_keep_delete_add(
            current_infra_objects, new_infra_objects,
        )

        # Index the current objects by name once instead of rescanning the
        # whole list for every kept object. setdefault keeps the first object
        # seen for a name, matching what a front-to-back linear scan returns.
        current_infra_objects_by_name = {}
        for current_infra_object in current_infra_objects:
            current_infra_objects_by_name.setdefault(
                current_infra_object.name, current_infra_object
            )

        for e in infra_objects_to_add:
            infra_diff.infra_object_diffs.append(
                InfraObjectDiff(
                    e.name, infra_object_type, None, e, [], TransitionType.CREATE,
                )
            )
        for e in infra_objects_to_delete:
            infra_diff.infra_object_diffs.append(
                InfraObjectDiff(
                    e.name, infra_object_type, e, None, [], TransitionType.DELETE,
                )
            )
        for e in infra_objects_to_keep:
            # Kept objects are exactly those whose name exists on both sides,
            # so the lookup always succeeds.
            infra_diff.infra_object_diffs.append(
                diff_between(
                    current_infra_objects_by_name[e.name], e, infra_object_type,
                )
            )

    return infra_diff
122+
123+
124+
def get_infra_object_protos_by_type(
    infra_proto: InfraProto, infra_object_class_type: str
) -> List[U]:
    """
    Collect the type-specific protos of all infra objects of one class type.

    Every entry of ``infra_proto.infra_objects`` whose
    ``infra_object_class_type`` equals the requested one is converted to an
    InfraObject and back to its own proto via ``to_proto()``.

    Args:
        infra_proto: The infra proto whose objects are scanned.
        infra_object_class_type: The class type string to select.

    Returns:
        The converted protos, in the order they appear in ``infra_proto``.
    """
    matching_protos = []
    for candidate in infra_proto.infra_objects:
        if candidate.infra_object_class_type != infra_object_class_type:
            continue
        restored_object = InfraObject.from_infra_object_proto(candidate)
        matching_protos.append(restored_object.to_proto())
    return matching_protos
132+
133+
134+
# Names of proto fields that diff_between skips when collecting property
# diffs between two infra object protos.
FIELDS_TO_IGNORE = {"project"}
135+
136+
137+
def diff_between(current: U, new: U, infra_object_type: str) -> InfraObjectDiff:
    """
    Compare two infra object protos of the same message type field by field.

    Fields listed in FIELDS_TO_IGNORE are skipped. The transition is
    ``TransitionType.UPDATE`` when at least one compared field differs and
    ``TransitionType.UNCHANGED`` otherwise.

    Args:
        current: The proto describing the current state of the object.
        new: The proto describing the desired state of the object.
        infra_object_type: The label stored on the returned diff.

    Returns:
        An InfraObjectDiff named after ``new`` that carries both protos, one
        PropertyDiff per differing field, and the resulting transition type.
    """
    assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name

    changed_properties = []
    # Only walk the individual fields when the messages differ at all.
    if current != new:
        compared_field_names = (
            descriptor.name
            for descriptor in current.DESCRIPTOR.fields
            if descriptor.name not in FIELDS_TO_IGNORE
        )
        for field_name in compared_field_names:
            current_value = getattr(current, field_name)
            new_value = getattr(new, field_name)
            if current_value != new_value:
                changed_properties.append(
                    PropertyDiff(field_name, current_value, new_value)
                )

    # A differing, non-ignored field is the only thing that marks an update.
    if changed_properties:
        transition: TransitionType = TransitionType.UPDATE
    else:
        transition = TransitionType.UNCHANGED

    return InfraObjectDiff(
        new.name, infra_object_type, current, new, changed_properties, transition,
    )

sdk/python/feast/infra/infra_object.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,29 @@
1919
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
2020
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
2121

22+
# Fully qualified class names identifying the concrete InfraObject
# implementation stored in an InfraObjectProto's infra_object_class_type.
DATASTORE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.datastore.DatastoreTable"
DYNAMODB_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.dynamodb.DynamoDBTable"
# The package is "online_stores" (plural), as for the two constants above; the
# previous value spelled it "online_store".
# NOTE(review): protos already persisted with the old spelling will no longer
# match this constant - confirm whether a migration or fallback is needed.
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.sqlite.SqliteTable"
25+
2226

2327
class InfraObject(ABC):
2428
"""
2529
Represents a single infrastructure object (e.g. online store table) managed by Feast.
2630
"""
2731

2832
@abstractmethod
29-
def to_proto(self) -> InfraObjectProto:
33+
def to_infra_object_proto(self) -> InfraObjectProto:
34+
"""Converts an InfraObject to its protobuf representation, wrapped in an InfraObjectProto."""
35+
pass
36+
37+
@abstractmethod
38+
def to_proto(self) -> Any:
3039
"""Converts an InfraObject to its protobuf representation."""
3140
pass
3241

3342
@staticmethod
3443
@abstractmethod
35-
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
44+
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
3645
"""
3746
Returns an InfraObject created from a protobuf representation.
3847
@@ -46,7 +55,7 @@ def from_proto(infra_object_proto: InfraObjectProto) -> Any:
4655
cls = _get_infra_object_class_from_type(
4756
infra_object_proto.infra_object_class_type
4857
)
49-
return cls.from_proto(infra_object_proto)
58+
return cls.from_infra_object_proto(infra_object_proto)
5059

5160
raise ValueError("Could not identify the type of the InfraObject.")
5261

@@ -97,7 +106,7 @@ def from_proto(cls, infra_proto: InfraProto):
97106
"""
98107
infra = cls()
99108
cls.infra_objects += [
100-
InfraObject.from_proto(infra_object_proto)
109+
InfraObject.from_infra_object_proto(infra_object_proto)
101110
for infra_object_proto in infra_proto.infra_objects
102111
]
103112

sdk/python/feast/infra/online_stores/datastore.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
from pydantic.typing import Literal
2424

2525
from feast import Entity, utils
26+
from feast.errors import FeastProviderLoginError
2627
from feast.feature_view import FeatureView
27-
from feast.infra.infra_object import InfraObject
28+
from feast.infra.infra_object import DATASTORE_INFRA_OBJECT_CLASS_TYPE, InfraObject
2829
from feast.infra.online_stores.helpers import compute_entity_id
2930
from feast.infra.online_stores.online_store import OnlineStore
3031
from feast.protos.feast.core.DatastoreTable_pb2 import (
@@ -43,7 +44,7 @@
4344
from google.cloud import datastore
4445
from google.cloud.datastore.client import Key
4546
except ImportError as e:
46-
from feast.errors import FeastExtrasDependencyImportError, FeastProviderLoginError
47+
from feast.errors import FeastExtrasDependencyImportError
4748

4849
raise FeastExtrasDependencyImportError("gcp", str(e))
4950

@@ -332,14 +333,12 @@ class DatastoreTable(InfraObject):
332333
name: The name of the table.
333334
project_id (optional): The GCP project id.
334335
namespace (optional): Datastore namespace.
335-
client: Datastore client.
336336
"""
337337

338338
project: str
339339
name: str
340340
project_id: Optional[str]
341341
namespace: Optional[str]
342-
client: datastore.Client
343342

344343
def __init__(
345344
self,
@@ -352,51 +351,55 @@ def __init__(
352351
self.name = name
353352
self.project_id = project_id
354353
self.namespace = namespace
355-
self.client = _initialize_client(self.project_id, self.namespace)
356354

357-
def to_proto(self) -> InfraObjectProto:
355+
def to_infra_object_proto(self) -> InfraObjectProto:
356+
datastore_table_proto = self.to_proto()
357+
return InfraObjectProto(
358+
infra_object_class_type=DATASTORE_INFRA_OBJECT_CLASS_TYPE,
359+
datastore_table=datastore_table_proto,
360+
)
361+
362+
def to_proto(self) -> Any:
358363
datastore_table_proto = DatastoreTableProto()
359364
datastore_table_proto.project = self.project
360365
datastore_table_proto.name = self.name
361366
if self.project_id:
362-
datastore_table_proto.project_id.FromString(bytes(self.project_id, "utf-8"))
367+
datastore_table_proto.project_id.value = self.project_id
363368
if self.namespace:
364-
datastore_table_proto.namespace.FromString(bytes(self.namespace, "utf-8"))
365-
366-
return InfraObjectProto(
367-
infra_object_class_type="feast.infra.online_stores.datastore.DatastoreTable",
368-
datastore_table=datastore_table_proto,
369-
)
369+
datastore_table_proto.namespace.value = self.namespace
370+
return datastore_table_proto
370371

371372
@staticmethod
372-
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
373+
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
373374
datastore_table = DatastoreTable(
374375
project=infra_object_proto.datastore_table.project,
375376
name=infra_object_proto.datastore_table.name,
376377
)
377378

378379
if infra_object_proto.datastore_table.HasField("project_id"):
379380
datastore_table.project_id = (
380-
infra_object_proto.datastore_table.project_id.SerializeToString()
381-
).decode("utf-8")
381+
infra_object_proto.datastore_table.project_id.value
382+
)
382383
if infra_object_proto.datastore_table.HasField("namespace"):
383384
datastore_table.namespace = (
384-
infra_object_proto.datastore_table.namespace.SerializeToString()
385-
).decode("utf-8")
385+
infra_object_proto.datastore_table.namespace.value
386+
)
386387

387388
return datastore_table
388389

389390
def update(self):
390-
key = self.client.key("Project", self.project, "Table", self.name)
391+
client = _initialize_client(self.project_id, self.namespace)
392+
key = client.key("Project", self.project, "Table", self.name)
391393
entity = datastore.Entity(
392394
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
393395
)
394396
entity.update({"created_ts": datetime.utcnow()})
395-
self.client.put(entity)
397+
client.put(entity)
396398

397399
def teardown(self):
398-
key = self.client.key("Project", self.project, "Table", self.name)
399-
_delete_all_values(self.client, key)
400+
client = _initialize_client(self.project_id, self.namespace)
401+
key = client.key("Project", self.project, "Table", self.name)
402+
_delete_all_values(client, key)
400403

401404
# Delete the table metadata datastore entity
402-
self.client.delete(key)
405+
client.delete(key)

sdk/python/feast/infra/online_stores/dynamodb.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from pydantic.typing import Literal
2020

2121
from feast import Entity, FeatureView, utils
22-
from feast.infra.infra_object import InfraObject
22+
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject
2323
from feast.infra.online_stores.helpers import compute_entity_id
2424
from feast.infra.online_stores.online_store import OnlineStore
2525
from feast.protos.feast.core.DynamoDBTable_pb2 import (
@@ -234,18 +234,21 @@ def __init__(self, name: str, region: str):
234234
self.name = name
235235
self.region = region
236236

237-
def to_proto(self) -> InfraObjectProto:
238-
dynamodb_table_proto = DynamoDBTableProto()
239-
dynamodb_table_proto.name = self.name
240-
dynamodb_table_proto.region = self.region
241-
237+
def to_infra_object_proto(self) -> InfraObjectProto:
238+
dynamodb_table_proto = self.to_proto()
242239
return InfraObjectProto(
243-
infra_object_class_type="feast.infra.online_stores.dynamodb.DynamoDBTable",
240+
infra_object_class_type=DYNAMODB_INFRA_OBJECT_CLASS_TYPE,
244241
dynamodb_table=dynamodb_table_proto,
245242
)
246243

244+
def to_proto(self) -> Any:
245+
dynamodb_table_proto = DynamoDBTableProto()
246+
dynamodb_table_proto.name = self.name
247+
dynamodb_table_proto.region = self.region
248+
return dynamodb_table_proto
249+
247250
@staticmethod
248-
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
251+
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
249252
return DynamoDBTable(
250253
name=infra_object_proto.dynamodb_table.name,
251254
region=infra_object_proto.dynamodb_table.region,

sdk/python/feast/infra/online_stores/sqlite.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
from feast import Entity
2525
from feast.feature_view import FeatureView
26-
from feast.infra.infra_object import InfraObject
26+
from feast.infra.infra_object import SQLITE_INFRA_OBJECT_CLASS_TYPE, InfraObject
2727
from feast.infra.key_encoding_utils import serialize_entity_key
2828
from feast.infra.online_stores.online_store import OnlineStore
2929
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
@@ -241,18 +241,21 @@ def __init__(self, path: str, name: str):
241241
self.name = name
242242
self.conn = _initialize_conn(path)
243243

244-
def to_proto(self) -> InfraObjectProto:
245-
sqlite_table_proto = SqliteTableProto()
246-
sqlite_table_proto.path = self.path
247-
sqlite_table_proto.name = self.name
248-
244+
def to_infra_object_proto(self) -> InfraObjectProto:
245+
sqlite_table_proto = self.to_proto()
249246
return InfraObjectProto(
250-
infra_object_class_type="feast.infra.online_store.sqlite.SqliteTable",
247+
infra_object_class_type=SQLITE_INFRA_OBJECT_CLASS_TYPE,
251248
sqlite_table=sqlite_table_proto,
252249
)
253250

251+
def to_proto(self) -> Any:
252+
sqlite_table_proto = SqliteTableProto()
253+
sqlite_table_proto.path = self.path
254+
sqlite_table_proto.name = self.name
255+
return sqlite_table_proto
256+
254257
@staticmethod
255-
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
258+
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
256259
return SqliteTable(
257260
path=infra_object_proto.sqlite_table.path,
258261
name=infra_object_proto.sqlite_table.name,

0 commit comments

Comments
 (0)