ESQL: Heuristics to pick efficient partitioning #125739
Conversation
Adds heuristics to pick an efficient partitioning strategy based on the index and rewritten query. This speeds up some queries by throwing more cores at the problem:

```
FROM test | STATS SUM(b)

Before: took: 31 CPU: 222.3%
After:  took: 15 CPU: 806.9%
```

It also lowers the overhead of simpler queries by throwing fewer cores at the problem when more cores won't really speed anything up:

```
FROM test

Before: took: 1 CPU: 48.5%
After:  took: 1 CPU: 70.4%
```

We have had a `pragma` to control our data partitioning for a long time; this change just looks at the query to pick a partitioning scheme. The partitioning options:

* `shard`: use one core per shard
* `segment`: use one core per large segment
* `doc`: break each shard into as many slices as there are cores

`doc` is the fastest but has a lot of overhead, especially for complex Lucene queries. `segment` is fast but doesn't make the most of the CPUs when there are few segments. `shard` has the lowest overhead.

Previously we always used `segment` partitioning because it avoids the terrible overhead while still being fast. With this change we use `doc` when the top-level query matches all documents - those have very, very low overhead even with `doc` partitioning. That's the twice-as-fast example above.

This also uses `shard` partitioning for queries that don't have to do much work, like `FROM foo`, `FROM foo | LIMIT 1`, or `FROM foo | SORT a`. That's the lower-CPU example above.

This forking choice is made very late, on the data node. So queries like this:

```
FROM test
| WHERE @timestamp > "2025-01-01T00:00:00Z"
| STATS SUM(b)
```

can also use `doc` partitioning when all documents are after the timestamp and all documents have `b`.
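The decision described above can be sketched as a small decision function. This is illustrative only - the names `PartitioningStrategy`, `pick`, and the boolean inputs are hypothetical stand-ins, not the actual Elasticsearch classes:

```java
// Hypothetical sketch of the heuristic: lightweight queries (plain FROM,
// LIMIT, SORT without aggregation) get SHARD, match-all queries get DOC,
// and everything else keeps the old SEGMENT default.
enum PartitioningStrategy { SHARD, SEGMENT, DOC }

class PartitioningHeuristic {
    static PartitioningStrategy pick(boolean matchesAllDocs, boolean lightweightQuery) {
        if (lightweightQuery) {
            // Not much work per document, so minimize overhead: one core per shard.
            return PartitioningStrategy.SHARD;
        }
        if (matchesAllDocs) {
            // Match-all queries have very low per-slice overhead even under doc
            // partitioning, so use all the cores.
            return PartitioningStrategy.DOC;
        }
        // Default: fast without doc partitioning's overhead on complex queries.
        return PartitioningStrategy.SEGMENT;
    }
}
```

Note the ordering: `FROM foo | LIMIT 1` matches all documents but still gets `shard`, because the lightweight check wins.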
Pinging @elastic/es-analytical-engine (Team:Analytics)

Hi @nik9000, I've created a changelog YAML for you.
I think this needs more testing, but I'm going to open this so folks can have a look at it in the meantime. I'd like to see more examples of queries and how they get rewritten - real integration tests.
I use a fixed limit of 250,000 for "small" indices and have them stick with
The way I got the performance numbers I mentioned was on my laptop:
It's not the most scientific measure, but it takes a few seconds. You can reproduce the old way with the new code by sending:
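The elided request isn't recoverable here, but a pragma override request looks roughly like this. This is a sketch: the `data_partitioning` pragma name and the `accept_pragma_risks` field are recalled from the ES|QL query API, not taken from this thread, so treat the exact syntax as an assumption:

```console
POST /_query
{
  "query": "FROM test | STATS SUM(b)",
  "pragma": {
    "data_partitioning": "segment"
  },
  "accept_pragma_risks": true
}
```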
If you are in a production release you'd need to send

I'm going to have more testing thoughts today. And more
Initial reports from rally on the http_logs track, still just messing around with bash:
Edit: `_search` is very fast at this one - clocking in at around the same speed as ESQL with this change.
Here's another one:
This is looking pretty good and I think I have the tests I was hoping for now. I'll merge main back into this and shop around for reviews. The only person who has loaded into their head the state needed to review this is @dnhatn, but others should look too and have thoughts.
Looks great! I really like how we handle partitioning per target shard with this PR instead of using the same partitioning for all target shards. Thanks, Nik!
```java
/**
 * How we partition the data across {@link Driver}s. Each request forks into
 * {@code min(cpus, partition_count)} threads on the data node. More partitions
```
Unfortunately, we currently have to use `min(1.5 * cpus, partition_count)` because drivers consist of both I/O and CPU-bound operations.
I'll update the comment.
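For concreteness, the corrected sizing works out like this. A hypothetical sketch - `driverCount` is an illustrative name, and the integer form of `1.5 * cpus` is an assumption about how the factor would be applied:

```java
class DriverSizing {
    // Illustrative only: fork min(1.5 * cpus, partition_count) drivers,
    // since drivers mix I/O waits with CPU-bound work. cpus + cpus / 2
    // is the integer form of 1.5 * cpus.
    static int driverCount(int cpus, int partitionCount) {
        return Math.min(cpus + cpus / 2, partitionCount);
    }
}
```

So an 8-CPU node with plenty of partitions forks 12 drivers, while a node with only 4 partitions forks 4.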
```java
 * <ul>
 * <li>
 * If the {@linkplain Query} matches <strong>no</strong> documents then this will
 * use the {@link LuceneSliceQueue.PartitioningStrategy#SEGMENT} strategy so we
```
I think you meant `PartitioningStrategy#SHARD` for `MatchNoDocsQuery`.
👍
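The fix being agreed to here is easy to sketch: a query that matches no documents has nothing to parallelize, so the cheapest strategy wins. In this sketch `MatchNoDocsQuery` is a stub stand-in for Lucene's `org.apache.lucene.search.MatchNoDocsQuery`, and `StrategyPicker` is a hypothetical name:

```java
// Stub stand-in for org.apache.lucene.search.MatchNoDocsQuery.
class MatchNoDocsQuery {}

class StrategyPicker {
    static String strategyFor(Object query) {
        if (query instanceof MatchNoDocsQuery) {
            // Nothing matches, so one near-free task per shard is enough.
            return "SHARD";
        }
        // Everything else falls through to the default discussed elsewhere.
        return "SEGMENT";
    }
}
```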
```java
        return LuceneSliceQueue.PartitioningStrategy.SEGMENT;
    }
    LuceneSliceQueue.PartitioningStrategy forClause = highSpeedAutoStrategy(c.query());
    if (forClause == LuceneSliceQueue.PartitioningStrategy.SHARD) {
```
nit: maybe replace these two if statements with:

```java
if (forClause != LuceneSliceQueue.PartitioningStrategy.DOC) {
    return forClause;
}
```
@nik9000 I benchmarked this change with the nyc_taxis track. Some tasks are much faster, while some are slower. I haven't looked into the details yet but wanted to share it with you here.
Off topic: I'd really like to add a cluster setting to set the default value. Escape hatch.
@dnhatn, thanks so much for running that! I was looking around for stuff to run and kept looking at noaa. I've totally spaced on nyc_taxis.
Everything is the

IMPORTANT: When the name says

Maybe the most interesting thing is: why the changes in

Why did

Here's a 100th percentile service time that looks important:
What's up with that? I think I'll need to spin up a cluster with this change, run the benchmarks, and leave the cluster up. Then do
+1.
I've reproduced this and got fairly different numbers. Let's check one:
@dnhatn got a very believable 50% speed boost. I let the cluster keep running after the tests finished. So I looked up
on them. Before:
After:
Now I get the performance boost. More ghosts?
Checking one that got a lot worse for Nhat,
I think that's just measurement noise from the cloud environments. When I hit new code with a profile I get:
Which is telling me that the heuristics picked the old partitioning scheme. But! When I run it in bash the new code is faster. What wild magic is this?
The disk layouts are different...
My "before" cluster:
Yes, our lives will be easier with that cluster setting.
I've pushed a change that lets you override the default using a cluster setting. I also rejuggled the
Overall this looks like a great change. Once merged, we should circle back and add some scenarios with full-text search to ensure that they pick the optimal partitioning strategy.
Robots, I will make you happy with this change!
I had a test bug that seemed to only kick in for serverless and didn't realize it for a few days. In the last build unrelated failures were blocking this. I've merged main again.
Claims a transport version to backport elastic#125739.