Add Apache Arrow as a bulk ingestion format #125040
Conversation
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)
Hi @swallez, I've created a changelog YAML for you.
@swallez sorry if this is a stupid question: would it be possible to invert the dependency, and have the Arrow module inject a mime-type/parser into the server module?
@axw that's actually an interesting suggestion! There's no infrastructure in place for that (i.e. it's not a plugin extension point), but certainly something that could be considered.
@smith I updated timestamp conversions from the 4 units in Arrow to the 2 units in ES:
This PR adds support for bulk-ingesting data using the Apache Arrow format. Arrow is a column-oriented data format that allows zero-copy reads and efficient processing. It is used in data science as a backend for the Python Pandas library, and in data analytics where column orientation enables efficient processing (as we do in ES|QL).
We already have Arrow as an output format for ES|QL (/cc @nik9000), this adds Arrow as an input format.
Architecture and implementation
This PR leverages the streaming incremental bulk ingestion added recently. Arrow decoding happens on new `/_arrow/_bulk` and `/_arrow/{index}/_bulk` endpoints. The request's content-type must be `application/vnd.apache.arrow.stream`.

(Note: it was initially an additional mime-type accepted by the `/_bulk` endpoint, but adding a dependency on Arrow and its own dependencies in the `server` module proved to be too difficult because of version conflicts – see details in the module-info in the `libs/arrow` module.)

The Arrow IPC format splits a large data stream into smaller "record batches". The `ArrowBulkIncrementalParser` buffers these batches and translates them into a stream of individual `DocWriteRequest` objects (Arrow columns are "rotated" into record operations).
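For illustration only (not code from this PR), a minimal client-side sketch that serializes a pyarrow table to the Arrow IPC stream format and posts it to the new endpoint; the index name, field values and the use of `requests` against a local cluster are assumptions:

```python
# Hypothetical client sketch: serialize a pyarrow Table to the Arrow IPC
# stream format and send it to the new /_arrow/{index}/_bulk endpoint.
# Assumes a local cluster with security disabled at http://localhost:9200.
import pyarrow as pa
import pyarrow.ipc as ipc
import requests

table = pa.table({
    "name": ["alice", "bob"],
    "age": [32, 28],
})

# Write the table as an Arrow IPC stream (one or more record batches).
sink = pa.BufferOutputStream()
with ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)

resp = requests.post(
    "http://localhost:9200/_arrow/people/_bulk",
    data=sink.getvalue().to_pybytes(),
    headers={"Content-Type": "application/vnd.apache.arrow.stream"},
)
print(resp.status_code)
```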
Decoding Arrow dataframes into ES operations and documents
The bulk endpoint can receive arbitrary index/create/update/delete operations. Although the Arrow format still allows it (see below), it is better suited to using the same operation with homogeneous data records, to leverage the column-oriented structure of Arrow dataframes.
The default operation type is defined by the `op_type` request parameter and defaults to `create`, which is also the only operation available on data streams. For `update` operations, only partial updates are supported, not script updates.

All columns of the dataframe are converted to JSON, except for 3 optional columns:
- `_id`: defines the document id.
- `_index`: defines the target index, if different from the default index set in the request path.
- `_bulk_action`: an Arrow struct or map that defines the bulk action, using the same properties as "classic bulk". Allows for advanced use cases and heterogeneous operations.

Arrow has a rich set of timestamp types, while ES only handles UTC timestamps in milliseconds or nanoseconds. Arrow values in seconds are converted to milliseconds, and values in microseconds are converted to nanoseconds. Timestamps with a timezone are converted to UTC.
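To illustrate these conventions, here is a sketch of a dataframe using the optional columns; the index names, ids, field names and values are made up for the example:

```python
import pyarrow as pa

# Sketch of a dataframe using the optional control columns described above.
# "_id" sets each document id; "_index" overrides the target index per row.
# The "@timestamp" column uses second precision: per this PR it would be
# stored in milliseconds (a microsecond column would be stored in nanos).
table = pa.table({
    "_id": ["doc-1", "doc-2"],
    "_index": ["logs-app-a", "logs-app-b"],
    "@timestamp": pa.array([1735689600, 1735693200], type=pa.timestamp("s")),
    "message": ["started", "stopped"],
})
```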
The documents created from Arrow dataframes are encoded in CBOR, which is more compact and more efficient to parse than JSON text. This reduces storage space and improves performance when documents are parsed in the indexing phase.
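As a rough illustration of the size difference (not code from this PR; assumes the third-party `cbor2` package is installed):

```python
import json
import cbor2  # third-party CBOR codec, assumed installed for the comparison

doc = {"name": "alice", "age": 32, "score": 3.14159}
print(len(json.dumps(doc).encode("utf-8")))  # size of the JSON text encoding
print(len(cbor2.dumps(doc)))                 # the CBOR encoding is smaller
```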
Response format
The response is an Arrow table that only contains error information. Arrow bulk ingestion is expected to be used mainly for create and index operations, where success information (index, shard, etc.) is of no use and would only increase the response size. A fully successful request therefore returns an empty Arrow table.
The fields in the result table are `item_no` (index of the item in the request), `_id`, `_index`, `status`, `type` and `reason`, which are those found in a JSON error.

Applications wanting full response details can obtain the "classic" response by sending an `Accept: application/json` header.
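For example, a client could decode that error table with pyarrow; the helper below is hypothetical and not part of this PR, but the field names are the ones listed above:

```python
import pyarrow as pa
import pyarrow.ipc as ipc

def report_bulk_errors(response_body: bytes) -> None:
    """Print the failures contained in an Arrow bulk response body."""
    errors = ipc.open_stream(pa.py_buffer(response_body)).read_all()
    if errors.num_rows == 0:
        print("no failures")          # empty table: everything was ingested
        return
    for row in errors.to_pylist():    # one row per failed item
        print(f"item {row['item_no']} ({row['_index']}/{row['_id']}): "
              f"{row['status']} {row['type']}: {row['reason']}")
```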
Code modules
Arrow is already used by ES|QL as an output format. This PR moves the Arrow dependencies to a new `libs/arrow` module to avoid duplication and provide some common utilities.

Doing so caused some dependency version conflicts for a few libraries. They were solved by using the most recent of the versions in use, and by centralizing them in the main version catalog.
Todo
- Tests for `ArrowToXContent`.
- Extend `ArrowToXContent` to cover all data types.
- Do we keep `_bulk_action`? It provides equivalence with the existing bulk, but requiring a single operation type would make further optimizations easier, such as directly passing column data to the indexing phase.

Possible future developments:
Example usage
This Python script creates an Arrow dataframe, bulk-ingests it, and then uses ES|QL to retrieve it in Arrow format that is read as a Pandas dataframe.
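The original script is not reproduced here; the sketch below follows the same flow under a few assumptions (local security-disabled cluster, hypothetical `people` index, and ES|QL returning Arrow when `application/vnd.apache.arrow.stream` is sent in the `Accept` header):

```python
# Not the original script: a rough sketch of the same flow. Assumes a local
# cluster with security disabled, and assumes ES|QL returns Arrow when
# application/vnd.apache.arrow.stream is requested via the Accept header.
import pyarrow as pa
import pyarrow.ipc as ipc
import requests

ES = "http://localhost:9200"
ARROW = "application/vnd.apache.arrow.stream"

# 1. Build an Arrow table and bulk-ingest it through the new endpoint.
people = pa.table({"name": ["alice", "bob"], "age": [32, 28]})
sink = pa.BufferOutputStream()
with ipc.new_stream(sink, people.schema) as writer:
    writer.write_table(people)
requests.post(f"{ES}/_arrow/people/_bulk",
              data=sink.getvalue().to_pybytes(),
              headers={"Content-Type": ARROW})

# 2. Query it back with ES|QL (documents may need a refresh to be visible),
#    then read the Arrow response into a Pandas dataframe.
resp = requests.post(f"{ES}/_query",
                     json={"query": "FROM people | SORT name"},
                     headers={"Accept": ARROW})
df = ipc.open_stream(pa.py_buffer(resp.content)).read_all().to_pandas()
print(df.to_markdown())  # to_markdown() uses tabulate to render the table
```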
Make sure `pyarrow`, `pandas` and `tabulate` are installed, then run `./gradlew run -Dtests.es.xpack.security.enabled=false` and launch the script.

Running it yields the following output (the `name.keyword` field was created because we didn't define an index mapping):