
Commit e7cd32f

Merge pull request feast-dev#1 from redhatHameed/remote-offline
[WIP] feat: Added offline store Arrow Flight server/client
2 parents e88f1e3 + 47faa21 commit e7cd32f

File tree: 6 files changed (+286, −0 lines)


sdk/python/feast/cli.py

Lines changed: 29 additions & 0 deletions
@@ -27,6 +27,7 @@
 from feast import utils
 from feast.constants import (
     DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT,
+    DEFAULT_OFFLINE_SERVER_PORT,
     DEFAULT_REGISTRY_SERVER_PORT,
 )
 from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
@@ -773,6 +774,34 @@ def serve_registry_command(ctx: click.Context, port: int):
     store.serve_registry(port)
 
 
+@cli.command("serve_offline")
+@click.option(
+    "--host",
+    "-h",
+    type=click.STRING,
+    default="127.0.0.1",
+    show_default=True,
+    help="Specify a host for the server",
+)
+@click.option(
+    "--port",
+    "-p",
+    type=click.INT,
+    default=DEFAULT_OFFLINE_SERVER_PORT,
+    help="Specify a port for the server",
+)
+@click.pass_context
+def serve_offline_command(
+    ctx: click.Context,
+    host: str,
+    port: int,
+):
+    """Start a remote offline server locally on a given host and port."""
+    store = create_feature_store(ctx)
+
+    store.serve_offline(host, port)
+
+
 @cli.command("validate")
 @click.option(
     "--feature-service",

sdk/python/feast/constants.py

Lines changed: 3 additions & 0 deletions
@@ -47,6 +47,9 @@
 # Default registry server port
 DEFAULT_REGISTRY_SERVER_PORT = 6570
 
+# Default offline server port
+DEFAULT_OFFLINE_SERVER_PORT = 8815
+
 # Environment variable for feature server docker image tag
 DOCKER_IMAGE_TAG_ENV_NAME: str = "FEAST_SERVER_DOCKER_IMAGE_TAG"

sdk/python/feast/feature_store.py

Lines changed: 7 additions & 0 deletions
@@ -2369,6 +2369,13 @@ def serve_registry(self, port: int) -> None:
 
         registry_server.start_server(self, port)
 
+    @log_exceptions_and_usage
+    def serve_offline(self, host: str, port: int) -> None:
+        """Start the offline server locally on a given host and port."""
+        from feast import offline_server
+
+        offline_server.start_server(self, host, port)
+
     @log_exceptions_and_usage
     def serve_transformations(self, port: int) -> None:
         """Start the feature transformation server locally on a given port."""
sdk/python/feast/infra/offline_stores/remote.py

Lines changed: 160 additions & 0 deletions

@@ -0,0 +1,160 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, List, Literal, Optional, Union

import pandas as pd
import pyarrow as pa
import pyarrow.flight
import pyarrow.parquet
from pydantic import StrictStr

from feast import OnDemandFeatureView
from feast.data_source import DataSource
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import (
    OfflineStore,
    RetrievalJob,
)
from feast.infra.registry.base_registry import BaseRegistry
from feast.infra.registry.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage


class RemoteOfflineStoreConfig(FeastConfigBaseModel):

    offline_type: StrictStr = "remote"
    """ str: Provider name or a class name that implements the offline store."""

    path: StrictStr = ""
    """ str: Path to the metadata store.
    If offline_type is 'remote', this is the URL of the offline server."""

    host: StrictStr = ""
    """ str: Host of the offline store.
    If offline_type is 'remote', this is the host of the Arrow Flight offline server."""

    port: StrictStr = ""
    """ str: Port of the offline store."""


class RemoteRetrievalJob(RetrievalJob):
    def __init__(
        self,
        config: RepoConfig,
        feature_refs: List[str],
        entity_df: Union[pd.DataFrame, str],
        # TODO add missing parameters from the OfflineStore API
    ):
        # Generate unique command identifier
        self.command = str(uuid.uuid4())
        # Initialize the client connection
        self.client = pa.flight.connect(
            f"grpc://{config.offline_store.host}:{config.offline_store.port}"
        )
        # Put API parameters
        self._put_parameters(feature_refs, entity_df)

    def _put_parameters(self, feature_refs, entity_df):
        entity_df_table = pa.Table.from_pandas(entity_df)
        historical_flight_descriptor = pa.flight.FlightDescriptor.for_command(
            self.command
        )
        writer, _ = self.client.do_put(
            historical_flight_descriptor,
            entity_df_table.schema.with_metadata(
                {
                    "command": self.command,
                    "api": "get_historical_features",
                    "param": "entity_df",
                }
            ),
        )
        writer.write_table(entity_df_table)
        writer.close()

        features_array = pa.array(feature_refs)
        features_batch = pa.RecordBatch.from_arrays([features_array], ["features"])
        writer, _ = self.client.do_put(
            historical_flight_descriptor,
            features_batch.schema.with_metadata(
                {
                    "command": self.command,
                    "api": "get_historical_features",
                    "param": "features",
                }
            ),
        )
        writer.write_batch(features_batch)
        writer.close()

    # Invoked to realize the Pandas DataFrame
    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
        # We use arrow format because it gives better control of the table schema
        return self._to_arrow_internal().to_pandas()

    # Invoked to synchronously execute the underlying query and return the result as an
    # arrow table. This is where the do_get service is invoked.
    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
        upload_descriptor = pa.flight.FlightDescriptor.for_command(self.command)
        flight = self.client.get_flight_info(upload_descriptor)
        ticket = flight.endpoints[0].ticket

        reader = self.client.do_get(ticket)
        return reader.read_all()

    @property
    def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
        return []


class RemoteOfflineStore(OfflineStore):
    def __init__(
        self,
        arrow_host,
        arrow_port,
    ):
        self.arrow_host = arrow_host
        self.arrow_port = arrow_port

    @log_exceptions_and_usage(offline_store="remote")
    def get_historical_features(
        self,
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pd.DataFrame, str],
        registry: Registry = None,
        project: str = "",
        full_feature_names: bool = False,
    ) -> RemoteRetrievalJob:
        offline_store_config = config.offline_store
        assert isinstance(offline_store_config, RemoteOfflineStoreConfig)
        store_type = offline_store_config.offline_type
        port = offline_store_config.port
        host = offline_store_config.host

        return RemoteRetrievalJob(config, feature_refs, entity_df)

    @log_exceptions_and_usage(offline_store="remote")
    def pull_latest_from_table_or_query(
        self,
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,
        created_timestamp_column: Optional[str],
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        """Pulls data from the offline store for use in materialization."""
        print("Pulling latest features from my offline store")
        # Implementation here.
        pass

    def write_logged_features(
        config: RepoConfig,
        data: Union[pyarrow.Table, Path],
        source: LoggingSource,
        logging_config: LoggingConfig,
        registry: BaseRegistry,
    ):
        """Optional method to have Feast support logging your online features."""
        # Implementation here.
        pass

    def offline_write_batch(
        config: RepoConfig,
        feature_view: FeatureView,
        table: pyarrow.Table,
        progress: Optional[Callable[[int], Any]],
    ):
        # Implementation here.
        pass
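
Taken together, `RemoteOfflineStoreConfig`, `RemoteRetrievalJob`, and `RemoteOfflineStore` form the client side of the Arrow Flight protocol: the retrieval job ships the entity dataframe and the feature references to the server with `do_put`, tagging each upload with a shared command UUID and the target API in the schema metadata, and reads the joined result back with `do_get`. The intended end-user flow is sketched below; it is not fully wired up in this WIP commit and assumes a `feature_store.yaml` whose `offline_store` block selects the new `remote` type with the server's host and port, plus a `driver_hourly_stats` feature view defined in the repo.

```python
from datetime import datetime

import pandas as pd

from feast import FeatureStore

store = FeatureStore(repo_path=".")  # repo configured with offline_store type "remote"

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": [datetime(2024, 4, 1, 10), datetime(2024, 4, 1, 11)],
    }
)

# Routed through RemoteOfflineStore.get_historical_features -> RemoteRetrievalJob:
# entity_df and the feature refs travel to the Flight server via do_put, and the
# joined training dataframe comes back via do_get.
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],
).to_df()
print(training_df.head())
```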

sdk/python/feast/offline_server.py

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
import ast

import pyarrow as pa
import pyarrow.flight

from feast import FeatureStore


class OfflineServer(pa.flight.FlightServerBase):
    def __init__(self, store: FeatureStore, location=None):
        super(OfflineServer, self).__init__(location)
        self._location = location
        self.flights = {}
        self.store = store

    @classmethod
    def descriptor_to_key(cls, descriptor):
        return (
            descriptor.descriptor_type.value,
            descriptor.command,
            tuple(descriptor.path or tuple()),
        )

    def _make_flight_info(self, key, descriptor, table):
        endpoints = [pyarrow.flight.FlightEndpoint(repr(key), [self._location])]
        mock_sink = pyarrow.MockOutputStream()
        stream_writer = pyarrow.RecordBatchStreamWriter(mock_sink, table.schema)
        stream_writer.write_table(table)
        stream_writer.close()
        data_size = mock_sink.size()

        return pyarrow.flight.FlightInfo(
            table.schema, descriptor, endpoints, table.num_rows, data_size
        )

    def get_flight_info(self, context, descriptor):
        key = OfflineServer.descriptor_to_key(descriptor)
        if key in self.flights:
            table = self.flights[key]
            return self._make_flight_info(key, descriptor, table)
        raise KeyError("Flight not found.")

    def list_flights(self, context, criteria):
        for key, table in self.flights.items():
            if key[1] is not None:
                descriptor = pyarrow.flight.FlightDescriptor.for_command(key[1])
            else:
                descriptor = pyarrow.flight.FlightDescriptor.for_path(*key[2])

            yield self._make_flight_info(key, descriptor, table)

    def do_put(self, context, descriptor, reader, writer):
        key = OfflineServer.descriptor_to_key(descriptor)
        self.flights[key] = reader.read_all()

    def do_get(self, context, ticket):
        key = ast.literal_eval(ticket.ticket.decode())
        if key not in self.flights:
            return None

        entity_df_key = self.flights[key]
        entity_df = pa.Table.to_pandas(entity_df_key)
        # Get feature data
        features_key = (2, b"features_descriptor", ())
        if features_key in self.flights:
            features_data = self.flights[features_key]
            features = pa.RecordBatch.to_pylist(features_data)
            features = [item["features"] for item in features]
        else:
            features = None

        training_df = self.store.get_historical_features(entity_df, features).to_df()
        table = pa.Table.from_pandas(training_df)

        return pa.flight.RecordBatchStream(table)


def start_server(
    store: FeatureStore,
    host: str,
    port: int,
):
    location = "grpc+tcp://{}:{}".format(host, port)
    server = OfflineServer(store, location)
    print("Serving on", location)
    server.serve()
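
The server keys every uploaded table by a tuple derived from its Flight descriptor, advertises `repr(key)` as the ticket in `get_flight_info`, and recovers the key in `do_get` with `ast.literal_eval`. A small self-contained illustration of that round trip (the command string is an arbitrary stand-in for the UUID the client generates):

```python
import ast

import pyarrow.flight as fl

descriptor = fl.FlightDescriptor.for_command("11111111-2222-3333-4444-555555555555")
key = (
    descriptor.descriptor_type.value,
    descriptor.command,  # bytes
    tuple(descriptor.path or tuple()),
)

ticket = fl.Ticket(repr(key))  # what get_flight_info hands back to the client
recovered = ast.literal_eval(ticket.ticket.decode())  # what do_get reconstructs
assert recovered == key
```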

sdk/python/feast/repo_config.py

Lines changed: 1 addition & 0 deletions
@@ -77,6 +77,7 @@
     "athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
     "mssql": "feast.infra.offline_stores.contrib.mssql_offline_store.mssql.MsSqlServerOfflineStore",
     "duckdb": "feast.infra.offline_stores.duckdb.DuckDBOfflineStore",
+    "remote": "feast.infra.offline_stores.remote.RemoteOfflineStore",
 }
 
 FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = {
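
Registering the `remote` key means an `offline_store` block whose `type` is `remote` in `feature_store.yaml` can now resolve to `RemoteOfflineStore`. A hedged sketch of the lookup; the enclosing dict's name is not visible in this hunk, and `OFFLINE_STORE_CLASS_FOR_TYPE` is assumed from the surrounding feast.repo_config module, as is the YAML layout shown in the comment:

```python
from feast.repo_config import OFFLINE_STORE_CLASS_FOR_TYPE  # name assumed, see note above

# Prints "feast.infra.offline_stores.remote.RemoteOfflineStore"
print(OFFLINE_STORE_CLASS_FOR_TYPE["remote"])

# A feature_store.yaml fragment that would select it (layout assumed, not shown in this PR):
#
#   offline_store:
#       type: remote
#       host: localhost
#       port: 8815
```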
