Skip to content

Commit f274ab7

Browse files
authored
Remove empty results before merging (#126770)
We addressed the empty top docs issue with #126385 specifically for scenarios where empty top docs don't go through the wire. Yet they may be serialized from a data node back to the coordinating node, in which case they will no longer be equal to Lucene#EMPTY_TOP_DOCS. This commit expands the existing filtering of empty top docs to also cover those that went through serialization. Closes #126742
1 parent a4b1692 commit f274ab7

File tree

7 files changed

+49
-23
lines changed

7 files changed

+49
-23
lines changed

docs/changelog/126770.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 126770
2+
summary: Remove empty results before merging
3+
area: Search
4+
type: bug
5+
issues:
6+
- 126742

muted-tests.yml

-3
Original file line numberDiff line numberDiff line change
@@ -315,9 +315,6 @@ tests:
315315
- class: org.elasticsearch.search.CCSDuelIT
316316
method: testTerminateAfter
317317
issue: https://github.com/elastic/elasticsearch/issues/126085
318-
- class: org.elasticsearch.search.sort.GeoDistanceIT
319-
method: testDistanceSortingWithUnmappedField
320-
issue: https://github.com/elastic/elasticsearch/issues/126118
321318
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
322319
method: testSearchWithRandomDisconnects
323320
issue: https://github.com/elastic/elasticsearch/issues/122707

server/src/main/java/org/elasticsearch/TransportVersions.java

+1
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ static TransportVersion def(int id) {
225225
public static final TransportVersion ESQL_QUERY_PLANNING_DURATION = def(9_051_0_00);
226226
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED = def(9_052_0_00);
227227
public static final TransportVersion BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE = def(9_053_0_00);
228+
public static final TransportVersion SEARCH_INCREMENTAL_TOP_DOCS_NULL = def(9_054_0_00);
228229

229230
/*
230231
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,19 @@ private static void consumePartialMergeResult(
303303
Collection<DelayableWriteable<InternalAggregations>> aggsList
304304
) {
305305
if (topDocsList != null) {
306-
topDocsList.add(partialResult.reducedTopDocs);
306+
addTopDocsToList(partialResult, topDocsList);
307307
}
308308
if (aggsList != null) {
309309
addAggsToList(partialResult, aggsList);
310310
}
311311
}
312312

313+
private static void addTopDocsToList(MergeResult partialResult, List<TopDocs> topDocsList) {
314+
if (partialResult.reducedTopDocs != null) {
315+
topDocsList.add(partialResult.reducedTopDocs);
316+
}
317+
}
318+
313319
private static void addAggsToList(MergeResult partialResult, Collection<DelayableWriteable<InternalAggregations>> aggsList) {
314320
var aggs = partialResult.reducedAggs;
315321
if (aggs != null) {
@@ -340,7 +346,7 @@ private MergeResult partialReduce(
340346
if (hasTopDocs) {
341347
topDocsList = new ArrayList<>(resultSetSize);
342348
if (lastMerge != null) {
343-
topDocsList.add(lastMerge.reducedTopDocs);
349+
addTopDocsToList(lastMerge, topDocsList);
344350
}
345351
} else {
346352
topDocsList = null;
@@ -358,7 +364,7 @@ private MergeResult partialReduce(
358364
}
359365
}
360366
// we have to merge here in the same way we collect on a shard
361-
newTopDocs = topDocsList == null ? Lucene.EMPTY_TOP_DOCS : mergeTopDocs(topDocsList, topNSize, 0);
367+
newTopDocs = topDocsList == null ? null : mergeTopDocs(topDocsList, topNSize, 0);
362368
newAggs = hasAggs
363369
? aggregate(
364370
toConsume.iterator(),
@@ -636,7 +642,7 @@ private static void releaseAggs(List<QuerySearchResult> toConsume) {
636642

637643
record MergeResult(
638644
List<SearchShard> processedShards,
639-
TopDocs reducedTopDocs,
645+
@Nullable TopDocs reducedTopDocs,
640646
@Nullable DelayableWriteable<InternalAggregations> reducedAggs,
641647
long estimatedSize
642648
) implements Writeable {

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

+12-9
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import java.util.HashMap;
6161
import java.util.List;
6262
import java.util.Map;
63+
import java.util.Objects;
6364
import java.util.concurrent.Executor;
6465
import java.util.function.BiFunction;
6566
import java.util.function.Consumer;
@@ -140,24 +141,26 @@ static SortedTopDocs sortDocs(
140141
}
141142

142143
static TopDocs mergeTopDocs(List<TopDocs> results, int topN, int from) {
143-
if (results.isEmpty()) {
144+
List<TopDocs> topDocsList = results.stream().filter(Objects::nonNull).toList();
145+
if (topDocsList.isEmpty()) {
144146
return null;
145147
}
146-
final TopDocs topDocs = results.getFirst();
147-
final TopDocs mergedTopDocs;
148-
final int numShards = results.size();
148+
final TopDocs topDocs = topDocsList.getFirst();
149+
final int numShards = topDocsList.size();
149150
if (numShards == 1 && from == 0) { // only one shard and no pagination we can just return the topDocs as we got them.
150151
return topDocs;
151-
} else if (topDocs instanceof TopFieldGroups firstTopDocs) {
152+
}
153+
final TopDocs mergedTopDocs;
154+
if (topDocs instanceof TopFieldGroups firstTopDocs) {
152155
final Sort sort = new Sort(firstTopDocs.fields);
153-
final TopFieldGroups[] shardTopDocs = results.stream().filter(td -> td != Lucene.EMPTY_TOP_DOCS).toArray(TopFieldGroups[]::new);
156+
TopFieldGroups[] shardTopDocs = topDocsList.toArray(new TopFieldGroups[0]);
154157
mergedTopDocs = TopFieldGroups.merge(sort, from, topN, shardTopDocs, false);
155158
} else if (topDocs instanceof TopFieldDocs firstTopDocs) {
156-
final Sort sort = checkSameSortTypes(results, firstTopDocs.fields);
157-
final TopFieldDocs[] shardTopDocs = results.stream().filter((td -> td != Lucene.EMPTY_TOP_DOCS)).toArray(TopFieldDocs[]::new);
159+
TopFieldDocs[] shardTopDocs = topDocsList.toArray(new TopFieldDocs[0]);
160+
final Sort sort = checkSameSortTypes(topDocsList, firstTopDocs.fields);
158161
mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
159162
} else {
160-
final TopDocs[] shardTopDocs = results.toArray(new TopDocs[numShards]);
163+
final TopDocs[] shardTopDocs = topDocsList.toArray(new TopDocs[0]);
161164
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs);
162165
}
163166
return mergedTopDocs;

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.common.io.stream.StreamInput;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
2929
import org.elasticsearch.common.io.stream.Writeable;
30-
import org.elasticsearch.common.lucene.Lucene;
3130
import org.elasticsearch.common.util.concurrent.CountDown;
3231
import org.elasticsearch.common.util.concurrent.EsExecutors;
3332
import org.elasticsearch.common.util.concurrent.ListenableFuture;
@@ -722,7 +721,7 @@ private static final class QueryPerNodeState {
722721

723722
private static final QueryPhaseResultConsumer.MergeResult EMPTY_PARTIAL_MERGE_RESULT = new QueryPhaseResultConsumer.MergeResult(
724723
List.of(),
725-
Lucene.EMPTY_TOP_DOCS,
724+
null,
726725
null,
727726
0L
728727
);
@@ -782,10 +781,12 @@ void onShardDone() {
782781
// also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other
783782
// indices without a roundtrip to the coordinating node
784783
final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size());
785-
for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) {
786-
final int localIndex = scoreDoc.shardIndex;
787-
scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex;
788-
relevantShardIndices.set(localIndex);
784+
if (mergeResult.reducedTopDocs() != null) {
785+
for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) {
786+
final int localIndex = scoreDoc.shardIndex;
787+
scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex;
788+
relevantShardIndices.set(localIndex);
789+
}
789790
}
790791
final Object[] results = new Object[queryPhaseResultConsumer.getNumShards()];
791792
for (int i = 0; i < results.length; i++) {

server/src/main/java/org/elasticsearch/common/lucene/Lucene.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.lucene.util.BytesRef;
6565
import org.apache.lucene.util.Version;
6666
import org.elasticsearch.ExceptionsHelper;
67+
import org.elasticsearch.TransportVersions;
6768
import org.elasticsearch.common.Strings;
6869
import org.elasticsearch.common.io.stream.StreamInput;
6970
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -384,6 +385,14 @@ public static void writeTotalHits(StreamOutput out, TotalHits totalHits) throws
384385
* by shard for sorting purposes.
385386
*/
386387
public static void writeTopDocsIncludingShardIndex(StreamOutput out, TopDocs topDocs) throws IOException {
388+
if (topDocs == null) {
389+
if (out.getTransportVersion().onOrAfter(TransportVersions.SEARCH_INCREMENTAL_TOP_DOCS_NULL)) {
390+
out.writeByte((byte) -1);
391+
return;
392+
} else {
393+
topDocs = Lucene.EMPTY_TOP_DOCS;
394+
}
395+
}
387396
if (topDocs instanceof TopFieldGroups topFieldGroups) {
388397
out.writeByte((byte) 2);
389398
writeTotalHits(out, topDocs.totalHits);
@@ -424,7 +433,10 @@ public static void writeSortFieldArray(StreamOutput out, SortField[] sortFields)
424433
*/
425434
public static TopDocs readTopDocsIncludingShardIndex(StreamInput in) throws IOException {
426435
byte type = in.readByte();
427-
if (type == 0) {
436+
if (type == -1) {
437+
assert in.getTransportVersion().onOrAfter(TransportVersions.SEARCH_INCREMENTAL_TOP_DOCS_NULL);
438+
return null;
439+
} else if (type == 0) {
428440
TotalHits totalHits = readTotalHits(in);
429441

430442
final int scoreDocCount = in.readVInt();

0 commit comments

Comments (0)