Skip to content

Commit 1f54c76

Browse files
Force all per-node query response handling onto a single thread
Same reasoning as for field_caps in elastic#120863: there is no need to have multiple threads contending for the same mutex(es) when the heavy-lifting step in handling the results is sequential anyway.
1 parent c4cba5a commit 1f54c76

File tree

4 files changed

+47
-38
lines changed

4 files changed

+47
-38
lines changed

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,9 @@
3232
import org.elasticsearch.common.collect.Iterators;
3333
import org.elasticsearch.common.regex.Regex;
3434
import org.elasticsearch.common.util.Maps;
35-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3635
import org.elasticsearch.common.util.concurrent.EsExecutors;
3736
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
3837
import org.elasticsearch.core.Nullable;
39-
import org.elasticsearch.core.Releasable;
4038
import org.elasticsearch.core.Tuple;
4139
import org.elasticsearch.index.shard.ShardId;
4240
import org.elasticsearch.indices.IndicesService;
@@ -148,7 +146,7 @@ private void doExecuteForked(
148146
if (ccsCheckCompatibility) {
149147
checkCCSVersionCompatibility(request);
150148
}
151-
final Executor singleThreadedExecutor = buildSingleThreadedExecutor();
149+
final Executor singleThreadedExecutor = ThrottledTaskRunner.buildSingleThreadedExecutor("field_caps", searchCoordinationExecutor);
152150
assert task instanceof CancellableTask;
153151
final CancellableTask fieldCapTask = (CancellableTask) task;
154152
// retrieve the initial timestamp in case the action is a cross cluster search
@@ -314,29 +312,6 @@ private void doExecuteForked(
314312
}
315313
}
316314

317-
private Executor buildSingleThreadedExecutor() {
318-
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("field_caps", 1, searchCoordinationExecutor);
319-
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
320-
@Override
321-
public void onResponse(Releasable releasable) {
322-
try (releasable) {
323-
r.run();
324-
}
325-
}
326-
327-
@Override
328-
public void onFailure(Exception e) {
329-
if (r instanceof AbstractRunnable abstractRunnable) {
330-
abstractRunnable.onFailure(e);
331-
} else {
332-
// should be impossible, we should always submit an AbstractRunnable
333-
logger.error("unexpected failure running " + r, e);
334-
assert false : new AssertionError("unexpected failure running " + r, e);
335-
}
336-
}
337-
});
338-
}
339-
340315
public interface RemoteRequestExecutor {
341316
void executeRemoteRequest(
342317
RemoteClusterClient remoteClient,

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,7 @@ MergeResult consumePartialMergeResultDataNode() {
188188
}
189189

190190
void addBatchedPartialResult(TopDocsStats topDocsStats, MergeResult mergeResult) {
191-
synchronized (batchedResults) {
192-
batchedResults.add(new Tuple<>(topDocsStats, mergeResult));
193-
}
191+
batchedResults.add(new Tuple<>(topDocsStats, mergeResult));
194192
}
195193

196194
@Override
@@ -215,10 +213,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
215213
buffer.sort(RESULT_COMPARATOR);
216214
final TopDocsStats topDocsStats = this.topDocsStats;
217215
var mergeResult = this.mergeResult;
218-
final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults;
219-
synchronized (this.batchedResults) {
220-
batchedResults = this.batchedResults;
221-
}
216+
final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults = this.batchedResults;
222217
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
223218
final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
224219
final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.util.concurrent.CountDown;
3131
import org.elasticsearch.common.util.concurrent.EsExecutors;
3232
import org.elasticsearch.common.util.concurrent.ListenableFuture;
33+
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
3334
import org.elasticsearch.core.RefCounted;
3435
import org.elasticsearch.core.SimpleRefCounted;
3536
import org.elasticsearch.core.TimeValue;
@@ -408,7 +409,8 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
408409
}
409410
AbstractSearchAsyncAction.doCheckNoMissingShards(getName(), request, shardsIts);
410411
final Map<CanMatchPreFilterSearchPhase.SendingTarget, NodeQueryRequest> perNodeQueries = new HashMap<>();
411-
final String localNodeId = searchTransportService.transportService().getLocalNode().getId();
412+
final var transportService = searchTransportService.transportService();
413+
final String localNodeId = transportService.getLocalNode().getId();
412414
final int numberOfShardsTotal = shardsIts.size();
413415
for (int i = 0; i < numberOfShardsTotal; i++) {
414416
final SearchShardIterator shardRoutings = shardsIts.get(i);
@@ -445,6 +447,10 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
445447
}
446448
}
447449
}
450+
final Executor singleThreadedExecutor = ThrottledTaskRunner.buildSingleThreadedExecutor(
451+
"node_query_response",
452+
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)
453+
);
448454
perNodeQueries.forEach((routing, request) -> {
449455
if (request.shards.size() == 1) {
450456
executeAsSingleRequest(routing, request.shards.getFirst());
@@ -463,16 +469,20 @@ protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
463469
executeWithoutBatching(routing, request);
464470
return;
465471
}
466-
searchTransportService.transportService()
467-
.sendChildRequest(connection, NODE_SEARCH_ACTION_NAME, request, task, new TransportResponseHandler<NodeQueryResponse>() {
472+
transportService.sendChildRequest(
473+
connection,
474+
NODE_SEARCH_ACTION_NAME,
475+
request,
476+
task,
477+
new TransportResponseHandler<NodeQueryResponse>() {
468478
@Override
469479
public NodeQueryResponse read(StreamInput in) throws IOException {
470480
return new NodeQueryResponse(in);
471481
}
472482

473483
@Override
474484
public Executor executor() {
475-
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
485+
return singleThreadedExecutor;
476486
}
477487

478488
@Override
@@ -517,7 +527,8 @@ public void handleException(TransportException e) {
517527
onPhaseFailure(getName(), "", cause);
518528
}
519529
}
520-
});
530+
}
531+
);
521532
});
522533
}
523534

server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledTaskRunner.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,40 @@
1111

1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.core.Releasable;
14+
import org.elasticsearch.logging.LogManager;
15+
import org.elasticsearch.logging.Logger;
1416

1517
import java.util.concurrent.Executor;
1618

1719
public class ThrottledTaskRunner extends AbstractThrottledTaskRunner<ActionListener<Releasable>> {
20+
21+
private static final Logger logger = LogManager.getLogger(ThrottledTaskRunner.class);
22+
1823
// a simple AbstractThrottledTaskRunner which fixes the task type and uses a regular FIFO blocking queue.
1924
public ThrottledTaskRunner(String name, int maxRunningTasks, Executor executor) {
2025
super(name, maxRunningTasks, executor, ConcurrentCollections.newBlockingQueue());
2126
}
27+
28+
public static Executor buildSingleThreadedExecutor(String name, Executor executor) {
29+
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner(name, 1, executor);
30+
return r -> throttledTaskRunner.enqueueTask(new ActionListener<>() {
31+
@Override
32+
public void onResponse(Releasable releasable) {
33+
try (releasable) {
34+
r.run();
35+
}
36+
}
37+
38+
@Override
39+
public void onFailure(Exception e) {
40+
if (r instanceof AbstractRunnable abstractRunnable) {
41+
abstractRunnable.onFailure(e);
42+
} else {
43+
// should be impossible, we should always submit an AbstractRunnable
44+
logger.error("unexpected failure running [" + r + "] on [" + name + "]", e);
45+
assert false : new AssertionError("unexpected failure running " + r, e);
46+
}
47+
}
48+
});
49+
}
2250
}

0 commit comments

Comments
 (0)