Push down field extraction to time-series source #127445

Conversation
Pinging @elastic/es-storage-engine (Team:StorageEngine)
if (plan.anyMatch(p -> p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) == false) {
    return plan;
}
final List<FieldExtractExec> pushDownExtracts = new ArrayList<>();
This is the main change in planning, where field extractions are pushed down to the time-series source
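For readers skimming the thread, here is a minimal sketch of the idea, using simplified stand-in types rather than the actual ES|QL planner classes (the real rule works on nodes like FieldExtractExec and EsQueryExec shown above; names such as TimeSeriesSource below are assumptions for illustration only):

```java
// Hypothetical, simplified model of the rule: fold field extractions that sit
// directly above a time-series source into the source itself, so values are
// loaded while the source iterates documents instead of by a later reader operator.
import java.util.ArrayList;
import java.util.List;

final class PushDownFieldExtractSketch {

    // Stand-ins for the real physical plan nodes (assumptions, not the actual classes).
    interface PlanNode { List<PlanNode> children(); }
    record FieldExtract(PlanNode child, List<String> fields) implements PlanNode {
        public List<PlanNode> children() { return List.of(child); }
    }
    record TimeSeriesSource(List<String> attachedFields) implements PlanNode {
        public List<PlanNode> children() { return List.of(); }
    }

    // When a FieldExtract sits directly above a TimeSeriesSource, merge its fields
    // into the source and drop the extract node.
    static PlanNode rewrite(PlanNode plan) {
        if (plan instanceof FieldExtract extract && extract.child() instanceof TimeSeriesSource source) {
            List<String> merged = new ArrayList<>(source.attachedFields());
            merged.addAll(extract.fields());
            return new TimeSeriesSource(merged);
        }
        return plan; // the real rule recurses over the whole plan and handles more shapes
    }
}
```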
    }
}

static final class ShardLevelFieldsReader implements Releasable {
Unfortunately, we cannot reuse the ValuesSourceReaderOperator and must duplicate some logic here.
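As a rough illustration of what a shard-level reader involves (not the actual ShardLevelFieldsReader; class and method names below are assumptions), it boils down to reading doc values from one shard's leaf reader in the doc-id order produced by the source:

```java
// Illustrative sketch only: load numeric doc values for one field of one leaf reader,
// for doc ids that are assumed to be ascending within the segment.
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SortedNumericDocValues;

final class NumericFieldReaderSketch {
    private final SortedNumericDocValues values;

    NumericFieldReaderSketch(LeafReader leafReader, String field) throws IOException {
        this.values = DocValues.getSortedNumeric(leafReader, field);
    }

    /** Reads the first value of the field for each doc id (doc ids must be ascending). */
    long[] read(int[] ascendingDocIds) {
        long[] result = new long[ascendingDocIds.length];
        try {
            for (int i = 0; i < ascendingDocIds.length; i++) {
                if (values.advanceExact(ascendingDocIds[i])) {
                    result[i] = values.nextValue();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

The real reader additionally has to handle multiple fields, multiple value types, stored fields/_source, and block building, which is the logic that cannot simply be reused from the ValuesSourceReaderOperator.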
timestampsBuilder = blockFactory.newLongVectorBuilder(Math.min(remainingDocs, maxPageSize));
tsids = tsHashesBuilder.build();
int blockIndex = 0;
if (emitDocIds) {
if (docCollector != null) {, for consistency?
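For context, a toy sketch of the pattern being suggested: emit the doc-id block only when a collector for it exists, using the presence of the collector itself as the flag instead of a separate boolean (types and names below are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

final class PageAssemblySketch {

    // Hypothetical stand-ins for the real Block/DocCollector types.
    record Block(String name) {}
    interface DocCollector { Block buildDocBlock(); }

    /**
     * Builds the per-page block list. The doc-id block is only appended when a
     * docCollector is present, i.e. when a downstream operator still needs doc ids
     * to extract more fields; checking docCollector != null keeps the flag and the
     * collector from drifting apart.
     */
    static List<Block> assemblePage(Block tsids, Block timestamps, DocCollector docCollector) {
        List<Block> blocks = new ArrayList<>();
        if (docCollector != null) {
            blocks.add(docCollector.buildDocBlock());
        }
        blocks.add(tsids);
        blocks.add(timestamps);
        return blocks;
    }
}
```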
import java.util.Set;

/**
 * A rule that pushes down field extractions to occur before filter/limit/topN in the time-series source plan.
Don't we want filtering to happen before field extraction? Or combine them, at least? Maybe I misunderstood this comment.
For the query TS index | WHERE host = 'a' | STATS max(rate(counter)) BY host, bucket(1minute), we should extract only the host field (and tsid and timestamp) in the time-series source command. The counter field should be extracted later by the ValuesSourceReaderOperator, because we don't know how many rows will be filtered out.
Correct, but what if we have TS index | WHERE host = 'a' AND TRANGE(1hour) | STATS max(rate(counter)) BY host, bucket(1minute)? I'd think we want to push down the filters on host and @timestamp to Lucene, to run before extracting the fields. If that is the case, maybe clarify in the comment that filters on the extracted fields are still pushed down.
I expanded the javadoc: 164d788
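For readers of this thread, a hedged sketch of the kind of clarification being discussed (the actual expanded javadoc is in 164d788; the wording below only illustrates the semantics agreed on above):

```java
/**
 * A rule that pushes field extractions down into the time-series source, so that
 * fields referenced by filter/limit/topN are loaded while the source iterates
 * documents instead of by a separate reader operator afterwards.
 *
 * Filters on the extracted fields that can be translated to Lucene queries (e.g. on
 * host or @timestamp) are still pushed down to Lucene and run before any extraction.
 * Fields that are only needed after filtering (for example the counter passed to
 * rate()) are not pushed down; they are extracted later by the
 * ValuesSourceReaderOperator, because the number of rows surviving the filter is not
 * known up front.
 */
```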
} else {
    sourceLoader = null;
}
this.storedFieldsSpec = storedFieldsSpec;
;
Nit: remove.
sorry, removed 144f13d
I'd be lying if I claimed I fully understand this change. It'd be great if Mark could also take a look, or maybe Nik. That said, we'll be iterating on this code; I expect it to be extended and updated iteratively.
Thanks Kostas!
This change pushes down field extractions to the time-series source operator, providing these advantages:

- Avoids building the DocVector and its forward/backward maps.
- Takes advantage of the DocValues cache (i.e., blocks that are already decompressed/decoded) when loading values, which can be lost when reading blocks with the ValuesSourceReaderOperator.

The following query against the TSDB track previously took 19 seconds but was reduced to 13 seconds with this change:
Note that with this change:

now performs as well as:

when using shard-level data partitioning. This means the performance of the TS command is comparable to the FROM command, except that it does not yet support segment-level or doc-level concurrency. I will try to add support for segment-level concurrency, as document-level partitioning is not useful when iterating over documents in order.