Push down field extraction to time-series source #127445

Merged: 8 commits, Apr 28, 2025
@@ -8,6 +8,10 @@
package org.elasticsearch.compute.lucene;

import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;

@@ -39,4 +43,14 @@ public interface ShardContext {
* {@code _cat/shards}.
*/
String shardIdentifier();

/**
* Builds a {@link SourceLoader} for loading {@code _source}.
*/
SourceLoader newSourceLoader();

/**
* Returns a {@link BlockLoader} for loading values of the named field into a {@link Block}.
*/
BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);
}
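These two hooks let the time-series source operator extract fields itself instead of relying on a downstream reader. A minimal sketch of a concrete implementation, assuming a per-shard SearchExecutionContext is available (the class, the blockLoaderContext helper, and the CONSTANT_NULLS fallback are illustrative assumptions, not this PR's production code):

class ExampleShardContext implements ShardContext {
    private final SearchExecutionContext ctx;

    ExampleShardContext(SearchExecutionContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public SourceLoader newSourceLoader() {
        // Delegate to the mapping so stored vs. synthetic _source is handled there.
        return ctx.newSourceLoader(false);
    }

    @Override
    public BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference) {
        MappedFieldType fieldType = ctx.getFieldType(name);
        if (asUnsupportedSource || fieldType == null) {
            return BlockLoader.CONSTANT_NULLS; // assumed fallback: unmapped/unsupported fields load as null blocks
        }
        // blockLoaderContext(...) is a hypothetical helper assembling a
        // MappedFieldType.BlockLoaderContext that carries the extract preference.
        return fieldType.blockLoader(blockLoaderContext(fieldExtractPreference));
    }

    // shardIdentifier(), searcher(), buildSort(), etc. elided
}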

[Large diff not rendered by default.]

@@ -639,16 +639,44 @@ public String toString() {
}
}

- private static class ComputeBlockLoaderFactory implements BlockLoader.BlockFactory, Releasable {
- private final BlockFactory factory;
+ private static class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
private final int pageSize;
private Block nullBlock;

private ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
- this.factory = factory;
+ super(factory);
this.pageSize = pageSize;
}

+ @Override
+ public Block constantNulls() {
+ if (nullBlock == null) {
+ nullBlock = factory.newConstantNullBlock(pageSize);
+ }
+ nullBlock.incRef();
+ return nullBlock;
+ }
+
+ @Override
+ public void close() {
+ if (nullBlock != null) {
+ nullBlock.close();
+ }
+ }
+
+ @Override
+ public BytesRefBlock constantBytes(BytesRef value) {
+ return factory.newConstantBytesRefBlockWith(value, pageSize);
+ }
+ }
+
+ public abstract static class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
+ protected final BlockFactory factory;
+
+ protected DelegatingBlockLoaderFactory(BlockFactory factory) {
+ this.factory = factory;
+ }

@Override
public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
@@ -704,27 +732,6 @@ public BlockLoader.Builder nulls(int expectedCount) {
return ElementType.NULL.newBlockBuilder(expectedCount, factory);
}

- @Override
- public Block constantNulls() {
- if (nullBlock == null) {
- nullBlock = factory.newConstantNullBlock(pageSize);
- }
- nullBlock.incRef();
- return nullBlock;
- }
-
- @Override
- public void close() {
- if (nullBlock != null) {
- nullBlock.close();
- }
- }
-
- @Override
- public BytesRefBlock constantBytes(BytesRef value) {
- return factory.newConstantBytesRefBlockWith(value, pageSize);
- }

@Override
public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
return new SingletonOrdinalsBuilder(factory, ordinals, count);
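A subtlety in the refactored ComputeBlockLoaderFactory above: constantNulls() lazily allocates one page-sized null block and hands every caller a fresh reference via incRef(), while the factory's close() releases only its own original reference. A hedged sketch of that ref-counting contract (the surrounding driver wiring is invented for illustration):

// Illustrative only: shows the ref-count discipline for the shared null block.
try (ComputeBlockLoaderFactory loaders = new ComputeBlockLoaderFactory(blockFactory, pageSize)) {
    Block first = loaders.constantNulls();  // allocates the block, then incRef(); the caller owns one reference
    Block second = loaders.constantNulls(); // same instance, one more incRef()
    assert first == second;
    first.close();  // each caller releases its reference once its Page is built
    second.close();
}
// loaders.close() dropped the factory's original reference; the block is freed
// only after every outstanding reference has been released.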
@@ -35,8 +35,10 @@
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.indices.CrankyCircuitBreakerService;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.sort.SortAndFormats;
@@ -299,6 +301,20 @@ public IndexSearcher searcher() {
return searcher;
}

@Override
public SourceLoader newSourceLoader() {
return SourceLoader.FROM_STORED_SOURCE;
}

@Override
public BlockLoader blockLoader(
String name,
boolean asUnsupportedSource,
MappedFieldType.FieldExtractPreference fieldExtractPreference
) {
throw new UnsupportedOperationException();
}

@Override
public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
return Optional.empty();
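The mock deliberately throws here, so any test that reaches field extraction without providing support for it fails fast. A test that did want extraction through the mock could override the method instead, mirroring how the time-series tests below build loaders (a hypothetical override, using the blContext() helper made public later in this PR):

@Override
public BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference) {
    // Assumption for the sketch: every extracted field in the test is a long metric.
    var ft = new NumberFieldMapper.NumberFieldType(name, NumberFieldMapper.NumberType.LONG);
    return ft.blockLoader(ValuesSourceReaderOperatorTests.blContext());
}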
@@ -44,6 +44,7 @@
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.RoutingPathFields;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -155,9 +156,12 @@ record Doc(int host, long timestamp, long metric) {}
}
int maxPageSize = between(1, 1024);
int limit = randomBoolean() ? between(1, 100000) : Integer.MAX_VALUE;
+ var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
var timeSeriesFactory = createTimeSeriesSourceOperator(
directory,
r -> this.reader = r,
+ true,
+ List.of(new ExtractField(metricField, ElementType.LONG)),
limit,
maxPageSize,
randomBoolean(),
@@ -171,12 +175,11 @@ record Doc(int host, long timestamp, long metric) {}
);
DriverContext driverContext = driverContext();
List<Page> results = new ArrayList<>();
- var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
OperatorTestCase.runDriver(
TestDriverFactory.create(
driverContext,
timeSeriesFactory.get(driverContext),
- List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)),
+ List.of(),
new TestResultPageSinkOperator(results::add)
)
);
@@ -240,7 +243,9 @@ public void testMatchNone() throws Exception {
Integer.MAX_VALUE,
randomIntBetween(1, 1024),
1,
randomBoolean(),
List.of(ctx),
List.of(),
unused -> query
);
var driverContext = driverContext();
@@ -260,7 +265,7 @@ public void testMatchNone() throws Exception {

@Override
protected Operator.OperatorFactory simple() {
- return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, false, writer -> {
+ return createTimeSeriesSourceOperator(directory, r -> this.reader = r, randomBoolean(), List.of(), 1, 1, false, writer -> {
long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
return 1;
@@ -279,9 +284,13 @@ protected Matcher<String> expectedToStringOfSimple() {

List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
var ctx = driverContext();
+ var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
+ var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
var timeSeriesFactory = createTimeSeriesSourceOperator(
directory,
indexReader -> this.reader = indexReader,
+ true,
+ List.of(new ExtractField(voltageField, ElementType.LONG), new ExtractField(hostnameField, ElementType.BYTES_REF)),
limit,
maxPageSize,
forceMerge,
@@ -300,18 +309,8 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
);

List<Page> results = new ArrayList<>();
- var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
- var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
OperatorTestCase.runDriver(
- TestDriverFactory.create(
- ctx,
- timeSeriesFactory.get(ctx),
- List.of(
- ValuesSourceReaderOperatorTests.factory(reader, voltageField, ElementType.LONG).get(ctx),
- ValuesSourceReaderOperatorTests.factory(reader, hostnameField, ElementType.BYTES_REF).get(ctx)
- ),
- new TestResultPageSinkOperator(results::add)
- )
+ TestDriverFactory.create(ctx, timeSeriesFactory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add))
);
OperatorTestCase.assertDriverContext(ctx);
for (Page result : results) {
@@ -321,9 +320,15 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
return results;
}

public record ExtractField(MappedFieldType ft, ElementType elementType) {

}

public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
Directory directory,
Consumer<IndexReader> readerConsumer,
boolean emitDocIds,
List<ExtractField> extractFields,
int limit,
int maxPageSize,
boolean forceMerge,
@@ -354,7 +359,18 @@ public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperat
}
var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
- return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction);
+
+ var fieldInfos = extractFields.stream()
+ .map(
+ f -> new ValuesSourceReaderOperator.FieldInfo(
+ f.ft.name(),
+ f.elementType,
+ n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext())
+ )
+ )
+ .toList();
+
+ return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction);
}

public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {
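With the widened createTimeSeriesSourceOperator signature, tests declare up front which fields the source operator extracts rather than appending a ValuesSourceReaderOperator stage. A minimal hedged call site (the cpu field, counts, and document are invented for illustration):

var cpuField = new NumberFieldMapper.NumberFieldType("cpu", NumberFieldMapper.NumberType.LONG);
var timeSeriesFactory = createTimeSeriesSourceOperator(
    directory,
    r -> this.reader = r,
    false,                                                 // emitDocIds: nothing downstream re-reads these docs
    List.of(new ExtractField(cpuField, ElementType.LONG)),
    1000,                                                  // limit
    256,                                                   // maxPageSize
    true,                                                  // forceMerge
    writer -> {
        long ts = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
        writeTS(writer, ts, new Object[] { "hostname", "host-01" }, new Object[] { "cpu", 42 });
        return 1; // number of documents written
    }
);
// Pages emitted by the source already carry the extracted cpu block, so the
// driver's intermediate operator list is simply List.of().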
@@ -495,7 +495,7 @@ private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft
});
}

- private static MappedFieldType.BlockLoaderContext blContext() {
+ public static MappedFieldType.BlockLoaderContext blContext() {
return new MappedFieldType.BlockLoaderContext() {
@Override
public String indexName() {
@@ -11,7 +11,7 @@
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
- import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFieldExtractionToTimeSeriesSource;
+ import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushDownFieldExtractionToTimeSeriesSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
@@ -81,7 +81,7 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
new InsertFieldExtraction(),
new SpatialDocValuesExtraction(),
new SpatialShapeBoundsExtraction(),
- new PushFieldExtractionToTimeSeriesSource()
+ new PushDownFieldExtractionToTimeSeriesSource()
);
return List.of(pushdown, fieldExtraction);
}
@@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* An optimization rule that pushes field extractions occurring below the lowest filter, limit, or topN down into the time-series source.
* For example:
* `TS index | WHERE host = 'a' AND cluster = 'b' | STATS max(rate(counter)) BY host, bucket(1minute)`
* In this query, the extraction of the `host` and `cluster` fields will be pushed down to the time-series source,
* while the extraction of the `counter` field will occur later. In such cases, the `doc_ids` still need to be returned
* for the later extraction. However, if the filter (`host = 'a' AND cluster = 'b'`) is pushed down to Lucene, all field extractions
* (e.g., `host` and `counter`) will be pushed down to the time-series source, and `doc_ids` will not be returned.
*/
public class PushDownFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
PhysicalPlan,
LocalPhysicalOptimizerContext> {

@Override
public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) {
if (plan.anyMatch(p -> p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) == false) {
return plan;
}
final List<FieldExtractExec> pushDownExtracts = new ArrayList<>();
[Author review comment on the line above: "This is the main change in planning, where field extractions are pushed down to the time-series source."]

final Holder<Boolean> keepDocIds = new Holder<>(Boolean.FALSE);
plan.forEachDown(p -> {
if (p instanceof FieldExtractExec) {
pushDownExtracts.add((FieldExtractExec) p);
} else if (stopPushDownExtract(p)) {
if (pushDownExtracts.isEmpty() == false) {
keepDocIds.set(Boolean.TRUE);
pushDownExtracts.clear();
}
}
});
final Holder<Boolean> aborted = new Holder<>(Boolean.FALSE);
return plan.transformUp(PhysicalPlan.class, p -> {
if (aborted.get()) {
return p;
}
if (p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) {
return addFieldExtract(context, q, keepDocIds.get(), pushDownExtracts);
}
if (stopPushDownExtract(p)) {
aborted.set(Boolean.TRUE);
return p;
}
if (p instanceof FieldExtractExec e) {
return e.child();
}
return p;
});
}

private static boolean stopPushDownExtract(PhysicalPlan p) {
return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec;
}

private TimeSeriesSourceExec addFieldExtract(
LocalPhysicalOptimizerContext context,
EsQueryExec query,
boolean keepDocAttribute,
List<FieldExtractExec> extracts
) {
Set<Attribute> docValuesAttributes = new HashSet<>();
Set<Attribute> boundsAttributes = new HashSet<>();
List<Attribute> attributesToExtract = new ArrayList<>();
for (FieldExtractExec extract : extracts) {
docValuesAttributes.addAll(extract.docValuesAttributes());
boundsAttributes.addAll(extract.boundsAttributes());
attributesToExtract.addAll(extract.attributesToExtract());
}
List<Attribute> attrs = query.attrs();
if (keepDocAttribute == false) {
attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList();
}
return new TimeSeriesSourceExec(
query.source(),
attrs,
query.query(),
query.limit(),
context.configuration().pragmas().fieldExtractPreference(),
docValuesAttributes,
boundsAttributes,
attributesToExtract,
query.estimatedRowSize()
);
}
}
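To make the rule concrete, a schematic before/after of the physical plan for the query in the class javadoc, assuming the filter is not pushed down to Lucene (the node rendering is illustrative, not the optimizer's exact output):

Before:
  AggregateExec[max(rate(counter)) BY host, bucket(1minute)]
    FieldExtractExec[counter]              <- above the filter; cannot be pushed
      FilterExec[host = 'a' AND cluster = 'b']
        FieldExtractExec[host, cluster]    <- below the filter; pushed down
          EsQueryExec[TIME_SERIES]

After:
  AggregateExec[max(rate(counter)) BY host, bucket(1minute)]
    FieldExtractExec[counter]              <- extracted later, so _doc is kept
      FilterExec[host = 'a' AND cluster = 'b']
        TimeSeriesSourceExec[extracts host, cluster; keeps _doc]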