ESQL: Compute engine support for tagged queries (#128521) #128638

Merged 4 commits on Jun 11, 2025
----
@@ -9,7 +9,6 @@

import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.LeafCollector;
- import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
@@ -44,7 +43,7 @@ public static class Factory extends LuceneOperator.Factory {

public Factory(
List<? extends ShardContext> contexts,
- Function<ShardContext, Query> queryFunction,
+ Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
int limit
@@ -121,6 +120,9 @@ protected Page getCheckedOutput() throws IOException {
if (scorer == null) {
remainingDocs = 0;
} else {
+ if (scorer.tags().isEmpty() == false) {
+ throw new UnsupportedOperationException("tags not supported by " + getClass());
+ }
Weight weight = scorer.weight();
var leafReaderContext = scorer.leafReaderContext();
// see org.apache.lucene.search.TotalHitCountCollector
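The signature change above recurs across every factory in this PR: a single per-shard Query becomes a list of queries, each paired with tags. A minimal sketch of adapting an existing caller, assuming an old-style oldQueryFunction (the name is illustrative, not from this PR): wrap the query in a one-element list with an empty tag list.

import java.util.List;
import java.util.function.Function;

// Hypothetical adapter. ShardContext and LuceneSliceQueue.QueryAndTags are the
// compute-engine types touched in this diff; oldQueryFunction is assumed to be
// the caller's previous Function<ShardContext, Query>.
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction =
    ctx -> List.of(new LuceneSliceQueue.QueryAndTags(oldQueryFunction.apply(ctx), List.of()));

With an empty tag list, operators that reject tags (see the UnsupportedOperationException guard above) keep working unchanged.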
----
@@ -10,7 +10,6 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SortedNumericDocValues;
- import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {

public LuceneMaxFactory(
List<? extends ShardContext> contexts,
- Function<ShardContext, Query> queryFunction,
+ Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
String fieldName,
----
@@ -10,7 +10,6 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SortedNumericDocValues;
- import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.compute.data.Block;
@@ -114,7 +113,7 @@ public final long evaluate(long value1, long value2) {

public LuceneMinFactory(
List<? extends ShardContext> contexts,
- Function<ShardContext, Query> queryFunction,
+ Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
String fieldName,
----
@@ -102,6 +102,9 @@ public Page getCheckedOutput() throws IOException {
if (scorer == null) {
remainingDocs = 0;
} else {
+ if (scorer.tags().isEmpty() == false) {
+ throw new UnsupportedOperationException("tags not supported by " + getClass());
+ }
final LeafReader reader = scorer.leafReaderContext().reader();
final Query query = scorer.weight().getQuery();
if (query == null || query instanceof MatchAllDocsQuery) {
----
@@ -95,7 +95,7 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFactory {
*/
protected Factory(
List<? extends ShardContext> contexts,
- Function<ShardContext, Query> queryFunction,
+ Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
int taskConcurrency,
@@ -153,10 +153,13 @@ LuceneScorer getCurrentOrLoadNextScorer() {
final PartialLeafReaderContext partialLeaf = currentSlice.getLeaf(sliceIndex++);
logger.trace("Starting {}", partialLeaf);
final LeafReaderContext leaf = partialLeaf.leafReaderContext();
- if (currentScorer == null || currentScorer.leafReaderContext() != leaf) {
+ if (currentScorer == null // First time
+ || currentScorer.leafReaderContext() != leaf // Moved to a new leaf
+ || currentScorer.weight != currentSlice.weight() // Moved to a new query
+ ) {
final Weight weight = currentSlice.weight();
processedQueries.add(weight.getQuery());
- currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, leaf);
+ currentScorer = new LuceneScorer(currentSlice.shardContext(), weight, currentSlice.tags(), leaf);
}
assert currentScorer.maxPosition <= partialLeaf.maxDoc() : currentScorer.maxPosition + ">" + partialLeaf.maxDoc();
currentScorer.maxPosition = partialLeaf.maxDoc();
@@ -175,15 +178,17 @@ static final class LuceneScorer {
private final ShardContext shardContext;
private final Weight weight;
private final LeafReaderContext leafReaderContext;
+ private final List<Object> tags;

private BulkScorer bulkScorer;
private int position;
private int maxPosition;
private Thread executingThread;

- LuceneScorer(ShardContext shardContext, Weight weight, LeafReaderContext leafReaderContext) {
+ LuceneScorer(ShardContext shardContext, Weight weight, List<Object> tags, LeafReaderContext leafReaderContext) {
this.shardContext = shardContext;
this.weight = weight;
+ this.tags = tags;
this.leafReaderContext = leafReaderContext;
reinitialize();
}
@@ -228,6 +233,13 @@ Weight weight() {
int position() {
return position;
}
+
+ /**
+ * Tags to add to the data returned by this query.
+ */
+ List<Object> tags() {
+ return tags;
+ }
}

@Override
----
@@ -14,7 +14,7 @@
/**
* Holds a list of multiple partial Lucene segments
*/
- public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight) {
+ public record LuceneSlice(ShardContext shardContext, List<PartialLeafReaderContext> leaves, Weight weight, List<Object> tags) {
int numLeaves() {
return leaves.size();
}
----
@@ -31,8 +31,47 @@

/**
* Shared Lucene slices between Lucene operators.
+ * <p>
+ * Each shard is {@link #create built} with a list of queries to run and
+ * tags to attach to each query's results ({@code List<QueryAndTags>}). Some examples:
+ * </p>
+ * <ul>
+ * <li>
+ * For queries like {@code FROM foo} we'll use a one-element list
+ * containing {@code match_all, []}. It loads all documents in the
+ * index and appends no extra fields to the loaded documents.
+ * </li>
+ * <li>
+ * For queries like {@code FROM foo | WHERE a > 10} we'll use a one-element
+ * list containing {@code +single_value(a) +(a > 10), []}.
+ * It loads all documents where {@code a} is single-valued and
+ * greater than 10.
+ * </li>
+ * <li>
+ * For queries like {@code FROM foo | STATS MAX(b) BY ROUND_TO(a, 0, 100)}
+ * we'll use a two-element list containing
+ * <ul>
+ * <li>{@code +single_value(a) +(a < 100), [0]}</li>
+ * <li>{@code +single_value(a) +(a >= 100), [100]}</li>
+ * </ul>
+ * It loads all documents where {@code a} is single-valued, attaching
+ * the constant {@code 0} to documents where {@code a < 100} and the
+ * constant {@code 100} to documents where {@code a >= 100}.
+ * </li>
+ * </ul>
+ * <p>
+ * IMPORTANT: Runners make no effort to deduplicate the results of multiple
+ * queries. If each document should be seen only once, make sure the
+ * queries are mutually exclusive.
+ * </p>
*/
public final class LuceneSliceQueue {
+ /**
+ * Query to run and tags to add to the results.
+ */
+ public record QueryAndTags(Query query, List<Object> tags) {}
+
public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher
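As a hedged illustration of the ROUND_TO example in the javadoc above (rangeQuery is hypothetical shorthand for whatever Query the planner actually builds), a caller would hand the queue two mutually exclusive queries, each carrying its rounding constant as a tag:

// Hypothetical: the two-query list for ROUND_TO(a, 0, 100). Each query is
// tagged with the constant its matching documents should carry.
List<LuceneSliceQueue.QueryAndTags> queries = List.of(
    new LuceneSliceQueue.QueryAndTags(rangeQuery("a", Long.MIN_VALUE, 99L), List.of(0L)),   // a < 100 -> tag 0
    new LuceneSliceQueue.QueryAndTags(rangeQuery("a", 100L, Long.MAX_VALUE), List.of(100L)) // a >= 100 -> tag 100
);

Because the ranges are disjoint, each document is loaded exactly once, satisfying the IMPORTANT note above.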

@@ -64,7 +103,7 @@ public Map<String, PartitioningStrategy> partitioningStrategies() {

public static LuceneSliceQueue create(
List<? extends ShardContext> contexts,
- Function<ShardContext, Query> queryFunction,
+ Function<ShardContext, List<QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
Function<Query, PartitioningStrategy> autoStrategy,
int taskConcurrency,
@@ -73,27 +112,29 @@ public static LuceneSliceQueue create(
List<LuceneSlice> slices = new ArrayList<>();
Map<String, PartitioningStrategy> partitioningStrategies = new HashMap<>(contexts.size());
for (ShardContext ctx : contexts) {
- Query query = queryFunction.apply(ctx);
- query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
- /*
- * Rewrite the query on the local index so things like fully
- * overlapping range queries become match all. It's important
- * to do this before picking the partitioning strategy so we
- * can pick more aggressive strategies when the query rewrites
- * into MatchAll.
- */
- try {
- query = ctx.searcher().rewrite(query);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
- partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
- List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
- Weight weight = weight(ctx, query, scoreMode);
- for (List<PartialLeafReaderContext> group : groups) {
- if (group.isEmpty() == false) {
- slices.add(new LuceneSlice(ctx, group, weight));
+ for (QueryAndTags queryAndExtra : queryFunction.apply(ctx)) {
+ Query query = queryAndExtra.query;
+ query = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
+ /*
+ * Rewrite the query on the local index so things like fully
+ * overlapping range queries become match all. It's important
+ * to do this before picking the partitioning strategy so we
+ * can pick more aggressive strategies when the query rewrites
+ * into MatchAll.
+ */
+ try {
+ query = ctx.searcher().rewrite(query);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ PartitioningStrategy partitioning = PartitioningStrategy.pick(dataPartitioning, autoStrategy, ctx, query);
+ partitioningStrategies.put(ctx.shardIdentifier(), partitioning);
+ List<List<PartialLeafReaderContext>> groups = partitioning.groups(ctx.searcher(), taskConcurrency);
+ Weight weight = weight(ctx, query, scoreMode);
+ for (List<PartialLeafReaderContext> group : groups) {
+ if (group.isEmpty() == false) {
+ slices.add(new LuceneSlice(ctx, group, weight, queryAndExtra.tags));
+ }
+ }
+ }
}
----
@@ -17,8 +17,9 @@
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
+ import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
- import org.elasticsearch.compute.data.DocBlock;
+ import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.IntVector;
@@ -64,7 +65,7 @@ public static class Factory extends LuceneOperator.Factory {

public Factory(
List<? extends ShardContext> contexts,
- Function<ShardContext, Query> queryFunction,
+ Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
int maxPageSize,
@@ -321,28 +322,29 @@ public Page getCheckedOutput() throws IOException {
IntVector shard = null;
IntVector leaf = null;
IntVector docs = null;
- DoubleVector scores = null;
- DocBlock docBlock = null;
+ Block[] blocks = new Block[1 + (scoreBuilder == null ? 0 : 1) + scorer.tags().size()];
currentPagePos -= discardedDocs;
try {
shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
docs = buildDocsVector(currentPagePos);
docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
- docBlock = new DocVector(shard, leaf, docs, true).asBlock();
+ int b = 0;
+ blocks[b++] = new DocVector(shard, leaf, docs, true).asBlock();
shard = null;
leaf = null;
docs = null;
- if (scoreBuilder == null) {
- page = new Page(currentPagePos, docBlock);
- } else {
- scores = buildScoresVector(currentPagePos);
+ if (scoreBuilder != null) {
+ blocks[b++] = buildScoresVector(currentPagePos).asBlock();
scoreBuilder = blockFactory.newDoubleVectorBuilder(Math.min(remainingDocs, maxPageSize));
- page = new Page(currentPagePos, docBlock, scores.asBlock());
}
+ for (Object e : scorer.tags()) {
+ blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos);
+ }
+ page = new Page(currentPagePos, blocks);
} finally {
if (page == null) {
- Releasables.closeExpectNoException(shard, leaf, docs, docBlock, scores);
+ Releasables.closeExpectNoException(shard, leaf, docs, Releasables.wrap(blocks));
}
}
currentPagePos = 0;
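The rewritten output path sizes the Block[] as one doc block, an optional score block, and one constant block per tag, so tags reach downstream operators as ordinary columns. A rough sketch of the resulting page shape, assuming Page.getBlockCount() reports the number of blocks:

// Assumed layout of a page emitted after this change:
//   block 0:     DocVector (shard / segment / doc ids)
//   block 1:     scores, present only when scoring was requested
//   blocks 2...: one constant block per tag, repeating that tag at every position
int expectedBlocks = 1 + (needsScore ? 1 : 0) + tags.size();
assert page.getBlockCount() == expectedBlocks;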
----
@@ -57,7 +57,7 @@ public static class Factory extends LuceneOperator.Factory {

public Factory(
List<? extends ShardContext> contexts,
- Function<ShardContext, Query> queryFunction,
+ Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
int taskConcurrency,
int maxPageSize,
@@ -170,6 +170,9 @@ private Page collect() throws IOException {
return emit(true);
}
try {
+ if (scorer.tags().isEmpty() == false) {
+ throw new UnsupportedOperationException("tags not supported by " + getClass());
+ }
if (perShardCollector == null || perShardCollector.shardContext.index() != scorer.shardContext().index()) {
// TODO: share the bottom between shardCollectors
perShardCollector = newPerShardCollector(scorer.shardContext(), sorts, needsScore, limit);
----
@@ -11,7 +11,6 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
- import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
@@ -51,7 +50,7 @@ public class TimeSeriesSortedSourceOperatorFactory extends LuceneOperator.Factory {

private TimeSeriesSortedSourceOperatorFactory(
List<? extends ShardContext> contexts,
- Function<ShardContext, Query> queryFunction,
+ Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
int taskConcurrency,
int maxPageSize,
int limit
@@ -84,7 +83,7 @@ public static TimeSeriesSortedSourceOperatorFactory create(
int maxPageSize,
int taskConcurrency,
List<? extends ShardContext> searchContexts,
- Function<ShardContext, Query> queryFunction
+ Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction
) {
return new TimeSeriesSortedSourceOperatorFactory(searchContexts, queryFunction, taskConcurrency, maxPageSize, limit);
}