Skip to content

Commit 1ba0d96

Browse files
authored
fix jar path handling in EMR launcher (feast-dev#1292)
Signed-off-by: Oleg Avdeev <[email protected]>
1 parent c96f111 commit 1ba0d96

File tree

1 file changed

+17
-9
lines changed
  • sdk/python/feast/pyspark/launchers/aws

1 file changed

+17
-9
lines changed

sdk/python/feast/pyspark/launchers/aws/emr.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -254,11 +254,15 @@ def offline_to_online_ingestion(
254254
BatchIngestionJob: wrapper around remote job that can be used to check when job completed.
255255
"""
256256

257-
jar_s3_path = _upload_jar(
258-
self._staging_location, ingestion_job_params.get_main_file_path()
259-
)
257+
ingestion_jar = ingestion_job_params.get_main_file_path()
258+
if ingestion_jar.startswith("s3://") or ingestion_jar.startswith("s3a://") or ingestion_jar.startswith("https://"):
259+
jar_path = ingestion_jar
260+
else:
261+
jar_path = _upload_jar(
262+
self._staging_location, ingestion_jar
263+
)
260264
step = _sync_offline_to_online_step(
261-
jar_s3_path,
265+
jar_path,
262266
ingestion_job_params.get_feature_table_name(),
263267
args=ingestion_job_params.get_arguments(),
264268
)
@@ -276,21 +280,25 @@ def start_stream_to_online_ingestion(
276280
Returns:
277281
StreamIngestionJob: wrapper around remote job that can be used to check on the job.
278282
"""
279-
jar_s3_path = _upload_jar(
280-
self._staging_location, ingestion_job_params.get_main_file_path()
281-
)
283+
ingestion_jar = ingestion_job_params.get_main_file_path()
284+
if ingestion_jar.startswith("s3://") or ingestion_jar.startswith("s3a://") or ingestion_jar.startswith("https://"):
285+
jar_path = ingestion_jar
286+
else:
287+
jar_path = _upload_jar(
288+
self._staging_location, ingestion_jar
289+
)
282290

283291
extra_jar_paths: List[str] = []
284292
for extra_jar in ingestion_job_params.get_extra_jar_paths():
285-
if extra_jar.startswith("s3://"):
293+
if extra_jar.startswith("s3://") or extra_jar.startswith("s3a://") or extra_jar.startswith("https://"):
286294
extra_jar_paths.append(extra_jar)
287295
else:
288296
extra_jar_paths.append(_upload_jar(self._staging_location, extra_jar))
289297

290298
job_hash = ingestion_job_params.get_job_hash()
291299

292300
step = _stream_ingestion_step(
293-
jar_s3_path,
301+
jar_path,
294302
extra_jar_paths,
295303
ingestion_job_params.get_feature_table_name(),
296304
args=ingestion_job_params.get_arguments(),

0 commit comments

Comments
 (0)