Skip to content

Commit e48b3e2

Browse files
committed
Push down field extraction to time-series source
1 parent 9772b5e commit e48b3e2

File tree

12 files changed

+631
-185
lines changed

12 files changed

+631
-185
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ShardContext.java

+14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
package org.elasticsearch.compute.lucene;
99

1010
import org.apache.lucene.search.IndexSearcher;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.index.mapper.BlockLoader;
13+
import org.elasticsearch.index.mapper.MappedFieldType;
14+
import org.elasticsearch.index.mapper.SourceLoader;
1115
import org.elasticsearch.search.sort.SortAndFormats;
1216
import org.elasticsearch.search.sort.SortBuilder;
1317

@@ -39,4 +43,14 @@ public interface ShardContext {
3943
* {@code _cat/shards}.
4044
*/
4145
String shardIdentifier();
46+
47+
/**
48+
* Build something to load source {@code _source}.
49+
*/
50+
SourceLoader newSourceLoader();
51+
52+
/**
53+
* Returns something to load values from this field into a {@link Block}.
54+
*/
55+
BlockLoader blockLoader(String name, boolean asUnsupportedSource, MappedFieldType.FieldExtractPreference fieldExtractPreference);
4256
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java

+274-73
Large diffs are not rendered by default.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

+31-24
Original file line numberDiff line numberDiff line change
@@ -639,16 +639,44 @@ public String toString() {
639639
}
640640
}
641641

642-
private static class ComputeBlockLoaderFactory implements BlockLoader.BlockFactory, Releasable {
643-
private final BlockFactory factory;
642+
private static class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
644643
private final int pageSize;
645644
private Block nullBlock;
646645

647646
private ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
648-
this.factory = factory;
647+
super(factory);
649648
this.pageSize = pageSize;
650649
}
651650

651+
@Override
652+
public Block constantNulls() {
653+
if (nullBlock == null) {
654+
nullBlock = factory.newConstantNullBlock(pageSize);
655+
}
656+
nullBlock.incRef();
657+
return nullBlock;
658+
}
659+
660+
@Override
661+
public void close() {
662+
if (nullBlock != null) {
663+
nullBlock.close();
664+
}
665+
}
666+
667+
@Override
668+
public BytesRefBlock constantBytes(BytesRef value) {
669+
return factory.newConstantBytesRefBlockWith(value, pageSize);
670+
}
671+
}
672+
673+
public abstract static class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
674+
protected final BlockFactory factory;
675+
676+
protected DelegatingBlockLoaderFactory(BlockFactory factory) {
677+
this.factory = factory;
678+
}
679+
652680
@Override
653681
public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
654682
return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
@@ -704,27 +732,6 @@ public BlockLoader.Builder nulls(int expectedCount) {
704732
return ElementType.NULL.newBlockBuilder(expectedCount, factory);
705733
}
706734

707-
@Override
708-
public Block constantNulls() {
709-
if (nullBlock == null) {
710-
nullBlock = factory.newConstantNullBlock(pageSize);
711-
}
712-
nullBlock.incRef();
713-
return nullBlock;
714-
}
715-
716-
@Override
717-
public void close() {
718-
if (nullBlock != null) {
719-
nullBlock.close();
720-
}
721-
}
722-
723-
@Override
724-
public BytesRefBlock constantBytes(BytesRef value) {
725-
return factory.newConstantBytesRefBlockWith(value, pageSize);
726-
}
727-
728735
@Override
729736
public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
730737
return new SingletonOrdinalsBuilder(factory, ordinals, count);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

+16
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
import org.elasticsearch.core.IOUtils;
3636
import org.elasticsearch.core.TimeValue;
3737
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
38+
import org.elasticsearch.index.mapper.BlockLoader;
3839
import org.elasticsearch.index.mapper.MappedFieldType;
3940
import org.elasticsearch.index.mapper.NumberFieldMapper;
41+
import org.elasticsearch.index.mapper.SourceLoader;
4042
import org.elasticsearch.indices.CrankyCircuitBreakerService;
4143
import org.elasticsearch.search.internal.ContextIndexSearcher;
4244
import org.elasticsearch.search.sort.SortAndFormats;
@@ -299,6 +301,20 @@ public IndexSearcher searcher() {
299301
return searcher;
300302
}
301303

304+
@Override
305+
public SourceLoader newSourceLoader() {
306+
return SourceLoader.FROM_STORED_SOURCE;
307+
}
308+
309+
@Override
310+
public BlockLoader blockLoader(
311+
String name,
312+
boolean asUnsupportedSource,
313+
MappedFieldType.FieldExtractPreference fieldExtractPreference
314+
) {
315+
throw new UnsupportedOperationException();
316+
}
317+
302318
@Override
303319
public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
304320
return Optional.empty();

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java

+32-16
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
4545
import org.elasticsearch.index.mapper.DateFieldMapper;
4646
import org.elasticsearch.index.mapper.KeywordFieldMapper;
47+
import org.elasticsearch.index.mapper.MappedFieldType;
4748
import org.elasticsearch.index.mapper.NumberFieldMapper;
4849
import org.elasticsearch.index.mapper.RoutingPathFields;
4950
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -81,7 +82,7 @@ public void testSimple() {
8182
int numSamplesPerTS = 10;
8283
long timestampStart = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
8384
int maxPageSize = between(1, 1024);
84-
List<Page> results = runDriver(1024, maxPageSize, randomBoolean(), numTimeSeries, numSamplesPerTS, timestampStart);
85+
List<Page> results = runDriver(1024, maxPageSize, true, numTimeSeries, numSamplesPerTS, timestampStart);
8586
// for now we emit at most one time series each page
8687
int offset = 0;
8788
for (Page page : results) {
@@ -155,9 +156,12 @@ record Doc(int host, long timestamp, long metric) {}
155156
}
156157
int maxPageSize = between(1, 1024);
157158
int limit = randomBoolean() ? between(1, 100000) : Integer.MAX_VALUE;
159+
var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
158160
var timeSeriesFactory = createTimeSeriesSourceOperator(
159161
directory,
160162
r -> this.reader = r,
163+
true,
164+
List.of(new ExtractField(metricField, ElementType.LONG)),
161165
limit,
162166
maxPageSize,
163167
randomBoolean(),
@@ -171,12 +175,11 @@ record Doc(int host, long timestamp, long metric) {}
171175
);
172176
DriverContext driverContext = driverContext();
173177
List<Page> results = new ArrayList<>();
174-
var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG);
175178
OperatorTestCase.runDriver(
176179
TestDriverFactory.create(
177180
driverContext,
178181
timeSeriesFactory.get(driverContext),
179-
List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)),
182+
List.of(),
180183
new TestResultPageSinkOperator(results::add)
181184
)
182185
);
@@ -240,7 +243,9 @@ public void testMatchNone() throws Exception {
240243
Integer.MAX_VALUE,
241244
randomIntBetween(1, 1024),
242245
1,
246+
randomBoolean(),
243247
List.of(ctx),
248+
List.of(),
244249
unused -> query
245250
);
246251
var driverContext = driverContext();
@@ -260,7 +265,7 @@ public void testMatchNone() throws Exception {
260265

261266
@Override
262267
protected Operator.OperatorFactory simple() {
263-
return createTimeSeriesSourceOperator(directory, r -> this.reader = r, 1, 1, false, writer -> {
268+
return createTimeSeriesSourceOperator(directory, r -> this.reader = r, randomBoolean(), List.of(), 1, 1, false, writer -> {
264269
long timestamp = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
265270
writeTS(writer, timestamp, new Object[] { "hostname", "host-01" }, new Object[] { "voltage", 2 });
266271
return 1;
@@ -279,9 +284,13 @@ protected Matcher<String> expectedToStringOfSimple() {
279284

280285
List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
281286
var ctx = driverContext();
287+
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
288+
var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
282289
var timeSeriesFactory = createTimeSeriesSourceOperator(
283290
directory,
284291
indexReader -> this.reader = indexReader,
292+
true,
293+
List.of(new ExtractField(voltageField, ElementType.LONG), new ExtractField(hostnameField, ElementType.BYTES_REF)),
285294
limit,
286295
maxPageSize,
287296
forceMerge,
@@ -300,18 +309,8 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
300309
);
301310

302311
List<Page> results = new ArrayList<>();
303-
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
304-
var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
305312
OperatorTestCase.runDriver(
306-
TestDriverFactory.create(
307-
ctx,
308-
timeSeriesFactory.get(ctx),
309-
List.of(
310-
ValuesSourceReaderOperatorTests.factory(reader, voltageField, ElementType.LONG).get(ctx),
311-
ValuesSourceReaderOperatorTests.factory(reader, hostnameField, ElementType.BYTES_REF).get(ctx)
312-
),
313-
new TestResultPageSinkOperator(results::add)
314-
)
313+
TestDriverFactory.create(ctx, timeSeriesFactory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add))
315314
);
316315
OperatorTestCase.assertDriverContext(ctx);
317316
for (Page result : results) {
@@ -321,9 +320,15 @@ List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime
321320
return results;
322321
}
323322

323+
public record ExtractField(MappedFieldType ft, ElementType elementType) {
324+
325+
}
326+
324327
public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperator(
325328
Directory directory,
326329
Consumer<IndexReader> readerConsumer,
330+
boolean emitDocIds,
331+
List<ExtractField> extractFields,
327332
int limit,
328333
int maxPageSize,
329334
boolean forceMerge,
@@ -354,7 +359,18 @@ public static TimeSeriesSortedSourceOperatorFactory createTimeSeriesSourceOperat
354359
}
355360
var ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0);
356361
Function<ShardContext, Query> queryFunction = c -> new MatchAllDocsQuery();
357-
return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, List.of(ctx), queryFunction);
362+
363+
var fieldInfos = extractFields.stream()
364+
.map(
365+
f -> new ValuesSourceReaderOperator.FieldInfo(
366+
f.ft.name(),
367+
f.elementType,
368+
n -> f.ft.blockLoader(ValuesSourceReaderOperatorTests.blContext())
369+
)
370+
)
371+
.toList();
372+
373+
return TimeSeriesSortedSourceOperatorFactory.create(limit, maxPageSize, 1, emitDocIds, List.of(ctx), fieldInfos, queryFunction);
358374
}
359375

360376
public static void writeTS(RandomIndexWriter iw, long timestamp, Object[] dimensions, Object[] metrics) throws IOException {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft
495495
});
496496
}
497497

498-
private static MappedFieldType.BlockLoaderContext blContext() {
498+
public static MappedFieldType.BlockLoaderContext blContext() {
499499
return new MappedFieldType.BlockLoaderContext() {
500500
@Override
501501
public String indexName() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
1313
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
14-
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFieldExtractionToTimeSeriesSource;
14+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushDownFieldExtractionToTimeSeriesSource;
1515
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
1616
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushLimitToSource;
1717
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushSampleToSource;
@@ -81,7 +81,7 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
8181
new InsertFieldExtraction(),
8282
new SpatialDocValuesExtraction(),
8383
new SpatialShapeBoundsExtraction(),
84-
new PushFieldExtractionToTimeSeriesSource()
84+
new PushDownFieldExtractionToTimeSeriesSource()
8585
);
8686
return List.of(pushdown, fieldExtraction);
8787
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
9+
10+
import org.elasticsearch.index.IndexMode;
11+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.core.util.Holder;
13+
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
14+
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
15+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
16+
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
17+
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
18+
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
19+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
20+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
21+
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
22+
23+
import java.util.ArrayList;
24+
import java.util.HashSet;
25+
import java.util.List;
26+
import java.util.Set;
27+
28+
/**
29+
* A rule that pushes down field extractions to occur before filter/limit/topN in the time-series source plan.
30+
*/
31+
public class PushDownFieldExtractionToTimeSeriesSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
32+
PhysicalPlan,
33+
LocalPhysicalOptimizerContext> {
34+
35+
@Override
36+
public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) {
37+
if (plan.anyMatch(p -> p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) == false) {
38+
return plan;
39+
}
40+
final List<FieldExtractExec> pushDownExtracts = new ArrayList<>();
41+
final Holder<Boolean> keepDocIds = new Holder<>(Boolean.FALSE);
42+
plan.forEachDown(p -> {
43+
if (p instanceof FieldExtractExec) {
44+
pushDownExtracts.add((FieldExtractExec) p);
45+
} else if (stopPushDownExtract(p)) {
46+
if (pushDownExtracts.isEmpty() == false) {
47+
keepDocIds.set(Boolean.TRUE);
48+
pushDownExtracts.clear();
49+
}
50+
}
51+
});
52+
final Holder<Boolean> aborted = new Holder<>(Boolean.FALSE);
53+
return plan.transformUp(PhysicalPlan.class, p -> {
54+
if (aborted.get()) {
55+
return p;
56+
}
57+
if (p instanceof EsQueryExec q && q.indexMode() == IndexMode.TIME_SERIES) {
58+
return addFieldExtract(context, q, keepDocIds.get(), pushDownExtracts);
59+
}
60+
if (stopPushDownExtract(p)) {
61+
aborted.set(Boolean.TRUE);
62+
return p;
63+
}
64+
if (p instanceof FieldExtractExec e) {
65+
return e.child();
66+
}
67+
return p;
68+
});
69+
}
70+
71+
private static boolean stopPushDownExtract(PhysicalPlan p) {
72+
return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec;
73+
}
74+
75+
private TimeSeriesSourceExec addFieldExtract(
76+
LocalPhysicalOptimizerContext context,
77+
EsQueryExec query,
78+
boolean keepDocAttribute,
79+
List<FieldExtractExec> extracts
80+
) {
81+
Set<Attribute> docValuesAttributes = new HashSet<>();
82+
Set<Attribute> boundsAttributes = new HashSet<>();
83+
List<Attribute> attributesToExtract = new ArrayList<>();
84+
for (FieldExtractExec extract : extracts) {
85+
docValuesAttributes.addAll(extract.docValuesAttributes());
86+
boundsAttributes.addAll(extract.boundsAttributes());
87+
attributesToExtract.addAll(extract.attributesToExtract());
88+
}
89+
List<Attribute> attrs = query.attrs();
90+
if (keepDocAttribute == false) {
91+
attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList();
92+
}
93+
return new TimeSeriesSourceExec(
94+
query.source(),
95+
attrs,
96+
query.query(),
97+
query.limit(),
98+
context.configuration().pragmas().fieldExtractPreference(),
99+
docValuesAttributes,
100+
boundsAttributes,
101+
attributesToExtract,
102+
query.estimatedRowSize()
103+
);
104+
}
105+
}

0 commit comments

Comments
 (0)