Add Apache Arrow as a bulk ingestion format #125040

Open · swallez wants to merge 21 commits into main

Conversation

@swallez (Member) commented Mar 17, 2025

This PR adds the ability to bulk-ingest data using the Apache Arrow format. Arrow is an efficient column-oriented data format that allows zero-copy reads and efficient processing. It is used in data science as the backing format of the Python Pandas library, and in data analytics, where column orientation enables efficient processing (as we do in ES|QL).

We already have Arrow as an output format for ES|QL (/cc @nik9000); this adds Arrow as an input format.

Architecture and implementation

This PR leverages the recently added streaming incremental bulk ingestion. Arrow decoding happens on the new /_arrow/_bulk and /_arrow/{index}/_bulk endpoints. The request's content type must be application/vnd.apache.arrow.stream.

(Note: this was initially an additional MIME type accepted by the /_bulk endpoint, but adding a dependency on Arrow and its transitive dependencies in the server module proved too difficult because of version conflicts; see details in the module-info of the libs/arrow module.)

The Arrow IPC format splits a large data stream into smaller "record batches". The ArrowBulkIncrementalParser buffers these batches and translates them into a stream of individual DocWriteRequests (Arrow columns are "rotated" into per-record operations).
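
Conceptually, this column-to-row rotation looks like the Python sketch below (client-side pyarrow is used purely for illustration; the actual parser is Java code working on the incremental byte stream, and the helper logic here is not part of the PR):

import pyarrow

# Illustration only: split a table into record batches, then "rotate" each
# batch's columns into per-row documents, which is conceptually what
# ArrowBulkIncrementalParser does for every batch it has buffered.
table = pyarrow.table({"_id": ["a", "b"], "name": ["Tom", "Jessica"], "age": [38, 31]})

for batch in table.to_batches(max_chunksize=1024):
    columns = batch.to_pydict()              # column name -> list of values
    for row in range(batch.num_rows):
        doc = {name: values[row] for name, values in columns.items()}
        doc_id = doc.pop("_id", None)        # special column, not part of the source
        print(doc_id, doc)                   # becomes one DocWriteRequest server-side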

Decoding Arrow dataframes into ES operations and documents

The bulk endpoint can receive arbitrary create/index/update/delete operations. Although the Arrow format still allows this (see below), it is better suited to applying the same operation to homogeneous data records, which leverages the column-oriented structure of Arrow dataframes.

The default operation type is defined by the op_type request parameter and defaults to create, which is also the only operation available on data streams. For update operations, only partial updates are supported, not script updates.
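
For example (a sketch only; op_type is the request parameter described above, and the URL mirrors the example script at the end of this description):

# Sketch: override the default create operation with index via the op_type parameter.
es_url = "http://localhost:9200/"
index = "arrow-test"
bulk_url = es_url + "_arrow/" + index + "/_bulk?op_type=index&refresh=wait_for"
print(bulk_url)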

All columns of the dataframe are converted into document fields, except for three optional columns (see the sketch after this list):

  • _id: defines the document id.
  • _index: defines the target index, if different from the default index set in the request path.
  • _bulk_action: an Arrow struct or map that defines the bulk action, using the same properties as "classic bulk". Allows for advanced use cases and heterogeneous operations.
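
A minimal sketch of a dataframe using the special columns (_bulk_action is omitted because its exact struct schema is not spelled out here; the index and field names are illustrative):

import pyarrow

# Sketch: _id and _index are consumed as special columns; every other column
# ("name", "age") becomes a field of the indexed document.
table = pyarrow.table({
    "_id": ["a", "b", "c"],
    "_index": ["arrow-test", "arrow-test", "arrow-other"],  # per-row target index
    "name": ["Tom", "Jessica", "Krish"],
    "age": pyarrow.array([38, 31, 42], type=pyarrow.int8()),
})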

Arrow has a rich set of timestamp types, while ES only handles UTC timestamps in milliseconds or nanoseconds. Arrow values in seconds are converted to milliseconds, and values in microseconds are converted to nanoseconds. Timestamps with a timezone are converted to UTC.
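
The sketch below shows the four Arrow timestamp units and, in the comments, the conversions described above (the ES field type names in the comments are my reading of the milliseconds/nanoseconds split):

import pyarrow

# Sketch of Arrow timestamp columns and how the conversion rules above apply:
#   seconds, milliseconds     -> milliseconds since the epoch (ES date)
#   microseconds, nanoseconds -> nanoseconds since the epoch (ES date_nanos)
ts_seconds = pyarrow.array([1735689600], type=pyarrow.timestamp("s"))
ts_millis = pyarrow.array([1735689600123], type=pyarrow.timestamp("ms"))
ts_micros = pyarrow.array([1735689600123456], type=pyarrow.timestamp("us"))
ts_nanos = pyarrow.array([1735689600123456789], type=pyarrow.timestamp("ns"))

# Zoned timestamps are converted to UTC before being indexed.
ts_zoned = pyarrow.array([1735689600000], type=pyarrow.timestamp("ms", tz="Europe/Paris"))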

The documents created from Arrow dataframes are encoded in CBOR, which is more compact and more efficient to parse than JSON text. This reduces storage space and improves performance when documents are parsed in the indexing phase.
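
As a rough illustration of the size difference (a client-side sketch using the third-party cbor2 package, which is not used by the PR itself):

import json
import cbor2  # third-party package, used here only to compare encoded sizes

doc = {"name": "Jessica", "age": 31, "@timestamp": 1735689600123}
as_json = json.dumps(doc).encode("utf-8")
as_cbor = cbor2.dumps(doc)
print(len(as_json), "bytes as JSON vs", len(as_cbor), "bytes as CBOR")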

Response format

The response is an Arrow table that only contains error information. Arrow bulk ingestion is expected to be used mainly for create and index operations, where success information (index, shard, etc.) is of no use and would only increase the response size. A fully successful request therefore returns an empty Arrow table.

The fields in the result table are item_no (the position of the item in the request), _id, _index, status, type, and reason, matching the fields found in a JSON error response.

Applications wanting full response details can obtain the "classic" response by sending an Accept: application/json header.
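
A client can therefore treat an empty result table as full success and otherwise inspect the error columns, roughly as in this sketch (check_bulk_errors is an illustrative helper, not part of the PR; the column names are those listed above):

import pyarrow

def check_bulk_errors(response_bytes: bytes) -> None:
    """Reads the Arrow bulk response; an empty table means every operation succeeded."""
    errors = pyarrow.ipc.open_stream(response_bytes).read_all()
    if errors.num_rows == 0:
        print("all bulk operations succeeded")
        return
    for item in errors.to_pylist():
        print(f"item {item['item_no']} failed with status {item['status']}: {item['reason']}")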

Code modules

Arrow is already used by ES|QL as an output format. This PR moves the Arrow dependencies to a new libs/arrow module to avoid duplication and provide some common utilities.

Doing so caused dependency version conflicts for a few libraries. They were resolved by using the latest of the conflicting versions and centralizing them in the main version catalog.

Todo

  • Docs.
  • Benchmarks.
  • The memory usage limit is hard-coded to 100 MiB per record batch (which still allows very large streams). Arrow has its own memory allocator, which we should link to ES memory management (how to do that?).
  • Implement missing vector types in ArrowToXContent.
  • More tests in ArrowToXContent to cover all data types.
  • Do we really need _bulk_action? It provides equivalence with the existing bulk, but requiring a single operation type would make further optimizations easier, such as directly passing column data to the indexing phase.

Possible future developments:

  • Instead of converting columns to rows (documents), and back again to columns to store data into Lucene, we could index columns directly and avoid creating the document (and use synthetic source).
  • Implement the OTel-Arrow format to improve OpenTelemetry data ingestion (/cc @felixbarny)

Example usage

This Python script creates an Arrow dataframe, bulk-ingests it, and then uses ES|QL to retrieve it in Arrow format, which is read back as a Pandas dataframe.

Make sure pyarrow, pandas and tabulate are installed, then run ./gradlew run -Dtests.es.xpack.security.enabled=false and launch the script.

import pyarrow
import urllib.error
from urllib import request

es_url = "http://localhost:9200/"

def main():
    ids = pyarrow.array(['a', 'b', 'c', 'd'])
    names = pyarrow.array(['Tom', 'Jessica', 'Krish', 'Jack'])
    ages = pyarrow.array([38, 31, 42, 53], type=pyarrow.int8())
    table = pyarrow.table([ids, names, ages], names=["_id", "name", "age"])

    bulk_response = bulk_arrow(table, "arrow-test").read_pandas()
    if len(bulk_response) == 0:
        print("Bulk operations were all successful!")
    else:
        print("Some bulk operations failed:")
        print(bulk_response.to_markdown())

    print()
    esql_response = esql_arrow("FROM arrow-test | keep *")
    print("ES|QL response:")
    print(esql_response.read_pandas().to_markdown())


def bulk_arrow(arrow_table, index):
    """Ingests an arrow table into Elasticsearch in the provided index"""

    with pyarrow.BufferOutputStream() as sink:
        with pyarrow.ipc.new_stream(sink, arrow_table.schema) as writer:
            writer.write(arrow_table)
        buf = sink.getvalue().to_pybytes()

    req =  request.Request(url = es_url + "_arrow/" + index + "/_bulk?refresh=wait_for", data = buf)
    req.add_header("Content-Type", "application/vnd.apache.arrow.stream")

    try:
        response = request.urlopen(req)
    except urllib.error.HTTPError as e:
        print("Bulk request failed: ", e.status)
        print(e.read().decode())
        raise e

    return pyarrow.ipc.open_stream(response.read())

def esql_arrow(query):
    """Runs an ES|QL query and returns the result as an arrow table."""

    data = '{"query": "' + query + '"}' # don't do that in prod
    req =  request.Request(url = es_url + "_query", data = data.encode("utf-8"))
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/vnd.apache.arrow.stream")

    response = request.urlopen(req)
    return pyarrow.ipc.open_stream(response.read())

if __name__ == "__main__":
    main()

Running it yields the following output (the name.keyword field was created because we didn't define an index mapping):

Bulk operations were all successful!
ES|QL response:
|    |   age | name    | name.keyword   |
|---:|------:|:--------|:---------------|
|  0 |    38 | Tom     | Tom            |
|  1 |    31 | Jessica | Jessica        |
|  2 |    42 | Krish   | Krish          |
|  3 |    53 | Jack    | Jack           |

@swallez added the >enhancement and :Distributed Indexing/CRUD labels on Mar 17, 2025
@swallez requested review from a team as code owners on Mar 17, 2025
@elasticsearchmachine added the v9.1.0, external-contributor, and Team:Distributed Indexing labels on Mar 17, 2025
@elasticsearchmachine (Collaborator)

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@elasticsearchmachine (Collaborator)

Hi @swallez, I've created a changelog YAML for you.

@smith (Contributor) commented Mar 17, 2025

Arrow has a rich set of date/time types. They're all converted to milliseconds since the Epoch.

I think date_nanos would be better here, as that's what all otel dates are.

@axw (Member) commented Apr 9, 2025

(note: it was initially an additional mime-type accepted by the /_bulk endpoint but adding a dependency on Arrow and its own dependencies in the server module proved to be too difficult because of version conflicts – see details in the module-info in the libs/arrow module)

@swallez sorry if this is a stupid question: would it be possible to invert the dependency, and have the Arrow module inject a mime-type/parser into the server module?

@swallez (Member, Author) commented Apr 9, 2025

@axw that's actually an interesting suggestion! There's no infrastructure in place for that (i.e. it's not a plugin extension point), but certainly something that could be considered.

@swallez (Member, Author) commented Apr 12, 2025

@smith I updated the timestamp conversions from the four units in Arrow to the two units in ES:

  • seconds and milliseconds --> milliseconds
  • microseconds and nanoseconds --> nanoseconds
