Skip to content

Commit e00d581

Browse files
authored
Split early terminating collector into two separate collectors (elastic#96901)
We use `EarlyTerminatingCollector` for two similar, yet different, purposes:
- `terminate_after`, for which we wrap an empty collector and forcibly terminate collection for all collectors;
- early termination of the total hit count collector based on `track_total_hits`, which wraps `TotalHitCountCollector` and does not affect other collectors.

The two scenarios are quite separate; the only thing they have in common is that they both perform early termination of some kind. It is therefore convenient to split them into two collectors, each specialized for its purpose. The proposal is to have the following two collectors:
1) `TerminateAfterCollector`, which forcibly terminates collection by throwing an exception that is unknown to Lucene. It does not need to wrap another collector.
2) `PartialHitCountCollector`, an extension of `TotalHitCountCollector` that is able to early terminate total hits tracking.

This simplifies things, as we no longer need to wrap and early terminate a generic collector.
1 parent 840514e commit e00d581

File tree

7 files changed

+294
-133
lines changed

7 files changed

+294
-133
lines changed

server/src/main/java/org/elasticsearch/search/query/EarlyTerminatingCollector.java

Lines changed: 0 additions & 96 deletions
This file was deleted.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search.query;
10+
11+
import org.apache.lucene.index.LeafReaderContext;
12+
import org.apache.lucene.search.CollectionTerminatedException;
13+
import org.apache.lucene.search.FilterLeafCollector;
14+
import org.apache.lucene.search.LeafCollector;
15+
import org.apache.lucene.search.ScoreMode;
16+
import org.apache.lucene.search.TotalHitCountCollector;
17+
18+
import java.io.IOException;
19+
20+
/**
21+
* Extension of {@link TotalHitCountCollector} that supports early termination of total hits counting based on a provided threshold.
22+
* Note that the total hit count may be retrieved from {@link org.apache.lucene.search.Weight#count(LeafReaderContext)},
23+
* in which case early termination is only applied to the leaves that do collect documents.
24+
*/
25+
class PartialHitCountCollector extends TotalHitCountCollector {
26+
27+
private final int totalHitsThreshold;
28+
// we could reuse the counter that TotalHitCountCollector has and exposes through getTotalHits(),
29+
// but that would make us early terminate also when retrieving count from Weight#count and would
30+
// cause a behaviour that's difficult to explain and test.
31+
private int numCollected = 0;
32+
private boolean earlyTerminated;
33+
34+
PartialHitCountCollector(int totalHitsThreshold) {
35+
this.totalHitsThreshold = totalHitsThreshold;
36+
}
37+
38+
@Override
39+
public ScoreMode scoreMode() {
40+
// Does not need scores like TotalHitCountCollector (COMPLETE_NO_SCORES), but not exhaustive as it early terminates.
41+
return ScoreMode.TOP_DOCS;
42+
}
43+
44+
@Override
45+
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
46+
if (numCollected >= totalHitsThreshold) {
47+
earlyTerminateIfNeeded();
48+
}
49+
return new FilterLeafCollector(super.getLeafCollector(context)) {
50+
@Override
51+
public void collect(int doc) throws IOException {
52+
if (++numCollected > totalHitsThreshold) {
53+
earlyTerminateIfNeeded();
54+
}
55+
super.collect(doc);
56+
}
57+
};
58+
}
59+
60+
private void earlyTerminateIfNeeded() {
61+
earlyTerminated = true;
62+
throw new CollectionTerminatedException();
63+
}
64+
65+
boolean hasEarlyTerminated() {
66+
return earlyTerminated;
67+
}
68+
}

server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,15 +208,11 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas
208208
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
209209
// add terminate_after before the filter collectors
210210
// it will only be applied on documents accepted by these filter collectors
211-
EarlyTerminatingCollector earlyTerminatingCollector = new EarlyTerminatingCollector(
212-
EMPTY_COLLECTOR,
213-
searchContext.terminateAfter(),
214-
true
215-
);
211+
TerminateAfterCollector terminateAfterCollector = new TerminateAfterCollector(searchContext.terminateAfter());
216212
final Collector collector = collectorManager.newCollector();
217213
collectorManager = wrapWithProfilerCollectorManagerIfNeeded(
218214
searchContext.getProfilers(),
219-
new SingleThreadCollectorManager(MultiCollector.wrap(earlyTerminatingCollector, collector)),
215+
new SingleThreadCollectorManager(MultiCollector.wrap(terminateAfterCollector, collector)),
220216
REASON_SEARCH_TERMINATE_AFTER_COUNT,
221217
collector
222218
);
@@ -319,7 +315,7 @@ private static void searchWithCollectorManager(
319315
QuerySearchResult queryResult = searchContext.queryResult();
320316
try {
321317
searcher.search(query, collectorManager);
322-
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
318+
} catch (TerminateAfterCollector.EarlyTerminationException e) {
323319
queryResult.terminatedEarly(true);
324320
} catch (TimeExceededException e) {
325321
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search.query;
10+
11+
import org.apache.lucene.index.LeafReaderContext;
12+
import org.apache.lucene.search.Collector;
13+
import org.apache.lucene.search.LeafCollector;
14+
import org.apache.lucene.search.Scorable;
15+
import org.apache.lucene.search.ScoreMode;
16+
17+
import java.io.IOException;
18+
19+
/**
20+
* A {@link Collector} that forcibly early terminates collection after a certain number of hits have been collected.
21+
* Terminates the collection across all collectors by throwing an {@link EarlyTerminationException} once the threshold is reached.
22+
*/
23+
class TerminateAfterCollector implements Collector {
24+
static final class EarlyTerminationException extends RuntimeException {
25+
private EarlyTerminationException(String msg) {
26+
super(msg);
27+
}
28+
29+
@Override
30+
public Throwable fillInStackTrace() {
31+
// never re-thrown so we can save the expensive stacktrace
32+
return this;
33+
}
34+
}
35+
36+
private final int maxCountHits;
37+
private int numCollected;
38+
39+
/**
40+
*
41+
* @param maxCountHits the number of hits to collect, after which the collection must be early terminated
42+
*/
43+
TerminateAfterCollector(int maxCountHits) {
44+
this.maxCountHits = maxCountHits;
45+
}
46+
47+
@Override
48+
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
49+
if (numCollected >= maxCountHits) {
50+
earlyTerminate();
51+
}
52+
return new LeafCollector() {
53+
@Override
54+
public void setScorer(Scorable scorer) {}
55+
56+
@Override
57+
public void collect(int doc) {
58+
if (++numCollected > maxCountHits) {
59+
earlyTerminate();
60+
}
61+
}
62+
};
63+
}
64+
65+
@Override
66+
public ScoreMode scoreMode() {
67+
// this collector is not exhaustive, as it early terminates, and never needs scores
68+
return ScoreMode.TOP_DOCS;
69+
}
70+
71+
private void earlyTerminate() {
72+
throw new EarlyTerminationException("early termination [CountBased]");
73+
}
74+
}

server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorManagerFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,19 +109,19 @@ private EmptyTopDocsCollectorManagerFactory(@Nullable SortAndFormats sortAndForm
109109
super(REASON_SEARCH_COUNT, null);
110110
this.sort = sortAndFormats == null ? null : sortAndFormats.sort;
111111
if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
112-
this.collector = new EarlyTerminatingCollector(new TotalHitCountCollector(), 0, false);
112+
this.collector = new PartialHitCountCollector(0);
113113
// for bwc hit count is set to 0, it will be converted to -1 by the coordinating node
114114
this.hitCountSupplier = () -> new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
115115
} else {
116-
TotalHitCountCollector hitCountCollector = new TotalHitCountCollector();
117116
if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_ACCURATE) {
117+
TotalHitCountCollector hitCountCollector = new TotalHitCountCollector();
118118
this.collector = hitCountCollector;
119119
this.hitCountSupplier = () -> new TotalHits(hitCountCollector.getTotalHits(), TotalHits.Relation.EQUAL_TO);
120120
} else {
121-
EarlyTerminatingCollector col = new EarlyTerminatingCollector(hitCountCollector, trackTotalHitsUpTo, false);
121+
PartialHitCountCollector col = new PartialHitCountCollector(trackTotalHitsUpTo);
122122
this.collector = col;
123123
this.hitCountSupplier = () -> new TotalHits(
124-
hitCountCollector.getTotalHits(),
124+
col.getTotalHits(),
125125
col.hasEarlyTerminated() ? TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO : TotalHits.Relation.EQUAL_TO
126126
);
127127
}

0 commit comments

Comments
 (0)