Skip to content

Commit af3056b

Browse files
committed
feat: Populate created and updated timestamp on data sources
Signed-off-by: ntkathole <[email protected]>
1 parent 7f43350 commit af3056b

File tree

22 files changed

+339
-73
lines changed

22 files changed

+339
-73
lines changed

protos/feast/core/DataSource.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ message DataSource {
9595
message SourceMeta {
9696
google.protobuf.Timestamp earliestEventTimestamp = 1;
9797
google.protobuf.Timestamp latestEventTimestamp = 2;
98+
google.protobuf.Timestamp created_timestamp = 3;
99+
google.protobuf.Timestamp last_updated_timestamp = 4;
98100
}
99101

100102
// Defines options for DataSource that sources features from a file

sdk/python/feast/data_source.py

Lines changed: 71 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import enum
1515
import warnings
1616
from abc import ABC, abstractmethod
17-
from datetime import timedelta
17+
from datetime import datetime, timedelta, timezone
1818
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
1919

2020
from google.protobuf.duration_pb2 import Duration
@@ -27,6 +27,7 @@
2727
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
2828
from feast.repo_config import RepoConfig, get_data_source_class_from_type
2929
from feast.types import from_value_type
30+
from feast.utils import _utc_now
3031
from feast.value_type import ValueType
3132

3233

@@ -182,6 +183,8 @@ class DataSource(ABC):
182183
owner (optional): The owner of the data source, typically the email of the primary
183184
maintainer.
184185
date_partition_column (optional): Timestamp column used for partitioning. Not supported by all offline stores.
186+
created_timestamp: The time when the data source was created.
187+
last_updated_timestamp: The time when the data source was last updated.
185188
"""
186189

187190
name: str
@@ -192,6 +195,8 @@ class DataSource(ABC):
192195
tags: Dict[str, str]
193196
owner: str
194197
date_partition_column: str
198+
created_timestamp: Optional[datetime]
199+
last_updated_timestamp: Optional[datetime]
195200

196201
def __init__(
197202
self,
@@ -242,6 +247,9 @@ def __init__(
242247
self.date_partition_column = (
243248
date_partition_column if date_partition_column else ""
244249
)
250+
now = _utc_now()
251+
self.created_timestamp = now
252+
self.last_updated_timestamp = now
245253

246254
def __hash__(self):
247255
return hash((self.name, self.timestamp_field))
@@ -295,15 +303,32 @@ def from_proto(data_source: DataSourceProto) -> Any:
295303

296304
if data_source_type == DataSourceProto.SourceType.CUSTOM_SOURCE:
297305
cls = get_data_source_class_from_type(data_source.data_source_class_type)
298-
return cls.from_proto(data_source)
299-
cls = get_data_source_class_from_type(_DATA_SOURCE_OPTIONS[data_source_type])
300-
return cls.from_proto(data_source)
306+
data_source_instance = cls.from_proto(data_source)
307+
else:
308+
cls = get_data_source_class_from_type(
309+
_DATA_SOURCE_OPTIONS[data_source_type]
310+
)
311+
data_source_instance = cls.from_proto(data_source)
312+
313+
data_source_instance._extract_timestamps_from_proto(data_source)
314+
315+
return data_source_instance
301316

302-
@abstractmethod
303317
def to_proto(self) -> DataSourceProto:
304318
"""
305319
Converts a DataSource object to its protobuf representation.
306320
"""
321+
proto = self._to_proto_impl()
322+
self._set_timestamps_in_proto(proto)
323+
324+
return proto
325+
326+
@abstractmethod
327+
def _to_proto_impl(self) -> DataSourceProto:
328+
"""
329+
Subclass implementation of protobuf conversion.
330+
This should be implemented by each DataSource subclass.
331+
"""
307332
raise NotImplementedError
308333

309334
def validate(self, config: RepoConfig):
@@ -340,6 +365,42 @@ def get_table_query_string(self) -> str:
340365
"""
341366
raise NotImplementedError
342367

368+
def _extract_timestamps_from_proto(self, data_source_proto: DataSourceProto):
369+
"""
370+
Internal method to extract created_timestamp and last_updated_timestamp from protobuf.
371+
Called automatically by the base from_proto method.
372+
"""
373+
if data_source_proto.HasField("meta"):
374+
if data_source_proto.meta.HasField("created_timestamp"):
375+
self.created_timestamp = (
376+
data_source_proto.meta.created_timestamp.ToDatetime().replace(
377+
tzinfo=timezone.utc
378+
)
379+
)
380+
if data_source_proto.meta.HasField("last_updated_timestamp"):
381+
self.last_updated_timestamp = (
382+
data_source_proto.meta.last_updated_timestamp.ToDatetime().replace(
383+
tzinfo=timezone.utc
384+
)
385+
)
386+
387+
def _set_timestamps_in_proto(self, data_source_proto: DataSourceProto):
388+
"""
389+
Internal method to set created_timestamp and last_updated_timestamp in protobuf.
390+
Called automatically by the base to_proto method.
391+
"""
392+
if not data_source_proto.HasField("meta"):
393+
data_source_proto.meta.CopyFrom(DataSourceProto.SourceMeta())
394+
395+
if self.created_timestamp:
396+
data_source_proto.meta.created_timestamp.FromDatetime(
397+
self.created_timestamp
398+
)
399+
if self.last_updated_timestamp:
400+
data_source_proto.meta.last_updated_timestamp.FromDatetime(
401+
self.last_updated_timestamp
402+
)
403+
343404

344405
@typechecked
345406
class KafkaSource(DataSource):
@@ -470,7 +531,7 @@ def from_proto(data_source: DataSourceProto):
470531
),
471532
)
472533

473-
def to_proto(self) -> DataSourceProto:
534+
def _to_proto_impl(self) -> DataSourceProto:
474535
data_source_proto = DataSourceProto(
475536
name=self.name,
476537
type=DataSourceProto.STREAM_KAFKA,
@@ -485,6 +546,7 @@ def to_proto(self) -> DataSourceProto:
485546
data_source_proto.created_timestamp_column = self.created_timestamp_column
486547
if self.batch_source:
487548
data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto())
549+
488550
return data_source_proto
489551

490552
def validate(self, config: RepoConfig):
@@ -587,7 +649,7 @@ def from_proto(data_source: DataSourceProto):
587649
owner=data_source.owner,
588650
)
589651

590-
def to_proto(self) -> DataSourceProto:
652+
def _to_proto_impl(self) -> DataSourceProto:
591653
schema_pb = []
592654

593655
if isinstance(self.schema, Dict):
@@ -731,7 +793,7 @@ def __eq__(self, other):
731793
def __hash__(self):
732794
return super().__hash__()
733795

734-
def to_proto(self) -> DataSourceProto:
796+
def _to_proto_impl(self) -> DataSourceProto:
735797
data_source_proto = DataSourceProto(
736798
name=self.name,
737799
type=DataSourceProto.STREAM_KINESIS,
@@ -829,7 +891,7 @@ def from_proto(data_source: DataSourceProto):
829891
owner=data_source.owner,
830892
)
831893

832-
def to_proto(self) -> DataSourceProto:
894+
def _to_proto_impl(self) -> DataSourceProto:
833895
data_source_proto = DataSourceProto(
834896
name=self.name,
835897
type=DataSourceProto.PUSH_SOURCE,

sdk/python/feast/feature_view.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ def __eq__(self, other):
289289
or self.batch_source != other.batch_source
290290
or self.stream_source != other.stream_source
291291
or sorted(self.entity_columns) != sorted(other.entity_columns)
292+
or self.materialization_intervals != other.materialization_intervals
292293
):
293294
return False
294295

sdk/python/feast/infra/offline_stores/bigquery_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def from_proto(data_source: DataSourceProto):
120120
owner=data_source.owner,
121121
)
122122

123-
def to_proto(self) -> DataSourceProto:
123+
def _to_proto_impl(self) -> DataSourceProto:
124124
data_source_proto = DataSourceProto(
125125
name=self.name,
126126
type=DataSourceProto.BATCH_BIGQUERY,

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def data_source(self):
144144
"""Returns the Athena data_source of this Athena source."""
145145
return self.athena_options.data_source
146146

147-
def to_proto(self) -> DataSourceProto:
147+
def _to_proto_impl(self) -> DataSourceProto:
148148
"""
149149
Converts an AthenaSource object to its protobuf representation.
150150

sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
105105
owner=data_source.owner,
106106
)
107107

108-
def to_proto(self) -> DataSourceProto:
108+
def _to_proto_impl(self) -> DataSourceProto:
109109
data_source_proto = DataSourceProto(
110110
name=self.name,
111111
type=DataSourceProto.CUSTOM_SOURCE,

sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def from_proto(data_source: DataSourceProto):
121121
owner=data_source.owner,
122122
)
123123

124-
def to_proto(self) -> DataSourceProto:
124+
def _to_proto_impl(self) -> DataSourceProto:
125125
data_source_proto = DataSourceProto(
126126
name=self.name,
127127
type=DataSourceProto.CUSTOM_SOURCE,

sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssqlserver_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ def from_proto(data_source: DataSourceProto):
220220
date_partition_column=data_source.date_partition_column,
221221
)
222222

223-
def to_proto(self) -> DataSourceProto:
223+
def _to_proto_impl(self) -> DataSourceProto:
224224
data_source_proto = DataSourceProto(
225225
type=DataSourceProto.CUSTOM_SOURCE,
226226
data_source_class_type="feast.infra.offline_stores.contrib.mssql_offline_store.mssqlserver_source.MsSqlServerSource",

sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def from_proto(data_source: DataSourceProto):
103103
owner=data_source.owner,
104104
)
105105

106-
def to_proto(self) -> DataSourceProto:
106+
def _to_proto_impl(self) -> DataSourceProto:
107107
data_source_proto = DataSourceProto(
108108
name=self.name,
109109
type=DataSourceProto.CUSTOM_SOURCE,

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
157157
owner=data_source.owner,
158158
)
159159

160-
def to_proto(self) -> DataSourceProto:
160+
def _to_proto_impl(self) -> DataSourceProto:
161161
data_source_proto = DataSourceProto(
162162
name=self.name,
163163
type=DataSourceProto.BATCH_SPARK,

0 commit comments

Comments
 (0)