Skip to content

[ML] Custom Inference Service #125679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 43 commits into
base: main
Choose a base branch
from

Conversation

davidkyle
Copy link
Member

@davidkyle davidkyle commented Mar 26, 2025

Taking the ideas and commits from #124299

Notable changes from initial PR:

  • Flattened structure by removing the path and method nesting
    • I expect that we'll only have a single path and the method will always be POST
    • The path portion of the url can be placed directly in the url field
  • Removed query_string as this can be placed directly in the url
  • Removed description and version as they weren't used
  • Flattened the sparse embedding response parser format by removing the sparse_result and value fields
  • Refactored the sparse embedding response parser format to have the token and weight fields include the full path
  • Adding response.error_parser to indicate the location to find the error message field
  • Removed the custom task type support, the reason being that it'd be difficult for the client libraries to handle a custom response
  • Refactored the sparse embedding json_parser fields to only be path
    • The parsing logic expects the response to be a map of token id and weight so we only need a path field to tell it where to find that nested map
    • NOTE: This will not support how ELSER formats the response (for example hugging face elser, or elasticsearch). If we want to support that in the future, I think we could add a format field that specifies how the response is structure (elser's structure is an array of maps, where the key is the token id and the value is the weight, this parser expects the map to have a token id field and a weight field)

Add Custom Model support to Inference API.

You can use this Inference API to invoke models that support the HTTP format.

Inference Endpoint Creation:

Endpoint creation
PUT _inference/{task_type}/{inference_id}
{
  "service": "custom-model",
  "service_settings": {
    "secret_parameters": {
      ...
    },
    "url": "<<url>>",
    "headers": {
      <<header parameters>>
    },
    "query_parameters": {
      <<parameters>>
    },
    "request": {
      "content": "<<content>>"
    },
    "response": {
      "json_parser":{
        ...
      },
      "error_parser":{
       ...
      }
    }
  },
  "task_settings": {
    "parameters":{
      ...
    }
  }
}

Support task_type

  • text_embedding
  • sparse_embedding
  • rerank
  • completion

Parameter Description

Parameter Description
  • secret_parameters: secret parameters like api_key can be defined here.
"secret_parameters":{
  "api_key":"xxx"
}
  • headers(optional):https' header parameters
"headers":{
  "Authorization": "Bearer ${api_key}",    //Replace the placeholders when constructing the request.
  "Content-Type": "application/json;charset=utf-8"
}
  • request.content: The body structure of the request requires passing in the string-escaped result of the JSON format HTTP request body.
"request":{
  "content":"{\"input\":${input}}"
}

# use kibana
"request":{
  "content":"""
    {
      "input":${input}   //Replace the placeholders when constructing the request.
    }
    """
}

NOTE: Unfortunately, if we aren't using kibana the content string needs to be a single line

  • response.json_parser: We need to parse the returned response into an object that Elasticsearch can recognize.(TextEmbeddingFloatResults, SparseEmbeddingResults, RankedDocsResults, ChatCompletionResults)
    Therefore, we use jsonPath syntax to parse the necessary content from the response.
    (For the text_embedding type, we need a List<List<Float>> object. The same applies to other types.)
    Different task types have different json_parser parameters.
# text_embedding
"response":{
  "json_parser":{
    "text_embeddings":"$.result.embeddings[*].embedding"
  }
}

# sparse_embedding
"response":{
  "json_parser":{
    "token_path":"$.result[*].embeddings[*].token",
    "weight_path":"$.result[*].embeddings[*].weight"
  }
}

# rerank
"response":{
  "json_parser":{
    "reranked_index":"$.result.scores[*].index",    // optional
    "relevance_score":"$.result.scores[*].score",
    "document_text":"xxx"    // optional
  }
}

# completion
"response":{
  "json_parser":{
    "completion_result":"$.result.text"
  }
}
"response": {
    "error_parser": {
        "path": "$.error.message"
    }
}
  • task_settings.parameters: Due to the limitations of the inference framework, if the model requires more parameters to be configured, they can be set in task_settings.parameters. These parameters can be placed in the request.body as placeholders and replaced with the configured values when constructing the request.
"task_settings":{
  "parameters":{
    "input_type":"query",
    "return_token":true
  }
}

Testing

🚧 In progress

Jon Testing
OpenAI

Texting Embedding

PUT _inference/text_embedding/test
{
    "service": "custom",
    "service_settings": {
        "secret_parameters": {
            "api_key": <api_key>
        },
        "url": "https://api.openai.com/v1/embeddings",
        "headers": {
            "Authorization": "Bearer ${api_key}",
            "Content-Type": "application/json;charset=utf-8"
        },
        "request": {
            "content": "{\"input\": ${input}, \"model\": \"text-embedding-3-small\"}"
        },
        "response": {
            "json_parser": {
                "text_embeddings": "$.data[*].embedding[*]"
            },
            "error_parser": {
                "path": "$.error.message"
            }
        }
    }
}

POST _inference/text_embedding/test
{
    "input": ["The quick brown fox jumps over the lazy dog"]
}
Cohere

Rerank

PUT _inference/rerank/test-rerank
{
    "service": "custom",
    "service_settings": {
        "secret_parameters": {
            "api_key": "<api key>"
        },
        "url": "https://api.cohere.com/v2/rerank",
        "headers": {
            "Authorization": "bearer ${api_key}",
            "Content-Type": "application/json"
        },
        "request": {
            "content": "{\"documents\": ${input}, \"query\": ${query}, \"model\": \"rerank-v3.5\"}"
        },
        "response": {
            "json_parser": {
                "reranked_index":"$.results[*].index",
                "relevance_score":"$.results[*].relevance_score"
            },
            "error_parser": {
                "path": "$.message"
            }
        }
    }
}


POST _inference/rerank/test-rerank
{
    "input": [
        "Carson City is the capital city of the American state of Nevada.",
        "The Commonwealth of the Northern Mariana Islands is a group of islands in the Pacific Ocean. Its capital is Saipan.",
        "Washington, D.C. (also known as simply Washington or D.C., and officially as the District of Columbia) is the capital of the United States. It is a federal district.",
        "Capitalization or capitalisation in English grammar is the use of a capital letter at the start of a word. English usage varies from capitalization in other languages.",
        "Capital punishment has existed in the United States since beforethe United States was a country. As of 2017, capital punishment is legal in 30 of the 50 states."
    ],
    "query": "What is the capital of the United States?"
}
Alibaba Testing

we use Alibaba Cloud AI Search Model for example,
Please replace the value of secret_parameters.api_key with your api_key.

text_embedding

PUT _inference/text_embedding/custom_embeddings
{
  "service":"custom-model",
  "service_settings":{
        "secret_parameters":{
        "api_key":"<<your api_key>>"
        },
        "url":"http://default-j01.platform-cn-shanghai.opensearch.aliyuncs.com",
        "headers":{
            "Authorization": "Bearer ${api_key}",
            "Content-Type": "application/json;charset=utf-8"
        },
        "request":{
            "content":"""
                {
                "input":${input}
                }
                """
        },
        "response":{
            "json_parser":{
                "text_embeddings":"$.result.embeddings[*].embedding"
            },
            "error_parser": {
                "path": "$.error.message"
            }
        }
    }
}

POST _inference/text_embedding/custom_embeddings
{
  "input":"test"
}

sparse_embedding

PUT _inference/sparse_embedding/custom_sparse_embedding
{
  "service":"custom-model",
  "service_settings":{
    "secret_parameters":{
      "api_key":<<your api_key>>
    },
    "url":"http://default-j01.platform-cn-shanghai.opensearch.aliyuncs.com/v3/openapi/workspaces/default/text-sparse-embedding/ops-text-sparse-embedding-001",
    "headers":{
      "Authorization": "Bearer ${api_key}",
      "Content-Type": "application/json;charset=utf-8"
    },
    "request":{
      "content":"""
        {
          "input": ${input},
          "input_type": "${input_type}",
          "return_token": ${return_token}
        }
        """
    },
    "response":{
      "json_parser":{
        "token_path":"$.result[*].embeddings[*].token",
         "weight_path":"$.result[*].embeddings[*].weight"
      },
      "error_parser": {
         "path": "$.error.message"
      }
    }
  },
  "task_settings":{
    "parameters":{
      "input_type":"query",
      "return_token":true
    }
  }
}

POST _inference/sparse_embedding/custom_sparse_embedding?error_trace
{
  "input":["hello", "world"]
}

rerank

PUT _inference/rerank/custom_rerank
{
    "service":"custom-model",
    "service_settings":{
        "secret_parameters":{
            "api_key":<<your api_key>>
        },
        "url":"http://default-j01.platform-cn-shanghai.opensearch.aliyuncs.com",
        "headers":{
            "Authorization": "Bearer ${api_key}",
            "Content-Type": "application/json;charset=utf-8"
        },
        "request":{
            "content":"""
                {
                "query": "${query}",
                "docs": ${input}
                }
            """
        },
        "response":{
            "json_parser":{
                "reranked_index":"$.result.scores[*].index",
                "relevance_score":"$.result.scores[*].score"
            },
            "error_parser": {
                "path": "$.error.message"
            }
        }
    }
}

POST _inference/rerank/custom_rerank
{
  "input": ["luke", "like", "leia", "chewy","r2d2", "star", "wars"],
  "query": "star wars main character"
}

completion

In the completion module, we demonstrated how to use the task_settings.parameters parameter for more flexible parameter configuration.
To understand completion interface definition for the Alibaba Cloud AI Search completion API, please refer to the official documentation alibaba cloud ai search completion api doc

PUT _inference/completion/custom_completion
{
    "service":"custom-model",
    "service_settings":{
        "secret_parameters":{
            "api_key":<<your api_key>>
        },
        "url":"http://default-j01.platform-cn-shanghai.opensearch.aliyuncs.com",
        "headers":{
            "Authorization": "Bearer ${api_key}"
        },
        "request":{
            "content":"{\"messages\":${messages}}"
        },
        "response":{
            "json_parser":{
                "completion_result":"$.result.text"
            }
            "error_parser":{
                "path":"$.error.message"
            }
        }
    }
}

POST _inference/completion/custom_completion
{
  "input":"",
  "task_settings":{
    "parameters":{
      "messages":[
        {
          "role":"system", 
          "content":"你是一个机器人助手"
        },
        {
          "role":"user", 
          "content":"河南的省会是哪里"
        },
        {
          "role":"assistant", 
          "content":"郑州"
        },
        {
          "role":"user", 
          "content":"那里有什么好玩的"
        }
      ]
    }
  }
}

@elasticsearchmachine
Copy link
Collaborator

Hi @davidkyle, I've created a changelog YAML for you.

TimeValue timeout,
ActionListener<List<ChunkedInference>> listener
) {
listener.onFailure(new ElasticsearchStatusException("Chunking not supported by the {} service", RestStatus.BAD_REQUEST, NAME));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will see how to support this in the next PR, to support semantic_text

@davidkyle
Copy link
Member Author

@weizijun the Elasticsearch security team have advised against adding JsonPATH as a dependency, the last commit to the GitHub project was over a year ago and the project does not appear to be activity maintained. If a critical vulnerability was found in JsonPATH Elasticsearch would be exposed to it and there are no guarantees that the CVE would be fixed.

The team at Elastic considered using another Json path library but have decided to implement the features we need ourselves. The Elasticsearch code base already contains a lot of code for parsing JSON that we can use and writing our own implementation avoids adding another dependency.

@@ -36,7 +36,7 @@ public abstract class BaseResponseHandler implements ResponseHandler {
public static final String METHOD_NOT_ALLOWED = "Received a method not allowed status code";

protected final String requestType;
private final ResponseParser parseFunction;
protected final ResponseParser parseFunction;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this available so the custom response handler can immediately return on a parse failure instead of retrying.

private static final LazyInitializable<InferenceServiceConfiguration, RuntimeException> configuration = new LazyInitializable<>(
() -> {
var configurationMap = new HashMap<String, SettingsConfiguration>();
// TODO revisit this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to create some more complex configuration types to support the fields (like maps, lists of lists etc). Maybe for now we don't expose this in the services API?


Map<String, Object> headers = extractOptionalMap(map, HEADERS, ModelConfigurations.SERVICE_SETTINGS, validationException);
removeNullValues(headers);
var stringHeaders = validateMapStringValues(headers, HEADERS, validationException, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should limit the values in the header map to only strings.

removeNullValues(parameters);
validateMapValues(
parameters,
List.of(String.class, Integer.class, Double.class, Float.class, Boolean.class),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restricting the task settings to these types (no nested fields aka maps or lists).

public static final String QUERY_PARAMETERS = "query_parameters";

public static QueryParameters fromMap(Map<String, Object> map, ValidationException validationException) {
List<Tuple<String, String>> queryParams = extractOptionalListOfStringTuples(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query parameters can have duplicate keys which is why I'm not using a map here.

uri = buildUri();
}

private static void addStringParams(Map<String, String> stringParams, Map<String, ?> paramsToAdd) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fields like the url, query parameters, and headers should not have their values converted to json format. This only accepts strings and doesn't manipulate them.

}
}

private static void addJsonStringParams(Map<String, String> jsonStringParams, Map<String, ?> params) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fields like the request body need to be a valid json object so we'll convert the values into json

import java.io.IOException;
import java.util.Objects;

public class SerializableSecureString implements ToXContentFragment, Writeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need to serialize the api key or some secrets to the body of a request this class will make that process a little easier by implementing toXContent()

import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is an attempt to push a lot of the duplicate logic in the inference service tests into a central place. If we create more services we should leverage this base class to remove the copy/paste.

@jonathan-buttner jonathan-buttner marked this pull request as ready for review May 2, 2025 21:05
@jonathan-buttner jonathan-buttner added Team:ML Meta label for the ML team auto-backport Automatically create backport pull requests when merged v8.19.0 labels May 2, 2025
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/ml-core (Team:ML)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
auto-backport Automatically create backport pull requests when merged >enhancement :ml Machine learning Team:ML Meta label for the ML team v8.19.0 v9.1.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants