Skip to content

Commit 7ca4d48

Browse files
iverasejavanna
andauthored
Add test cluster setting to control the minimum number of docs in a concurrent slice (elastic#97740)
The current slicing algorithm for concurrent search only creates multiple slices iff it can add at least 50k documents in each slice. This means that we need to have at least 100K documents in order to trigger concurrent search. In tests, we can create several segments, but normally those segments contains few documents, therefore they will never trigger concurrent search. This PR adds a cluster setting that can control the minimum number documents we require to create multiple slices. This setting is only registered on tests, therefore it is not exposed to the users. During test we can set this number to a very low value to increase the likelihood that the test will exercise the concurrent code-path. This change affects ESIntegTestCase as well as EsSingleNodeTestCase. --------- Co-authored-by: Luca Cavanna <[email protected]>
1 parent 6c45fec commit 7ca4d48

File tree

12 files changed

+257
-31
lines changed

12 files changed

+257
-31
lines changed

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ final class DefaultSearchContext extends SearchContext {
136136
SearchShardTarget shardTarget,
137137
LongSupplier relativeTimeSupplier,
138138
TimeValue timeout,
139+
int minimumDocsPerSlice,
139140
FetchPhase fetchPhase,
140141
boolean lowLevelCancellation
141142
) throws IOException {
@@ -153,7 +154,9 @@ final class DefaultSearchContext extends SearchContext {
153154
engineSearcher.getSimilarity(),
154155
engineSearcher.getQueryCache(),
155156
engineSearcher.getQueryCachingPolicy(),
156-
lowLevelCancellation
157+
minimumDocsPerSlice,
158+
lowLevelCancellation,
159+
null
157160
);
158161
releasables.addAll(List.of(engineSearcher, searcher));
159162

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,14 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
202202
Property.NodeScope
203203
);
204204

205+
// This setting is only registered on tests to force concurrent search even when segments contains very few documents.
206+
public static final Setting<Integer> MINIMUM_DOCS_PER_SLICE = Setting.intSetting(
207+
"search.minimum_docs_per_slice",
208+
50_000,
209+
1,
210+
Property.NodeScope
211+
);
212+
205213
public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT = Setting.intSetting(
206214
"search.max_open_scroll_context",
207215
500,
@@ -251,6 +259,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
251259

252260
private volatile TimeValue defaultSearchTimeout;
253261

262+
private final int minimumDocsPerSlice;
263+
254264
private volatile boolean defaultAllowPartialSearchResults;
255265

256266
private volatile boolean lowLevelCancellation;
@@ -316,6 +326,8 @@ public SearchService(
316326
defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings);
317327
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout);
318328

329+
minimumDocsPerSlice = MINIMUM_DOCS_PER_SLICE.get(settings);
330+
319331
defaultAllowPartialSearchResults = DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS.get(settings);
320332
clusterService.getClusterSettings()
321333
.addSettingsUpdateConsumer(DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, this::setDefaultAllowPartialSearchResults);
@@ -1037,6 +1049,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
10371049
shardTarget,
10381050
threadPool::relativeTimeInMillis,
10391051
timeout,
1052+
minimumDocsPerSlice,
10401053
fetchPhase,
10411054
lowLevelCancellation
10421055
);

server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
7171
*/
7272
private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11;
7373

74-
// don't create slices with less than 50k docs
75-
private static final int MINIMUM_DOCS_PER_SLICE = 50_000;
76-
7774
// make sure each slice has at least 10% of the documents as a way to limit memory usage and
7875
// to keep the error margin of terms aggregation low
7976
private static final double MINIMUM_DOCS_PERCENT_PER_SLICE = 0.1;
@@ -84,6 +81,8 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
8481

8582
private final QueueSizeBasedExecutor queueSizeBasedExecutor;
8683
private final LeafSlice[] leafSlices;
84+
// don't create slices with less than this number of docs
85+
private final int minimumDocsPerSlice;
8786

8887
/** constructor for non-concurrent search */
8988
public ContextIndexSearcher(
@@ -93,7 +92,16 @@ public ContextIndexSearcher(
9392
QueryCachingPolicy queryCachingPolicy,
9493
boolean wrapWithExitableDirectoryReader
9594
) throws IOException {
96-
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout(), wrapWithExitableDirectoryReader, null);
95+
this(
96+
reader,
97+
similarity,
98+
queryCache,
99+
queryCachingPolicy,
100+
new MutableQueryTimeout(),
101+
Integer.MAX_VALUE,
102+
wrapWithExitableDirectoryReader,
103+
null
104+
);
97105
}
98106

99107
/** constructor for concurrent search */
@@ -102,10 +110,20 @@ public ContextIndexSearcher(
102110
Similarity similarity,
103111
QueryCache queryCache,
104112
QueryCachingPolicy queryCachingPolicy,
113+
int minimumDocsPerSlice,
105114
boolean wrapWithExitableDirectoryReader,
106115
ThreadPoolExecutor executor
107116
) throws IOException {
108-
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout(), wrapWithExitableDirectoryReader, executor);
117+
this(
118+
reader,
119+
similarity,
120+
queryCache,
121+
queryCachingPolicy,
122+
new MutableQueryTimeout(),
123+
minimumDocsPerSlice,
124+
wrapWithExitableDirectoryReader,
125+
executor
126+
);
109127
}
110128

111129
private ContextIndexSearcher(
@@ -114,6 +132,7 @@ private ContextIndexSearcher(
114132
QueryCache queryCache,
115133
QueryCachingPolicy queryCachingPolicy,
116134
MutableQueryTimeout cancellable,
135+
int minimumDocsPerSlice,
117136
boolean wrapWithExitableDirectoryReader,
118137
ThreadPoolExecutor executor
119138
) throws IOException {
@@ -124,9 +143,15 @@ private ContextIndexSearcher(
124143
setQueryCachingPolicy(queryCachingPolicy);
125144
this.cancellable = cancellable;
126145
this.queueSizeBasedExecutor = executor != null ? new QueueSizeBasedExecutor(executor) : null;
146+
this.minimumDocsPerSlice = minimumDocsPerSlice;
127147
this.leafSlices = executor == null ? null : slices(leafContexts);
128148
}
129149

150+
// package private for testing
151+
int getMinimumDocsPerSlice() {
152+
return minimumDocsPerSlice;
153+
}
154+
130155
public void setProfiler(QueryProfiler profiler) {
131156
this.profiler = profiler;
132157
}
@@ -203,7 +228,7 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
203228

204229
@Override
205230
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
206-
return computeSlices(leaves, queueSizeBasedExecutor.threadPoolExecutor.getMaximumPoolSize(), MINIMUM_DOCS_PER_SLICE);
231+
return computeSlices(leaves, queueSizeBasedExecutor.threadPoolExecutor.getMaximumPoolSize(), minimumDocsPerSlice);
207232
}
208233

209234
/**

server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
142142
target,
143143
null,
144144
timeout,
145+
randomIntBetween(1, Integer.MAX_VALUE),
145146
null,
146147
false
147148
);
@@ -173,7 +174,16 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
173174
shardSearchRequest,
174175
randomNonNegativeLong()
175176
);
176-
DefaultSearchContext context1 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, timeout, null, false);
177+
DefaultSearchContext context1 = new DefaultSearchContext(
178+
readerContext,
179+
shardSearchRequest,
180+
target,
181+
null,
182+
timeout,
183+
randomIntBetween(1, Integer.MAX_VALUE),
184+
null,
185+
false
186+
);
177187
context1.from(300);
178188
exception = expectThrows(IllegalArgumentException.class, () -> context1.preProcess());
179189
assertThat(
@@ -237,7 +247,16 @@ public ScrollContext scrollContext() {
237247
}
238248
};
239249
// rescore is null but sliceBuilder is not null
240-
DefaultSearchContext context2 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, timeout, null, false);
250+
DefaultSearchContext context2 = new DefaultSearchContext(
251+
readerContext,
252+
shardSearchRequest,
253+
target,
254+
null,
255+
timeout,
256+
randomIntBetween(1, Integer.MAX_VALUE),
257+
null,
258+
false
259+
);
241260

242261
SliceBuilder sliceBuilder = mock(SliceBuilder.class);
243262
int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100);
@@ -263,7 +282,16 @@ public ScrollContext scrollContext() {
263282
when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY);
264283
when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST);
265284

266-
DefaultSearchContext context3 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, timeout, null, false);
285+
DefaultSearchContext context3 = new DefaultSearchContext(
286+
readerContext,
287+
shardSearchRequest,
288+
target,
289+
null,
290+
timeout,
291+
randomIntBetween(1, Integer.MAX_VALUE),
292+
null,
293+
false
294+
);
267295
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
268296
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess();
269297
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
@@ -279,7 +307,16 @@ public ScrollContext scrollContext() {
279307
randomNonNegativeLong(),
280308
false
281309
);
282-
DefaultSearchContext context4 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, timeout, null, false);
310+
DefaultSearchContext context4 = new DefaultSearchContext(
311+
readerContext,
312+
shardSearchRequest,
313+
target,
314+
null,
315+
timeout,
316+
randomIntBetween(1, Integer.MAX_VALUE),
317+
null,
318+
false
319+
);
283320
context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess();
284321
Query query1 = context4.query();
285322
context4.sliceBuilder(new SliceBuilder(0, 2)).parsedQuery(parsedQuery).preProcess();
@@ -336,7 +373,16 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
336373
randomNonNegativeLong(),
337374
false
338375
);
339-
DefaultSearchContext context = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, timeout, null, false);
376+
DefaultSearchContext context = new DefaultSearchContext(
377+
readerContext,
378+
shardSearchRequest,
379+
target,
380+
null,
381+
timeout,
382+
randomIntBetween(1, Integer.MAX_VALUE),
383+
null,
384+
false
385+
);
340386

341387
assertThat(context.searcher().hasCancellations(), is(false));
342388
context.searcher().addQueryCancellation(() -> {});

server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.lucene.document.Document;
1212
import org.apache.lucene.document.KnnFloatVectorField;
1313
import org.apache.lucene.index.IndexReader;
14-
import org.apache.lucene.index.LeafReaderContext;
1514
import org.apache.lucene.search.IndexSearcher;
1615
import org.apache.lucene.search.KnnFloatVectorQuery;
1716
import org.apache.lucene.search.Query;
@@ -74,15 +73,10 @@ public void testSingleKnnSearch() throws IOException {
7473
IndexSearcher.getDefaultSimilarity(),
7574
IndexSearcher.getDefaultQueryCache(),
7675
IndexSearcher.getDefaultQueryCachingPolicy(),
76+
1,
7777
randomBoolean(),
7878
this.threadPoolExecutor
79-
) {
80-
@Override
81-
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
82-
// get a thread per segment
83-
return slices(leaves, 1, 1);
84-
}
85-
};
79+
);
8680

8781
Query query = new KnnFloatVectorQuery("float_vector", new float[] { 0, 0, 0 }, numDocs, null);
8882

server/src/test/java/org/elasticsearch/search/internal/ContextIndexSearcherTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,12 @@ public Void reduce(Collection<Collector> collectors) {
247247
IndexSearcher.getDefaultSimilarity(),
248248
IndexSearcher.getDefaultQueryCache(),
249249
IndexSearcher.getDefaultQueryCachingPolicy(),
250+
1,
250251
randomBoolean(),
251252
executor
252253
) {
253254
@Override
254255
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
255-
if (leaves.size() == 1) {
256-
return super.slices(leaves);
257-
}
258256
return slices(leaves, 1, 1);
259257
}
260258
};
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search;
10+
11+
import org.elasticsearch.common.settings.Setting;
12+
import org.elasticsearch.plugins.Plugin;
13+
14+
import java.util.List;
15+
16+
/**
17+
* This plugin is used to register the {@link SearchService#MINIMUM_DOCS_PER_SLICE} setting.
18+
* This setting forces the {@link SearchService} to create many slices even when very few documents
19+
* are available, something we don;t really want to happen in real usage.
20+
*/
21+
public class ConcurrentSearchTestPlugin extends Plugin {
22+
@Override
23+
public List<Setting<?>> getSettings() {
24+
return List.of(SearchService.MINIMUM_DOCS_PER_SLICE);
25+
}
26+
}

test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -929,15 +929,10 @@ protected IndexSearcher newIndexSearcher(DirectoryReader indexReader) throws IOE
929929
IndexSearcher.getDefaultSimilarity(),
930930
IndexSearcher.getDefaultQueryCache(),
931931
IndexSearcher.getDefaultQueryCachingPolicy(),
932+
1, // forces multiple slices
932933
randomBoolean(),
933934
this.threadPoolExecutor
934-
) {
935-
@Override
936-
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
937-
// get a thread per segment
938-
return slices(leaves, 1, 1);
939-
}
940-
};
935+
);
941936
}
942937
}
943938

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@
123123
import org.elasticsearch.rest.RestStatus;
124124
import org.elasticsearch.rest.action.RestCancellableNodeClient;
125125
import org.elasticsearch.script.MockScriptService;
126+
import org.elasticsearch.search.ConcurrentSearchTestPlugin;
126127
import org.elasticsearch.search.MockSearchService;
127128
import org.elasticsearch.search.SearchHit;
128129
import org.elasticsearch.search.SearchService;
@@ -2045,6 +2046,10 @@ private NodeConfigurationSource getNodeConfigSource() {
20452046
if (addMockTransportService()) {
20462047
initialNodeSettings.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType());
20472048
}
2049+
boolean eagerConcurrentSearch = eagerConcurrentSearch();
2050+
if (eagerConcurrentSearch) {
2051+
initialNodeSettings.put(SearchService.MINIMUM_DOCS_PER_SLICE.getKey(), 1);
2052+
}
20482053
return new NodeConfigurationSource() {
20492054
@Override
20502055
public Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
@@ -2061,6 +2066,11 @@ public Path nodeConfigPath(int nodeOrdinal) {
20612066

20622067
@Override
20632068
public Collection<Class<? extends Plugin>> nodePlugins() {
2069+
if (eagerConcurrentSearch) {
2070+
List<Class<? extends Plugin>> plugins = new ArrayList<>(ESIntegTestCase.this.nodePlugins());
2071+
plugins.add(ConcurrentSearchTestPlugin.class);
2072+
return plugins;
2073+
}
20642074
return ESIntegTestCase.this.nodePlugins();
20652075
}
20662076
};
@@ -2074,6 +2084,15 @@ protected boolean addMockTransportService() {
20742084
return true;
20752085
}
20762086

2087+
/**
2088+
* Whether we'd like to increase the likelihood of leveraging inter-segment search concurrency, by creating multiple slices
2089+
* with a low amount of documents in them, which would not be allowed in production.
2090+
* Default is true, can be disabled if it causes problems in specific tests.
2091+
*/
2092+
protected boolean eagerConcurrentSearch() {
2093+
return true;
2094+
}
2095+
20772096
/** Returns {@code true} iff this test cluster should use a dummy http transport */
20782097
protected boolean addMockHttpTransport() {
20792098
return true;

0 commit comments

Comments
 (0)