From 808dfca32405ddd0194837cf8e8837ebd9dda1b0 Mon Sep 17 00:00:00 2001 From: Sai Sharan Tangeda Date: Wed, 9 Mar 2022 00:09:01 +0530 Subject: [PATCH 1/4] Add Databricks as Spark Launcher for feast --- cluster/sdk/python/feast_spark/constants.py | 24 +- .../historical_feature_retrieval_job.py | 25 +- .../python/feast_spark/pyspark/launcher.py | 21 + .../pyspark/launchers/databricks/__init__.py | 13 + .../launchers/databricks/databricks.py | 409 +++++++++++++++++ .../databricks/databricks_api_wrapper.py | 142 ++++++ .../launchers/databricks/databricks_utils.py | 415 ++++++++++++++++++ cluster/sdk/python/requirements-ci.txt | 178 +++++++- .../scala/feast/ingestion/IngestionJob.scala | 11 +- .../feast/ingestion/IngestionJobConfig.scala | 3 +- 10 files changed, 1214 insertions(+), 27 deletions(-) create mode 100644 cluster/sdk/python/feast_spark/pyspark/launchers/databricks/__init__.py create mode 100644 cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks.py create mode 100644 cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks_api_wrapper.py create mode 100644 cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks_utils.py diff --git a/cluster/sdk/python/feast_spark/constants.py b/cluster/sdk/python/feast_spark/constants.py index 6d34810..ca3a01d 100644 --- a/cluster/sdk/python/feast_spark/constants.py +++ b/cluster/sdk/python/feast_spark/constants.py @@ -94,7 +94,7 @@ class ConfigOptions(metaclass=ConfigMeta): SPARK_K8S_JOB_TEMPLATE_PATH = None # Synapse dev url - AZURE_SYNAPSE_DEV_URL: Optional[str] = None + AZURE_SYNAPSE_DEV_URL: Optional[str] = None # Synapse pool name AZURE_SYNAPSE_POOL_NAME: Optional[str] = None @@ -110,10 +110,28 @@ class ConfigOptions(metaclass=ConfigMeta): # Azure EventHub Connection String (with Kafka API). 
See more details here: # https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-migration-guide - # Code Sample is here: + # Code Sample is here: # https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/tutorials/spark/sparkConsumer.scala AZURE_EVENTHUB_KAFKA_CONNECTION_STRING = "" - + + # Databricks: Access Token + DATABRICKS_ACCESS_TOKEN: Optional[str] = None + + # Databricks: Host (https included URL of the databricks workspace) + DATABRICKS_HOST_URL: Optional[str] = None + + # Databricks: Common Cluster Id + DATABRICKS_COMMON_CLUSTER_ID: Optional[str] = None + + # Databricks: Dedicated Streaming Cluster Id [Optional Dedicated Cluster for streaming use-cases] + DATABRICKS_STREAMING_CLUSTER_ID: Optional[str] = None + + # Databricks: Maximum runs to retrieve + DATABRICKS_MAXIMUM_RUNS_TO_RETRIEVE: Optional[str] = None + + # Databricks: Mounted Storage Path (Ex: /mnt/) + DATABRICKS_MOUNTED_STORAGE_PATH: Optional[str] = None + #: File format of historical retrieval features HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet" diff --git a/cluster/sdk/python/feast_spark/pyspark/historical_feature_retrieval_job.py b/cluster/sdk/python/feast_spark/pyspark/historical_feature_retrieval_job.py index 6d3b979..6a6b5d3 100644 --- a/cluster/sdk/python/feast_spark/pyspark/historical_feature_retrieval_job.py +++ b/cluster/sdk/python/feast_spark/pyspark/historical_feature_retrieval_job.py @@ -623,10 +623,18 @@ def filter_feature_table_by_time_range( def _read_and_verify_entity_df_from_source( spark: SparkSession, source: Source ) -> DataFrame: + spark_path = source.spark_path + + # Handle read for databricks mounted storage + if mounted_staging_location is not None: + print(f"Using Databricks Mounted path:{mounted_staging_location}") + relative_path = source.spark_path.rsplit("/", 1)[1] if not source.spark_path.endswith( + "/") else source.spark_path.rsplit("/", 2)[1] + spark_path = mounted_staging_location + relative_path entity_df = ( spark.read.format(source.spark_format) .options(**source.spark_read_options) - .load(source.spark_path) + .load(spark_path) ) mapped_entity_df = _map_column(entity_df, source.field_mapping) @@ -860,6 +868,10 @@ def _get_args(): parser.add_argument( "--destination", type=str, help="Retrieval result destination in json string" ) + parser.add_argument( + "--mounted_staging_location", type=str, help="dbfs mounted staging path for verifying entity_source " + "(Only for databricks)", default="" + ) parser.add_argument("--checkpoint", type=str, help="Spark Checkpoint location") return parser.parse_args() @@ -882,6 +894,10 @@ def json_b64_decode(s: str) -> Any: return json.loads(b64decode(s.encode("ascii"))) +def b64_decode(obj: str) -> str: + return str(b64decode(obj.encode("ascii")).decode("ascii")) + + if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() args = _get_args() @@ -889,6 +905,8 @@ def json_b64_decode(s: str) -> Any: feature_tables_sources_conf = json_b64_decode(args.feature_tables_sources) entity_source_conf = json_b64_decode(args.entity_source) destination_conf = json_b64_decode(args.destination) + mounted_staging_location = b64_decode(args.mounted_staging_location) if not b64_decode( + args.mounted_staging_location) == "" else None if args.checkpoint: spark.sparkContext.setCheckpointDir(args.checkpoint) @@ -903,4 +921,7 @@ def json_b64_decode(s: str) -> Any: except Exception as e: logger.exception(e) raise e - spark.stop() + + # Databricks clusters do not allow this + if mounted_staging_location is None: + spark.stop() diff --git 
a/cluster/sdk/python/feast_spark/pyspark/launcher.py b/cluster/sdk/python/feast_spark/pyspark/launcher.py index f8f8dc2..b7f8ab9 100644 --- a/cluster/sdk/python/feast_spark/pyspark/launcher.py +++ b/cluster/sdk/python/feast_spark/pyspark/launcher.py @@ -94,6 +94,26 @@ def _synapse_launcher(config: Config) -> JobLauncher: executors=int(config.get(opt.AZURE_SYNAPSE_EXECUTORS)) ) +def _databricks_launcher(config: Config) -> JobLauncher: + from feast_spark.pyspark.launchers import databricks + + staging_location = config.get(opt.SPARK_STAGING_LOCATION) + staging_uri = urlparse(staging_location) + + return databricks.DatabricksJobLauncher( + databricks_access_token=config.get(opt.DATABRICKS_ACCESS_TOKEN), + databricks_host_url=config.get(opt.DATABRICKS_HOST_URL), + staging_client=get_staging_client(staging_uri.scheme, config), + databricks_common_cluster_id=config.get(opt.DATABRICKS_COMMON_CLUSTER_ID), + databricks_streaming_cluster_id=config.get(opt.DATABRICKS_STREAMING_CLUSTER_ID, None), + databricks_max_active_jobs_to_retrieve=config.getint(opt.DATABRICKS_MAXIMUM_RUNS_TO_RETRIEVE, None), + mounted_staging_location=config.get(opt.DATABRICKS_MOUNTED_STORAGE_PATH, None), + azure_account_name=config.get(opt.AZURE_BLOB_ACCOUNT_NAME, None), + azure_account_key=config.get(opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY, None), + staging_location=staging_location + ) + + _launchers = { "standalone": _standalone_launcher, @@ -101,6 +121,7 @@ def _synapse_launcher(config: Config) -> JobLauncher: "emr": _emr_launcher, "k8s": _k8s_launcher, 'synapse': _synapse_launcher, + 'databricks': _databricks_launcher, } diff --git a/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/__init__.py b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/__init__.py new file mode 100644 index 0000000..f82185a --- /dev/null +++ b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/__init__.py @@ -0,0 +1,13 @@ +from .databricks import ( + DatabricksBatchIngestionJob, + DatabricksJobLauncher, + DatabricksRetrievalJob, + DatabricksStreamIngestionJob, +) + +__all__ = [ + "DatabricksRetrievalJob", + "DatabricksBatchIngestionJob", + "DatabricksStreamIngestionJob", + "DatabricksJobLauncher", +] diff --git a/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks.py b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks.py new file mode 100644 index 0000000..fbc01d4 --- /dev/null +++ b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks.py @@ -0,0 +1,409 @@ +import json +import random +import string +import time +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, cast +from io import BytesIO +from urllib.parse import urlunparse, urlparse + +import requests +import yaml +from feast.staging.storage_client import AbstractStagingClient, get_staging_client + +from feast_spark.pyspark.abc import ( + BatchIngestionJob, + BatchIngestionJobParameters, + JobLauncher, + RetrievalJob, + RetrievalJobParameters, + ScheduledBatchIngestionJobParameters, + SparkJob, + SparkJobFailure, + SparkJobStatus, + StreamIngestionJob, + StreamIngestionJobParameters, +) + +from .databricks_utils import ( + HISTORICAL_RETRIEVAL_JOB_TYPE, + LABEL_FEATURE_TABLE, + METADATA_JOBHASH, + METADATA_OUTPUT_URI, + OFFLINE_TO_ONLINE_JOB_TYPE, + STREAM_TO_ONLINE_JOB_TYPE, + _cancel_job_by_id, + _get_job_by_id, + _list_jobs, + _submit_job, DatabricksJobManager, DatabricksJobInfo, HISTORICAL_RETRIEVAL_JOB_TYPE_CODE, + get_job_metadata, 
OFFLINE_TO_ONLINE_JOB_TYPE_CODE, _generate_job_extra_metadata, STREAM_TO_ONLINE_JOB_TYPE_CODE, + b64_encode, +) + + +def _load_resource_template(job_template_path: Path) -> Dict[str, Any]: + with open(job_template_path, "rt") as f: + return yaml.safe_load(f) + + +def _generate_job_id() -> str: + return "feast-" + "".join( + random.choice(string.ascii_lowercase + string.digits) for _ in range(8) + ) + + +def _generate_scheduled_job_id(project: str, feature_table_name: str) -> str: + scheduled_job_id = f"feast-{project}-{feature_table_name}".replace("_", "-") + k8s_res_name_char_limit = 253 + + return ( + scheduled_job_id + if len(scheduled_job_id) <= k8s_res_name_char_limit + else scheduled_job_id[:k8s_res_name_char_limit] + ) + + +def _truncate_label(label: str) -> str: + return label[:63] + + +class DatabricksJobMixin: + def __init__(self, api: DatabricksJobManager, job_id: int): + self._api = api + self._job_id = job_id + + def get_id(self) -> int: + return self._job_id + + def get_status(self) -> SparkJobStatus: + job = _get_job_by_id(self._api, self._job_id) + assert job is not None + return job.state + + def get_start_time(self) -> datetime: + job = _get_job_by_id(self._api, self._job_id) + assert job is not None + return job.start_time + + def cancel(self): + _cancel_job_by_id(self._api, self._job_id) + + def _wait_for_complete(self, timeout_seconds: Optional[float]) -> bool: + """ Returns true if the job completed successfully """ + start_time = time.time() + while (timeout_seconds is None) or (time.time() - start_time < timeout_seconds): + status = self.get_status() + if status == SparkJobStatus.COMPLETED: + return True + elif status == SparkJobStatus.FAILED: + return False + else: + time.sleep(1) + else: + raise TimeoutError("Timeout waiting for job to complete") + + +class DatabricksRetrievalJob(DatabricksJobMixin, RetrievalJob): + """ + Historical feature retrieval job result for a synapse cluster + """ + + def __init__( + self, api: DatabricksJobManager, job_id: int, output_file_uri: str + ): + """ + This is the job object representing the historical retrieval job, returned by SynapseClusterLauncher. + + Args: + output_file_uri (str): Uri to the historical feature retrieval job output file. + """ + super().__init__(api, job_id) + self._output_file_uri = output_file_uri + + def get_output_file_uri(self, timeout_sec=None, block=True): + if not block: + return self._output_file_uri + + if self._wait_for_complete(timeout_sec): + return self._output_file_uri + else: + raise SparkJobFailure("Spark job failed") + + +class DatabricksBatchIngestionJob(DatabricksJobMixin, BatchIngestionJob): + """ + Ingestion job result for a synapse cluster + """ + + def __init__( + self, api: DatabricksJobManager, job_id: int, feature_table: str + ): + super().__init__(api, job_id) + self._feature_table = feature_table + + def get_feature_table(self) -> str: + return self._feature_table + + +class DatabricksStreamIngestionJob(DatabricksJobMixin, StreamIngestionJob): + """ + Ingestion streaming job for a synapse cluster + """ + + def __init__( + self, + api: DatabricksJobManager, + job_id: int, + job_hash: str, + feature_table: str, + ): + super().__init__(api, job_id) + self._job_hash = job_hash + self._feature_table = feature_table + + def get_hash(self) -> str: + return self._job_hash + + def get_feature_table(self) -> str: + return self._feature_table + + +class DatabricksJobLauncher(JobLauncher): + """ + Submits spark jobs to a spark cluster. 
Currently supports historical feature retrieval, batch ingestion and stream ingestion jobs.
+    Databricks Job Launcher supports mounted storage accounts as well
+    """
+
+    def __init__(
+        self,
+        databricks_access_token: str,
+        databricks_host_url: str,
+        databricks_common_cluster_id: str,
+        staging_client: AbstractStagingClient,
+        databricks_streaming_cluster_id: Optional[str] = None,
+        databricks_max_active_jobs_to_retrieve: Optional[int] = 5000,
+        mounted_staging_location: Optional[str] = None,
+        azure_account_key: Optional[str] = None,
+        staging_location: Optional[str] = None,
+        azure_account_name: Optional[str] = None,
+    ):
+        if mounted_staging_location is None and (azure_account_key is None or staging_location is None or
+                                                 azure_account_name is None):
+            raise Exception("Error: storage path unavailable. Please provide a mounted storage location or "
+                            "a remote Azure storage location")
+        self._staging_location = staging_location
+        self._storage_client = staging_client
+        self._azure_account_name = azure_account_name
+        self._azure_account_key = azure_account_key
+
+        self._mounted_staging_location = mounted_staging_location
+        if mounted_staging_location is not None:
+            self._mounted_staging_location = mounted_staging_location + '/' if mounted_staging_location[
+                -1] != '/' else mounted_staging_location
+            if self._mounted_staging_location.startswith("/dbfs/"):
+                self._mounted_staging_location = "dbfs:" + self._mounted_staging_location[5:]
+
+        self._api = DatabricksJobManager(databricks_access_token=databricks_access_token,
+                                         databricks_host_url=databricks_host_url,
+                                         cluster_id=databricks_common_cluster_id,
+                                         streaming_cluster_id=databricks_streaming_cluster_id,
+                                         max_active_jobs_to_retrieve=databricks_max_active_jobs_to_retrieve)
+
+    def _job_from_job_info(self, job_info: DatabricksJobInfo) -> SparkJob:
+        job_type, job_extra_metadata = get_job_metadata(job_info)
+        if job_type == HISTORICAL_RETRIEVAL_JOB_TYPE:
+            assert METADATA_OUTPUT_URI in job_extra_metadata
+            return DatabricksRetrievalJob(
+                api=self._api,
+                job_id=job_info.job_id,
+                output_file_uri=job_extra_metadata[METADATA_OUTPUT_URI],
+            )
+        elif job_type == OFFLINE_TO_ONLINE_JOB_TYPE:
+            return DatabricksBatchIngestionJob(
+                api=self._api,
+                job_id=job_info.job_id,
+                feature_table=job_extra_metadata.get(LABEL_FEATURE_TABLE, ""),
+            )
+        elif job_type == STREAM_TO_ONLINE_JOB_TYPE:
+            # job_hash must not be None for stream ingestion jobs
+            assert METADATA_JOBHASH in job_extra_metadata
+            return DatabricksStreamIngestionJob(
+                api=self._api,
+                job_id=job_info.job_id,
+                job_hash=job_extra_metadata[METADATA_JOBHASH],
+                feature_table=job_extra_metadata.get(LABEL_FEATURE_TABLE, ""),
+            )
+        else:
+            # We should never get here
+            raise ValueError(f"Unknown job type {job_type}")
+
+    def historical_feature_retrieval(
+        self, job_params: RetrievalJobParameters
+    ) -> RetrievalJob:
+        """
+        Submits a historical feature retrieval job to a Spark cluster.
+
+        Raises:
+            SparkJobFailure: The spark job submission failed, encountered error
+                during execution, or timeout.
+
+        Returns:
+            RetrievalJob: wrapper around remote job that returns file uri to the result file.
+ """ + with open(job_params.get_main_file_path()) as f: + pyspark_script = f.read() + + pyspark_script_path = urlunparse( + self._storage_client.upload_fileobj( + BytesIO(pyspark_script.encode("utf8")), + local_path="historical_retrieval.py", + remote_path_prefix=self._staging_location, + remote_path_suffix=".py", + ) + ) + relative_path = str(pyspark_script_path).rsplit("/", 1)[1] if not pyspark_script_path.endswith("/") else str( + pyspark_script_path).rsplit("/", 2)[1] + + dbfs_mounted_path = self._mounted_staging_location + relative_path + job_args = job_params.get_arguments() + job_args.extend(["--mounted_staging_location", b64_encode(self._mounted_staging_location)]) + spark_python_task_info = { + "python_file": dbfs_mounted_path, + "parameters": job_args + } + job_extra_metadata = {METADATA_OUTPUT_URI: job_params.get_destination_path()} + job_info = _submit_job(self._api, HISTORICAL_RETRIEVAL_JOB_TYPE_CODE, job_extra_metadata, + spark_task_type="spark_python_task", spark_task_info=spark_python_task_info) + + return cast(RetrievalJob, self._job_from_job_info(job_info)) + + def _upload_jar(self, jar_path: str, jar_name: str) -> str: + if jar_path.startswith("dbfs:/"): + return jar_path + elif ( + jar_path.startswith("s3://") + or jar_path.startswith("s3a://") + or jar_path.startswith("https://") + or jar_path.startswith("local://") + or jar_path.startswith("wasbs://") + ): + r = requests.get(jar_path, allow_redirects=True) + + jar_path = urlunparse(self._storage_client.upload_fileobj( + BytesIO(r.content), + jar_name, + remote_path_prefix=self._staging_location, + remote_path_suffix=".jar", + )) + return jar_path + elif jar_path.startswith("file://"): + local_jar_path = urlparse(jar_path).path + else: + local_jar_path = jar_path + with open(local_jar_path, "rb") as f: + return urlunparse( + self._storage_client.upload_fileobj( + f, + local_jar_path, + remote_path_prefix=self._staging_location, + remote_path_suffix=".jar", + ) + ) + + def offline_to_online_ingestion( + self, ingestion_job_params: BatchIngestionJobParameters + ) -> BatchIngestionJob: + """ + Submits a batch ingestion job to a Spark cluster. + + Raises: + SparkJobFailure: The spark job submission failed, encountered error + during execution, or timeout. + + Returns: + BatchIngestionJob: wrapper around remote job that can be used to check when job completed. + """ + print(ingestion_job_params.get_main_file_path()) + + libraries = [ + { + "jar": self._upload_jar(ingestion_job_params.get_main_file_path(), "feast-ingestion-spark-develop.jar") + } + ] + arguments = ingestion_job_params.get_arguments() + ["--databricks-runtime"] + # arguments = [argument if argument.startswith("--") else b64_encode(argument) for argument in arguments] + spark_jar_task_info = { + "main_class_name": ingestion_job_params.get_class_name(), + "parameters": arguments + } + + job_info = _submit_job(self._api, OFFLINE_TO_ONLINE_JOB_TYPE_CODE, + _generate_job_extra_metadata(ingestion_job_params), spark_task_type="spark_jar_task", + spark_task_info=spark_jar_task_info, libraries=libraries) + + return cast(BatchIngestionJob, self._job_from_job_info(job_info)) + + def start_stream_to_online_ingestion( + self, ingestion_job_params: StreamIngestionJobParameters + ) -> StreamIngestionJob: + """ + Starts a stream ingestion job to a Spark cluster. + + Raises: + SparkJobFailure: The spark job submission failed, encountered error + during execution, or timeout. + + Returns: + StreamIngestionJob: wrapper around remote job. 
+ """ + + jars = [{ + "jar": self._upload_jar(ingestion_job_params.get_main_file_path(), "feast-ingestion-spark-develop.jar") + }] + for extra_jar in ingestion_job_params.get_extra_jar_paths(): + jar_path = self._upload_jar(extra_jar, "feast-ingestion-spark-develop.jar") + relative_path = jar_path.rsplit("/", 1)[1] if not jar_path.endswith("/") else jar_path.rsplit("/", 2)[1] + jars.append( + { + "jar": "dbfs:" + self._mounted_staging_location + relative_path + } + ) + + arguments = ingestion_job_params.get_arguments() + ["--databricks-runtime"] + # arguments = [argument if argument.startswith("--") else b64_encode(argument) for argument in arguments] + spark_jar_task_info = { + "main_class_name": ingestion_job_params.get_class_name(), + "parameters": arguments + } + + job_info = _submit_job(self._api, STREAM_TO_ONLINE_JOB_TYPE_CODE, + _generate_job_extra_metadata(ingestion_job_params), spark_task_type="spark_jar_task", + spark_task_info=spark_jar_task_info, libraries=jars, use_stream_cluster=True) + + return cast(StreamIngestionJob, self._job_from_job_info(job_info)) + + def get_job_by_id(self, job_id: int) -> SparkJob: + job_info = _get_job_by_id(self._api, job_id) + if job_info is None: + raise KeyError(f"Job with id {job_id} not found") + else: + return self._job_from_job_info(job_info) + + def list_jobs( + self, + include_terminated: bool, + project: Optional[str] = None, + table_name: Optional[str] = None, + ) -> List[DatabricksJobInfo]: + return _list_jobs(self._api, include_terminated, project, table_name) + + def schedule_offline_to_online_ingestion( + self, ingestion_job_params: ScheduledBatchIngestionJobParameters + ): + raise NotImplementedError( + "Schedule spark jobs are not supported by emr launcher" + ) + + def unschedule_offline_to_online_ingestion(self, project: str, feature_table: str): + raise NotImplementedError( + "Unschedule spark jobs are not supported by emr launcher" + ) diff --git a/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks_api_wrapper.py b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks_api_wrapper.py new file mode 100644 index 0000000..0f7db0b --- /dev/null +++ b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks_api_wrapper.py @@ -0,0 +1,142 @@ +import json +import os + +import requests +import requests.packages.urllib3 + +requests.packages.urllib3.disable_warnings() + + +class DatabricksAPIWrapper: + """ + Rest API Wrapper for Databricks APIs + """ + # set of http error codes to throw an exception if hit. 
Handles client and auth errors + http_error_codes = (401, 403) + + def __init__(self, databricks_access_token: str, databricks_host_url: str, is_verbose: bool = False, + verify_ssl: bool = True, ): + self._token = { + 'Authorization': 'Bearer {0}'.format(databricks_access_token), + 'User-Agent': 'feast_spark' + } + self._url = databricks_host_url + self._is_verbose = is_verbose + self._verify_ssl = verify_ssl + # self._file_format = configs['file_format'] + if self._verify_ssl: + # set these env variables if skip SSL verification is enabled + os.environ['REQUESTS_CA_BUNDLE'] = "" + os.environ['CURL_CA_BUNDLE'] = "" + + def is_verbose(self): + return self._is_verbose + + # def get_file_format(self): + # return self._file_format + # + # def is_source_file_format(self): + # if self._file_format == 'SOURCE': + # return True + # return False + + def test_connection(self): + # verify the proper url settings to configure this client + if self._url[-4:] != '.com' and self._url[-4:] != '.net': + print("Hostname should end in '.com'") + return -1 + results = requests.get(self._url + '/api/2.0/clusters/spark-versions', headers=self._token, + verify=self._verify_ssl) + http_status_code = results.status_code + if http_status_code != 200: + print("Error. Either the credentials have expired or the credentials don't have proper permissions.") + print("If you have a ~/.netrc file, check those credentials. Those take precedence over passed input.") + print(results.text) + return -1 + return 0 + + def get(self, endpoint, json_params=None, version='2.0', print_json=False): + full_endpoint = self._url + '/api/{0}'.format(version) + endpoint + if self.is_verbose(): + print("Get: {0}".format(full_endpoint)) + if json_params: + raw_results = requests.get(full_endpoint, headers=self._token, params=json_params, verify=self._verify_ssl) + http_status_code = raw_results.status_code + if http_status_code in DatabricksAPIWrapper.http_error_codes: + raise Exception("Error: GET request failed with code {}\n{}".format(http_status_code, raw_results.text)) + results = raw_results.json() + else: + raw_results = requests.get(full_endpoint, headers=self._token, verify=self._verify_ssl) + http_status_code = raw_results.status_code + if http_status_code in DatabricksAPIWrapper.http_error_codes: + raise Exception("Error: GET request failed with code {}\n{}".format(http_status_code, raw_results.text)) + results = raw_results.json() + if print_json: + print(json.dumps(results, indent=4, sort_keys=True)) + if type(results) == list: + results = {'elements': results} + results['http_status_code'] = raw_results.status_code + return results + + def http_req(self, http_type, endpoint, json_params, version='2.0', print_json=False, files_json=None): + full_endpoint = self._url + '/api/{0}'.format(version) + endpoint + if self.is_verbose(): + print("{0}: {1}".format(http_type, full_endpoint)) + if json_params: + if http_type == 'post': + if files_json: + raw_results = requests.post(full_endpoint, headers=self._token, + data=json_params, files=files_json, verify=self._verify_ssl) + else: + raw_results = requests.post(full_endpoint, headers=self._token, + json=json_params, verify=self._verify_ssl) + if http_type == 'put': + raw_results = requests.put(full_endpoint, headers=self._token, + json=json_params, verify=self._verify_ssl) + if http_type == 'patch': + raw_results = requests.patch(full_endpoint, headers=self._token, + json=json_params, verify=self._verify_ssl) + http_status_code = raw_results.status_code + if http_status_code in 
DatabricksAPIWrapper.http_error_codes: + raise Exception("Error: {0} request failed with code {1}\n{2}".format(http_type, + http_status_code, + raw_results.text)) + results = raw_results.json() + else: + print("Must have a payload in json_args param.") + return {} + if print_json: + print(json.dumps(results, indent=4, sort_keys=True)) + # if results are empty, let's return the return status + if results: + results['http_status_code'] = raw_results.status_code + return results + else: + return {'http_status_code': raw_results.status_code} + + def post(self, endpoint, json_params, version='2.0', print_json=False, files_json=None): + return self.http_req('post', endpoint, json_params, version, print_json, files_json) + + def put(self, endpoint, json_params, version='2.0', print_json=False): + return self.http_req('put', endpoint, json_params, version, print_json) + + def patch(self, endpoint, json_params, version='2.0', print_json=False): + return self.http_req('patch', endpoint, json_params, version, print_json) + + @staticmethod + def get_key(http_resp, key_name): + value = http_resp.get(key_name, None) + if value is None: + print(http_resp) + raise ValueError('Unable to find key') + return value + + @staticmethod + def my_map(F, items): + to_return = [] + for elem in items: + to_return.append(F(elem)) + return to_return + + def get_url(self): + return self._url diff --git a/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks_utils.py b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks_utils.py new file mode 100644 index 0000000..80ef163 --- /dev/null +++ b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks_utils.py @@ -0,0 +1,415 @@ +import hashlib +import json +import random +import string +from base64 import b64encode, b64decode +from datetime import datetime +from typing import Any, Dict, List, NamedTuple, Optional + +from kubernetes import client + +from kubernetes.client import ApiException + +from feast_spark.pyspark.abc import SparkJobStatus +from .databricks_api_wrapper import DatabricksAPIWrapper + +__all__ = [ + "_cancel_job_by_id", + "_list_jobs", + "_get_job_by_id", + "_generate_project_table_hash", + "_generate_job_extra_metadata", + "get_job_metadata", + "DatabricksJobManager", + "LABEL_FEATURE_TABLE", + "_submit_job", + "STREAM_TO_ONLINE_JOB_TYPE", + "OFFLINE_TO_ONLINE_JOB_TYPE", + "HISTORICAL_RETRIEVAL_JOB_TYPE", + "METADATA_JOBHASH", + "METADATA_OUTPUT_URI", + "DatabricksJobInfo", + "STREAM_TO_ONLINE_JOB_TYPE_CODE", + "OFFLINE_TO_ONLINE_JOB_TYPE_CODE", + "HISTORICAL_RETRIEVAL_JOB_TYPE_CODE", + "b64_encode" +] + +STREAM_TO_ONLINE_JOB_TYPE = "STREAM_TO_ONLINE_JOB" +OFFLINE_TO_ONLINE_JOB_TYPE = "OFFLINE_TO_ONLINE_JOB" +HISTORICAL_RETRIEVAL_JOB_TYPE = "HISTORICAL_RETRIEVAL_JOB" + +STREAM_TO_ONLINE_JOB_TYPE_CODE = "S2O" +OFFLINE_TO_ONLINE_JOB_TYPE_CODE = "O2O" +HISTORICAL_RETRIEVAL_JOB_TYPE_CODE = "HR" + +LABEL_JOBID = "feast.dev/jobid" +LABEL_JOBTYPE = "feast.dev/type" +LABEL_FEATURE_TABLE = "feast.dev/table" +LABEL_FEATURE_TABLE_HASH = "feast.dev/tablehash" +LABEL_PROJECT = "feast.dev/project" + +# Can't store these bits of info in k8s labels due to 64-character limit, so we store them as +# sparkConf +METADATA_OUTPUT_URI = "dev.feast.outputuri" +METADATA_JOBHASH = "dev.feast.jobhash" + +METADATA_KEYS = set((METADATA_JOBHASH, METADATA_OUTPUT_URI)) + + +def json_b64_decode(s: str) -> Any: + return json.loads(b64decode(s.encode("ascii"))) + + +def json_b64_encode(obj: dict) -> str: + return 
b64encode(json.dumps(obj).encode("utf8")).decode("ascii") + + +def b64_encode(obj: str) -> str: + return b64encode(obj.encode("ascii")).decode("ascii") + + +def generate_short_id(length: int = 6): + return ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(length)) + + +def _generate_project_table_hash(project: str, table_name: str) -> str: + return hashlib.md5(f"{project}:{table_name}".encode()).hexdigest() + + +def _job_id_to_resource_name(job_id: str) -> str: + return job_id + + +def _truncate_label(label: str) -> str: + return label[:63] + + +def _prepare_job_name(job_type: str, extra_info: dict) -> str: + return f"{job_type}_{json_b64_encode(extra_info)}_{generate_short_id()}" + + +class DatabricksJobInfo(NamedTuple): + job_id: int + run_id: int + state: SparkJobStatus + start_time: datetime + job_name: str + + +STATE_MAP = { + "": SparkJobStatus.STARTING, + "PENDING": SparkJobStatus.STARTING, + "RUNNING": SparkJobStatus.IN_PROGRESS, + "TERMINATING": SparkJobStatus.IN_PROGRESS, + "TERMINATED": SparkJobStatus.IN_PROGRESS, + "SUCCESS": SparkJobStatus.COMPLETED, + "FAILED": SparkJobStatus.FAILED, + "TIMEDOUT": SparkJobStatus.FAILED, + "CANCELED": SparkJobStatus.FAILED, + "SKIPPED": SparkJobStatus.FAILED, + "INTERNAL_ERROR": SparkJobStatus.FAILED, +} + +TYPE_MAP = { + "S2O": STREAM_TO_ONLINE_JOB_TYPE, + "O2O": OFFLINE_TO_ONLINE_JOB_TYPE, + "HR": HISTORICAL_RETRIEVAL_JOB_TYPE +} + + +# Fetch Jobtype, and extra meta info +def get_job_metadata(job: DatabricksJobInfo) -> (str, Any): + job_metadata = job.job_name.split("_") + return TYPE_MAP[job_metadata[0]], json_b64_decode(job_metadata[1]) + + +def _generate_job_extra_metadata(job_params) -> Dict[str, Any]: + return { + LABEL_FEATURE_TABLE: _truncate_label( + job_params.get_feature_table_name() + ), + LABEL_FEATURE_TABLE_HASH: _generate_project_table_hash( + job_params.get_project(), + job_params.get_feature_table_name(), + ), + LABEL_PROJECT: job_params.get_project() + } + + +def _get_feast_job_state(job: DatabricksJobInfo) -> SparkJobStatus: + return STATE_MAP[job.state] + + +def _get_job_start_time(job: DatabricksJobInfo) -> datetime: + return job.start_time + + +def categorized_files(reference_files): + if reference_files is None: + return None, None + + files = [] + jars = [] + for file in reference_files: + file = file.strip() + if file.endswith(".jar"): + jars.append(file) + else: + files.append(file) + return files, jars + + +class DatabricksJobManager(object): + def __init__(self, databricks_access_token: str, databricks_host_url: str, cluster_id: str, + streaming_cluster_id: Optional[str] = None, max_active_jobs_to_retrieve: int = 5000): + self.client = DatabricksJobsClient(databricks_access_token=databricks_access_token, + databricks_host_url=databricks_host_url) + self.cluster_id = cluster_id + self.streaming_cluster_id = streaming_cluster_id + self.max_active_jobs_to_retrieve = max_active_jobs_to_retrieve + + def get_spark_job(self, job_id): + return self.client.get_databricks_job(job_id) + + def get_all_spark_jobs(self, include_terminated: bool = False, project: Optional[str] = None, + table_name: Optional[str] = None) -> List[DatabricksJobInfo]: + jobs = self.client.get_active_jobs(self.max_active_jobs_to_retrieve, self.cluster_id, include_terminated) + if self.streaming_cluster_id is not None: + jobs.extend(self.client.get_active_jobs(self.max_active_jobs_to_retrieve, self.streaming_cluster_id, + include_terminated)) + filtered_jobs = [] + for job in jobs: + _, job_extra_metadata = 
get_job_metadata(job) + if project is not None and job_extra_metadata[LABEL_PROJECT] == project: + filtered_jobs.append(job) + + if table_name is not None and job_extra_metadata[LABEL_FEATURE_TABLE] == table_name: + filtered_jobs.append(job) + return filtered_jobs + + def cancel_spark_job(self, job_id): + return self.client.cancel_job_run(job_id) + + def create_spark_job(self, job_type: str, job_extra_info: dict, spark_task_type: str, spark_task_info: dict, + libraries: list = None, use_stream_cluster: bool = False) -> DatabricksJobInfo: + job_name = _prepare_job_name(job_type, job_extra_info) + if use_stream_cluster and self.streaming_cluster_id is not None: + return self.client.create_and_run_one_time_job(job_name=job_name, spark_task_type=spark_task_type, + spark_task_info=spark_task_info, + cluster_id=self.streaming_cluster_id, libraries=libraries) + else: + return self.client.create_and_run_one_time_job(job_name=job_name, spark_task_type=spark_task_type, + spark_task_info=spark_task_info, + cluster_id=self.cluster_id, libraries=libraries) + + +class DatabricksJobsClient(DatabricksAPIWrapper): + + def _create_job_request(self, job_type: str, job_name: str, spark_task_type: str, spark_task_info: dict, + cluster_exists: bool = True, cluster_id: Optional[str] = None, + libraries: Optional[list] = None, spark_version: str = "7.3.x-scala2.12", + node_type_id: str = "Standard_DS3_v2", num_workers: int = 1, + auto_scaling: str = True, max_workers: int = 3) -> dict: + if cluster_exists: + req_body = { + job_type: job_name, + "existing_cluster_id": cluster_id, + spark_task_type: spark_task_info + } + elif auto_scaling: + req_body = { + job_type: job_name, + "new_cluster": { + "spark_version": spark_version, + "node_type_id": node_type_id, + "autoscale": { + "min_workers": num_workers, + "max_workers": max_workers + } + }, + spark_task_type: spark_task_info + } + else: + req_body = { + job_type: job_name, + "new_cluster": { + "spark_version": spark_version, + "node_type_id": node_type_id, + "num_workers": num_workers + }, + spark_task_type: spark_task_info + } + if libraries is not None: + req_body["libraries"] = libraries + + return req_body + + def _create_databricks_job_from_job_details(self, job: dict, run_id: Optional[int] = None, + print_json: bool = False) -> DatabricksJobInfo: + run = None + if run_id is None: + query_params = { + "active_only": True, + "limit": 1, + "job_id": int(job["job_id"]) + } + runs = self.get("/jobs/runs/list", json_params=query_params, print_json=print_json) + if 'error_code' in runs: + raise ApiException( + f"Error fetching job: {job['job_id']}, from the workspace, \n Error Response: {runs}") + run = runs["runs"][0] + else: + run = self.get("/jobs/runs/get", json_params={"run_id": run_id}, print_json=print_json) + if 'error_code' in run: + raise ApiException( + f"Error fetching job information from the workspace, \n Error Response: {run}") + + life_cycle_state = run["state"]["life_cycle_state"] + if life_cycle_state in ["TERMINATED", "TERMINATING", "INTERNAL_ERROR"] and \ + "result_state" in run["state"]: + state = run["state"]["result_state"] + else: + state = life_cycle_state + return DatabricksJobInfo(job_id=job["job_id"], + run_id=run["run_id"], + state=STATE_MAP[state], + start_time=datetime.utcfromtimestamp(int(run["start_time"]) / 1e3), + job_name=job["settings"]["name"]) + + def get_jobs_list(self, print_json: bool = False) -> list: + """ Returns an array of json objects for jobs """ + jobs = self.get("/jobs/list", print_json) + return jobs.get('jobs', 
[]) + + def get_job_id_by_name(self) -> dict: + """ + get a dict mapping of job name to job id for the new job ids + :return: + """ + jobs = self.get_jobs_list() + job_ids = {} + for job in jobs: + job_ids[job['settings']['name']] = job['job_id'] + return job_ids + + # Store Job Info in workspace UI and then start the job [Recommended[ + def create_and_run_one_time_job(self, job_name: str, spark_task_type: str, spark_task_info: dict, + cluster_exists: bool = True, cluster_id: Optional[str] = None, + libraries: Optional[list] = None, spark_version: str = "7.3.x-scala2.12", + node_type_id: str = "Standard_DS3_v2", num_workers: int = 1, + auto_scaling: str = True, max_workers: int = 3) -> DatabricksJobInfo: + if cluster_exists and cluster_id is None: + raise Exception("Databricks Cluster Id was not provided for the job") + job_req = self._create_job_request("name", job_name, spark_task_type, + spark_task_info, cluster_exists, cluster_id, libraries, spark_version, + node_type_id, num_workers, auto_scaling, max_workers) + + job_response = self.post('/jobs/create', job_req) + if 'error_code' in job_response: + raise ApiException(f"Error Launching job : {job_name}, on cluster: " + f"{cluster_id if cluster_exists else node_type_id}. \n Error Response: {job_response}," + f"\n Error: Request: {job_req}") + else: + print(f"Created job configuration, starting job: {job_response}") + run_response = self.post('/jobs/run-now', job_response) + if 'error_code' in run_response: + print(f"Error running job: {job_name} , on cluster: " + f"{cluster_id if cluster_exists else node_type_id}.\n Error Response: {run_response}") + return self.get_databricks_job(job_id=job_response["job_id"], run_id=run_response["run_id"]) + + # DEPRECATE: Submit job without recording information in Workspace UI + def run_submit_job(self, job_name: str, spark_task_type: str, spark_task_info: dict, + cluster_exists: bool = True, cluster_id: Optional[str] = None, + libraries: Optional[list] = None, spark_version: str = "7.3.x-scala2.12", + node_type_id: str = "Standard_DS3_v2", num_workers: int = 1, auto_scaling: str = True, + max_workers: int = 3) -> DatabricksJobInfo: + if cluster_exists and cluster_id is None: + raise Exception("Databricks Cluster Id was not provided for the job") + + job_req = self._create_job_request("run_name", job_name, spark_task_type, spark_task_info, cluster_exists, + cluster_id, libraries, spark_version, node_type_id, num_workers, + auto_scaling, max_workers) + job_response = self.post('/jobs/runs/submit', job_req) + if 'error_code' in job_response: + raise ApiException(f"Error Launching job : {job_name}, on cluster: " + f"{cluster_id if cluster_exists else node_type_id}. 
\n Error Response: {job_response}")
+        else:
+            print(f"Created job configuration, starting job: {job_response}")
+        return self.get_databricks_job(int(job_response["run_id"]))
+
+    # Get job start time
+    def get_job_start_time(self, job_id: int, print_json: bool = False) -> datetime:
+        return self.get_databricks_job(job_id, print_json=print_json).start_time
+
+    def get_databricks_job(self, job_id: int, run_id: Optional[int] = None,
+                           print_json: bool = False) -> DatabricksJobInfo:
+        job_response = self.get("/jobs/get", json_params={"job_id": job_id}, print_json=print_json)
+        if 'error_code' in job_response:
+            raise ApiException(f"Error fetching job information from the workspace, \n Error Response: {job_response}")
+        return self._create_databricks_job_from_job_details(job_response, run_id)
+
+    def get_active_jobs(self, max_active_jobs: int, cluster_id: str, included_inactive_jobs: bool = False,
+                        print_json: bool = False) -> List[DatabricksJobInfo]:
+        query_params = {
+            "active_only": (not included_inactive_jobs),
+            "limit": 1000,
+            "run_type": {
+                "cluster_spec": {
+                    "existing_cluster_id": cluster_id
+                }
+            }
+        }
+        job_response = self.get("/jobs/runs/list", json_params=query_params, print_json=print_json)
+        if 'error_code' in job_response:
+            raise ApiException(f"Error fetching active jobs from the workspace, \n Error Response: {job_response}")
+        active_runs = job_response["runs"]
+        already_fetched = len(active_runs)
+        while bool(job_response["has_more"]) and already_fetched < max_active_jobs:
+            query_params["offset"] = already_fetched
+            job_response = self.get("/jobs/runs/list", json_params=query_params, print_json=print_json)
+            if 'error_code' in job_response:
+                raise ApiException(f"Error fetching active jobs from the workspace, \n Error Response: {job_response}")
+            active_runs.extend(job_response["runs"])
+            already_fetched += len(job_response["runs"])
+        active_runs = list({run["run_id"]: run for run in active_runs}.values())
+        return [self.get_databricks_job(run["job_id"], run["run_id"]) for run in active_runs]
+
+    def cancel_job_run(self, job_id: int):
+        run_id = self.get_databricks_job(job_id).run_id
+        cancel_job_response = self.post('/jobs/runs/cancel', {"run_id": run_id})
+        if 'error_code' in cancel_job_response:
+            raise ApiException(f"Error Cancelling job: {job_id}, run_id: {run_id}, "
+                               f"Error Response: {cancel_job_response}")
+
+
+def _submit_job(api: DatabricksJobManager, job_type: str, job_extra_info: dict, spark_task_type: str,
+                spark_task_info: dict, libraries: list = None, use_stream_cluster: bool = False) -> DatabricksJobInfo:
+    return api.create_spark_job(job_type=job_type, job_extra_info=job_extra_info, spark_task_type=spark_task_type,
+                                spark_task_info=spark_task_info, libraries=libraries,
+                                use_stream_cluster=use_stream_cluster)
+
+
+def _list_jobs(api: DatabricksJobManager, include_terminated: bool = False, project: Optional[str] = None,
+               table_name: Optional[str] = None) -> List[DatabricksJobInfo]:
+    return api.get_all_spark_jobs(include_terminated, project, table_name)
+
+
+def _get_job_by_id(api: DatabricksJobManager, job_id: int) -> Optional[DatabricksJobInfo]:
+    try:
+        return api.get_spark_job(job_id)
+    except client.ApiException as e:
+        if e.status == 404:
+            return None
+        else:
+            raise
+
+
+def _cancel_job_by_id(api: DatabricksJobManager, job_id: int):
+    try:
+        api.cancel_spark_job(job_id)
+    except client.ApiException as e:
+        if e.status == 404:
+            return None
+        else:
+            raise
diff --git a/cluster/sdk/python/requirements-ci.txt b/cluster/sdk/python/requirements-ci.txt
index dd84f04..92feb88 100644
---
a/cluster/sdk/python/requirements-ci.txt +++ b/cluster/sdk/python/requirements-ci.txt @@ -1,25 +1,165 @@ +adal==1.2.7 +adlfs==0.5.9 +aiohttp==3.7.4.post0; python_version >= '3.6' +altair==4.1.0; python_version >= '3.6' +appdirs==1.4.4 +appnope==0.1.2; sys_platform == 'darwin' and platform_system == 'Darwin' +argon2-cffi==20.1.0 +async-generator==1.10; python_version >= '3.5' +async-timeout==3.0.1; python_full_version >= '3.5.3' +attrs==21.2.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +avro==1.10.0 +azure-core==1.15.0 +azure-datalake-store==0.0.52 +azure-identity==1.6.0 +azure-storage-blob==12.8.1 +backcall==0.2.0 +beautifulsoup4==4.9.3 +black==21.5b2 +bleach==3.3.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +boto3==1.17.88; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' +botocore==1.20.88; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' +cachetools==4.2.2; python_version ~= '3.5' +certifi==2021.5.30 +cffi==1.14.5 +chardet==3.0.4 +click==7.1.2; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +cryptography==3.4.7 +decorator==5.0.9; python_version >= '3.5' +defusedxml==0.7.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +entrypoints==0.3; python_version >= '2.7' +fastavro==0.22.13 feast==0.9.5.2 -cryptography==3.3.2 -flake8 -black==19.10b0 -isort>=5 +flake8==3.9.2 +fsspec==2021.5.0; python_version >= '3.6' +gcsfs==2021.5.0 +google-api-core[grpc]==1.22.4; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +google-auth-oauthlib==0.4.4; python_version >= '3.6' +google-auth==1.30.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5' +google-cloud-bigquery-storage==0.7.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +google-cloud-bigquery==1.18.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +google-cloud-core==1.0.3; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +google-cloud-dataproc==2.0.2 +google-cloud-storage==1.20.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +google-resumable-media==0.4.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +google==3.0.0 +googleapis-common-protos==1.52.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +great-expectations==0.13.2 grpcio-tools==1.31.0 -pyspark==3.0.1 -pandas~=1.0.0 +grpcio==1.31.0 +idna==2.10; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +importlib-metadata==4.5.0; python_version < '3.8' and python_version < '3.8' +iniconfig==1.1.1 +ipykernel==5.5.5; python_version >= '3.5' +ipython-genutils==0.2.0 +ipython==7.24.1; python_version >= '3.3' +ipywidgets==7.6.3 +isodate==0.6.0 +isort==5.8.0 +jedi==0.18.0; python_version >= '3.6' +jinja2==3.0.1; python_version >= '3.6' +jmespath==0.10.0; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2' +jsonpatch==1.32; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +jsonpointer==2.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +jsonschema==3.2.0 +jupyter-client==6.1.12; python_version >= '3.5' +jupyter-core==4.7.1; python_version >= '3.6' +jupyterlab-pygments==0.1.2 +jupyterlab-widgets==1.0.0; python_version >= '3.6' +kubernetes==12.0.1 +libcst==0.3.19; python_version >= '3.6' +markupsafe==2.0.1; python_version >= '3.6' 
+matplotlib-inline==0.1.2; python_version >= '3.5' +mccabe==0.6.1 +mistune==0.8.4 mock==2.0.0 -pandavro==1.5.* -moto +more-itertools==8.8.0; python_version >= '3.5' +moto==2.0.8 +msal-extensions==0.3.0 +msal==1.12.0 +msrest==0.6.21 +msrestazure==0.6.4 +multidict==5.1.0; python_version >= '3.6' +mypy-extensions==0.4.3 +mypy-protobuf==2.4 mypy==0.790 -mypy-protobuf -avro==1.10.0 -gcsfs -urllib3>=1.25.4 -google-cloud-dataproc==2.0.2 -pytest==6.0.0 +nbclient==0.5.3; python_full_version >= '3.6.1' +nbconvert==6.0.7; python_version >= '3.6' +nbformat==5.1.3; python_version >= '3.5' +nest-asyncio==1.5.1; python_version >= '3.5' +notebook==6.4.0; python_version >= '3.6' +numpy==1.19.5; python_version >= '3.6' +oauthlib==3.1.1; python_version >= '3.6' +packaging==20.9; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +pandas==1.0.5 +pandavro==1.5.2 +pandocfilters==1.4.3 +parso==0.8.2; python_version >= '3.6' +pathspec==0.8.1 +pbr==5.6.0; python_version >= '2.6' +pexpect==4.8.0; sys_platform != 'win32' +pickleshare==0.7.5 +pluggy==0.13.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +portalocker==1.7.1; platform_system != 'Windows' +prometheus-client==0.11.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +prompt-toolkit==3.0.18; python_full_version >= '3.6.1' +proto-plus==1.18.1; python_version >= '3.6' +protobuf==3.17.2 +ptyprocess==0.7.0; os_name != 'nt' +py4j==0.10.9 +py==1.10.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +pyarrow==2.0.0; python_version >= '3.5' +pyasn1-modules==0.2.8 +pyasn1==0.4.8 +pycodestyle==2.7.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +pycparser==2.20; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +pyflakes==2.3.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +pygments==2.9.0; python_version >= '3.5' +pyjwt[crypto]==2.1.0; python_version >= '3.6' +pyparsing==2.4.7; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2' +pyrsistent==0.17.3; python_version >= '3.5' +pyspark==3.0.1 pytest-lazy-fixture==0.6.3 -pytest-timeout==1.4.2 -pytest-ordering==0.6.* pytest-mock==1.10.4 -PyYAML==5.4 -great-expectations==0.13.2 -adlfs==0.5.9 +pytest-ordering==0.6 +pytest-timeout==1.4.2 +pytest==6.0.0 +python-dateutil==2.8.1; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2' +pytz==2021.1 +pyyaml==5.3.1 +pyzmq==22.1.0; python_version >= '3.6' +regex==2021.4.4 +requests-oauthlib==1.3.0 +requests==2.23.0 +responses==0.13.3; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' +rsa==4.7.2; python_version >= '3.6' +ruamel.yaml.clib==0.2.2; platform_python_implementation == 'CPython' and python_version < '3.10' +ruamel.yaml==0.17.7; python_version >= '3' +s3transfer==0.4.2 +scipy==1.6.3; python_version < '3.10' and python_version >= '3.7' +send2trash==1.5.0 +six==1.16.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2' +soupsieve==2.2.1; python_version >= '3.0' +tabulate==0.8.9 +termcolor==1.1.0 +terminado==0.10.0; python_version >= '3.6' +testpath==0.5.0; python_version >= '3.5' +toml==0.10.2; python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2' +toolz==0.11.1; python_version >= '3.5' +tornado==6.1; python_version >= '3.5' +tqdm==4.61.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3' +traitlets==5.0.5; python_version >= '3.7' +typed-ast==1.4.3; python_version < '3.8' +typing-extensions==3.10.0.0; python_version 
< '3.8' and python_version < '3.8' +typing-inspect==0.6.0 +tzlocal==2.1 +urllib3==1.25.11 +wcwidth==0.2.5 +webencodings==0.5.1 +websocket-client==1.0.1; python_version >= '3.6' +werkzeug==2.0.1; python_version >= '3.6' +widgetsnbextension==3.5.1 +xmltodict==0.12.0 +yarl==1.6.3; python_version >= '3.6' +zipp==3.4.1; python_version >= '3.6' +requests==2.25.1 \ No newline at end of file diff --git a/cluster/sdk/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/cluster/sdk/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala index 8b80e46..b3a5bc7 100644 --- a/cluster/sdk/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala +++ b/cluster/sdk/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala @@ -119,6 +119,9 @@ object IngestionJob { opt[Int](name = "triggering-interval") .action((x, c) => c.copy(streamingTriggeringSecs = x)) + opt[Unit](name = "databricks-runtime") + .action((_, c) => c.copy(isDatabricksRuntime = true)) + opt[String](name = "kafka_sasl_auth") .action((x, c) => c.copy(kafkaSASL = Some(x))) } @@ -146,7 +149,9 @@ object IngestionJob { logger.fatal("Batch ingestion failed", e) throw e } finally { - sparkSession.close() + if (!config.isDatabricksRuntime) { + sparkSession.close() + } } case Modes.Online => val sparkSession = BasePipeline.createSparkSession(config) @@ -157,7 +162,9 @@ object IngestionJob { logger.fatal("Streaming ingestion failed", e) throw e } finally { - sparkSession.close() + if (!config.isDatabricksRuntime) { + sparkSession.close() + } } } case None => diff --git a/cluster/sdk/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/cluster/sdk/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index 8e9a8fe..f67f08f 100644 --- a/cluster/sdk/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ b/cluster/sdk/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -128,5 +128,6 @@ case class IngestionJobConfig( validationConfig: Option[ValidationConfig] = None, doNotIngestInvalidRows: Boolean = false, checkpointPath: Option[String] = None, - kafkaSASL: Option[String] = None + kafkaSASL: Option[String] = None, + isDatabricksRuntime: Boolean = false ) From a94521351034f8de3211e4c2db299463c1755fdb Mon Sep 17 00:00:00 2001 From: Sai Sharan Tangeda Date: Wed, 9 Mar 2022 00:14:07 +0530 Subject: [PATCH 2/4] sanitise repo --- cluster/sdk/python/feast_spark/constants.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cluster/sdk/python/feast_spark/constants.py b/cluster/sdk/python/feast_spark/constants.py index ca3a01d..2390ccc 100644 --- a/cluster/sdk/python/feast_spark/constants.py +++ b/cluster/sdk/python/feast_spark/constants.py @@ -114,6 +114,7 @@ class ConfigOptions(metaclass=ConfigMeta): # https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/tutorials/spark/sparkConsumer.scala AZURE_EVENTHUB_KAFKA_CONNECTION_STRING = "" + # Databricks: Access Token DATABRICKS_ACCESS_TOKEN: Optional[str] = None From 8a62fa3cffef03e7092f077a83f503351bacfe08 Mon Sep 17 00:00:00 2001 From: Sai Sharan Tangeda Date: Wed, 9 Mar 2022 00:17:42 +0530 Subject: [PATCH 3/4] Update comments --- cluster/sdk/python/feast_spark/constants.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/sdk/python/feast_spark/constants.py b/cluster/sdk/python/feast_spark/constants.py index 2390ccc..417a1cd 100644 --- a/cluster/sdk/python/feast_spark/constants.py +++ b/cluster/sdk/python/feast_spark/constants.py @@ -39,12 +39,12 @@ class 
ConfigOptions(metaclass=ConfigMeta): #: Spark Job launcher. The choice of storage is connected to the choice of SPARK_LAUNCHER. #: - #: Options: "standalone", "dataproc", "emr" + #: Options: "standalone", "dataproc", "emr", "databricks" SPARK_LAUNCHER: Optional[str] = None #: Feast Spark Job ingestion jobs staging location. The choice of storage is connected to the choice of SPARK_LAUNCHER. #: - #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/ + #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/, dbfs:/mnt/subfolder SPARK_STAGING_LOCATION: Optional[str] = None #: Feast Spark Job ingestion jar file. The choice of storage is connected to the choice of SPARK_LAUNCHER. From 1bc3f86beb7e87ac3b32e27cd38efd558abc7dac Mon Sep 17 00:00:00 2001 From: Sai Sharan Tangeda Date: Wed, 9 Mar 2022 00:28:23 +0530 Subject: [PATCH 4/4] Update comments --- .../feast_spark/pyspark/launchers/databricks/databricks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks.py b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks.py index fbc01d4..5c022c9 100644 --- a/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks.py +++ b/cluster/sdk/python/feast_spark/pyspark/launchers/databricks/databricks.py @@ -400,10 +400,10 @@ def schedule_offline_to_online_ingestion( self, ingestion_job_params: ScheduledBatchIngestionJobParameters ): raise NotImplementedError( - "Schedule spark jobs are not supported by emr launcher" + "Schedule spark jobs are not supported by Databricks launcher" ) def unschedule_offline_to_online_ingestion(self, project: str, feature_table: str): raise NotImplementedError( - "Unschedule spark jobs are not supported by emr launcher" + "Unschedule spark jobs are not supported by Databricks launcher" )
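
The configuration options added in PATCH 1/4 map one-to-one onto the DatabricksJobLauncher constructor through _databricks_launcher() in launcher.py. A rough sketch of that wiring follows; every value below is a placeholder, and the wasbs:// staging location plus the Config() carrying Azure credentials are assumptions rather than part of the patch:

# Illustrative sketch only -- workspace URL, token, cluster id, storage account and mount are placeholders.
from urllib.parse import urlparse

from feast.config import Config
from feast.staging.storage_client import get_staging_client
from feast_spark.pyspark.launchers.databricks import DatabricksJobLauncher

config = Config()  # assumed to already hold the Azure storage credentials for the staging client
staging_location = "wasbs://feast@mystorageaccount.blob.core.windows.net/staging/"  # SPARK_STAGING_LOCATION

launcher = DatabricksJobLauncher(
    databricks_access_token="dapiXXXXXXXXXXXXXXXXXXXX",                        # DATABRICKS_ACCESS_TOKEN
    databricks_host_url="https://adb-1111111111111111.1.azuredatabricks.net",  # DATABRICKS_HOST_URL
    databricks_common_cluster_id="0309-000000-abcd123",                        # DATABRICKS_COMMON_CLUSTER_ID
    staging_client=get_staging_client(urlparse(staging_location).scheme, config),
    databricks_streaming_cluster_id=None,                                      # DATABRICKS_STREAMING_CLUSTER_ID (optional)
    databricks_max_active_jobs_to_retrieve=5000,                               # DATABRICKS_MAXIMUM_RUNS_TO_RETRIEVE
    mounted_staging_location="/dbfs/mnt/feast-staging/",                       # DATABRICKS_MOUNTED_STORAGE_PATH
    azure_account_name="mystorageaccount",                                     # AZURE_BLOB_ACCOUNT_NAME
    azure_account_key="<storage-account-key>",                                 # AZURE_BLOB_ACCOUNT_ACCESS_KEY
    staging_location=staging_location,
)
print(launcher.list_jobs(include_terminated=False, project="default"))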
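
Both historical_feature_retrieval in databricks.py and _read_and_verify_entity_df_from_source in historical_feature_retrieval_job.py remap staged URIs onto the DBFS mount by keeping only the final path component, so DATABRICKS_MOUNTED_STORAGE_PATH must point at the same container and prefix as SPARK_STAGING_LOCATION. A small stand-alone sketch of that rewrite, with example paths only:

# Example of the relative-path rewrite used for the DBFS mount; the URIs are made up.
def to_mounted_path(spark_path: str, mounted_staging_location: str) -> str:
    # Keep only the last path component, mirroring the rsplit logic in the patch.
    relative_path = spark_path.rsplit("/", 1)[1] if not spark_path.endswith("/") else spark_path.rsplit("/", 2)[1]
    return mounted_staging_location + relative_path

print(to_mounted_path(
    "wasbs://feast@mystorageaccount.blob.core.windows.net/staging/historical_retrieval_ab12cd.py",
    "dbfs:/mnt/feast-staging/",
))  # dbfs:/mnt/feast-staging/historical_retrieval_ab12cd.py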
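
Because the Databricks Jobs API has no analogue of the Kubernetes labels used by the k8s launcher, databricks_utils.py packs the job type code and the extra metadata dict into the job name (_prepare_job_name) and recovers them later (get_job_metadata). A stand-alone illustration of that round trip, with made-up values; underscores are safe separators because the type codes (HR/O2O/S2O) and the standard base64 alphabet contain none:

# Stand-alone sketch of the job-name encoding scheme; project/table values are examples.
import json
from base64 import b64decode, b64encode

def pack_job_name(job_type_code: str, extra: dict) -> str:
    # "<TYPE>_<base64(json)>_<short id>"
    payload = b64encode(json.dumps(extra).encode("utf8")).decode("ascii")
    return f"{job_type_code}_{payload}_abc123"

def unpack_job_name(job_name: str):
    job_type_code, payload, _short_id = job_name.split("_")
    return job_type_code, json.loads(b64decode(payload.encode("ascii")))

name = pack_job_name("O2O", {"feast.dev/table": "driver_stats", "feast.dev/project": "default"})
print(unpack_job_name(name))
# ('O2O', {'feast.dev/table': 'driver_stats', 'feast.dev/project': 'default'})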