
Commit 0c20a4e

fix: Enable Spark materialization on Yarn (feast-dev#3370)
* fix: Fix Spark materialization engine to work on Yarn
* fix
* fix
* add docs

Signed-off-by: Danny Chiao <[email protected]>
1 parent 00fa21f commit 0c20a4e

File tree

2 files changed: +38 -8 lines changed

- docs/reference/batch-materialization/spark.md
- sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py

docs/reference/batch-materialization/spark.md

Lines changed: 34 additions & 0 deletions
````diff
@@ -19,3 +19,37 @@ batch_engine:
   partitions: [optional num partitions to use to write to online store]
 ```
 {% endcode %}
+
+## Example in Python
+
+{% code title="feature_store.py" %}
+```python
+from feast import FeatureStore, RepoConfig
+from feast.repo_config import RegistryConfig
+from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
+from feast.infra.offline_stores.contrib.spark_offline_store.spark import SparkOfflineStoreConfig
+
+repo_config = RepoConfig(
+    registry="s3://[YOUR_BUCKET]/feast-registry.db",
+    project="feast_repo",
+    provider="aws",
+    offline_store=SparkOfflineStoreConfig(
+        spark_conf={
+            "spark.ui.enabled": "false",
+            "spark.eventLog.enabled": "false",
+            "spark.sql.catalogImplementation": "hive",
+            "spark.sql.parser.quotedRegexColumnNames": "true",
+            "spark.sql.session.timeZone": "UTC"
+        }
+    ),
+    batch_engine={
+        "type": "spark.engine",
+        "partitions": 10
+    },
+    online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
+    entity_key_serialization_version=2
+)
+
+store = FeatureStore(config=repo_config)
+```
+{% endcode %}
````
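For completeness, a hedged sketch of driving the store configured in the documentation example above. `FeatureStore.materialize` is the standard Feast API that dispatches the job to whichever batch engine is configured, so with `"type": "spark.engine"` the run goes through the Spark materialization engine; the one-day window here is purely illustrative.

```python
from datetime import datetime, timedelta

# Continues the example above: `store` is the FeatureStore built from repo_config.
# With batch_engine type "spark.engine", this job is executed by the Spark
# materialization engine rather than the default local engine.
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)
```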

sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py

Lines changed: 4 additions & 8 deletions
```diff
@@ -1,4 +1,3 @@
-import tempfile
 from dataclasses import dataclass
 from datetime import datetime
 from typing import Callable, List, Literal, Optional, Sequence, Union
@@ -196,7 +195,7 @@ class _SparkSerializedArtifacts:
     """Class to assist with serializing unpicklable artifacts to the spark workers"""

     feature_view_proto: str
-    repo_config_file: str
+    repo_config_byte: str

     @classmethod
     def serialize(cls, feature_view, repo_config):
@@ -205,12 +204,10 @@ def serialize(cls, feature_view, repo_config):
         feature_view_proto = feature_view.to_proto().SerializeToString()

         # serialize repo_config to disk. Will be used to instantiate the online store
-        repo_config_file = tempfile.NamedTemporaryFile(delete=False).name
-        with open(repo_config_file, "wb") as f:
-            dill.dump(repo_config, f)
+        repo_config_byte = dill.dumps(repo_config)

         return _SparkSerializedArtifacts(
-            feature_view_proto=feature_view_proto, repo_config_file=repo_config_file
+            feature_view_proto=feature_view_proto, repo_config_byte=repo_config_byte
         )
@@ -220,8 +217,7 @@ def unserialize(self):
         feature_view = FeatureView.from_proto(proto)

         # load
-        with open(self.repo_config_file, "rb") as f:
-            repo_config = dill.load(f)
+        repo_config = dill.loads(self.repo_config_byte)

         provider = PassthroughProvider(repo_config)
         online_store = provider.online_store
```
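The substance of the fix is visible in the hunks above: the old code wrote `repo_config` to a temp file on the driver, a local path that does not exist on Yarn executors, while the new code serializes it to bytes that travel inside the pickled Spark task. A minimal sketch of that round trip, assuming only `dill` (the `Config` class here is a stand-in for illustration, not Feast code):

```python
import dill

class Config:
    """Stand-in for Feast's RepoConfig; illustrative only."""
    def __init__(self, project: str):
        self.project = project

# Driver side: serialize the config into bytes, as the patched
# _SparkSerializedArtifacts.serialize now does with repo_config_byte.
payload = dill.dumps(Config(project="feast_repo"))

# Executor side: rebuild the config purely from the bytes carried along
# with the task; no shared filesystem between driver and executors is needed.
restored = dill.loads(payload)
assert restored.project == "feast_repo"
```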
