Skip to content

Commit 3a6963a

Browse files
authored
Retry shard movements during ESQL query (#126653)
1 parent 75ddd8b commit 3a6963a

File tree

12 files changed

+450
-99
lines changed

12 files changed

+450
-99
lines changed

docs/changelog/126653.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 126653
2+
summary: Retry shard movements during ESQL query
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/TransportSearchShardsAction.java

-3
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
131131
listener.delegateFailureAndWrap((delegate, searchRequest) -> {
132132
Index[] concreteIndices = resolvedIndices.getConcreteLocalIndices();
133133
final Set<ResolvedExpression> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(
134-
135134
project.metadata(),
136-
137135
searchRequest.indices()
138-
139136
);
140137
final Map<String, AliasFilter> aliasFilters = transportSearchAction.buildIndexAliasFilters(
141138
project,

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.cluster.RemoteException;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.project.ProjectResolver;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.Strings;
2021
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -383,6 +384,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
383384
mock(SearchService.class),
384385
null,
385386
mock(ClusterService.class),
387+
mock(ProjectResolver.class),
386388
mock(IndexNameExpressionResolver.class),
387389
null,
388390
mockInferenceRunner()

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java

-5
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.rest.RestStatus;
2626
import org.elasticsearch.search.MockSearchService;
2727
import org.elasticsearch.search.SearchService;
28-
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
2928
import org.elasticsearch.test.transport.MockTransportService;
3029
import org.elasticsearch.transport.RemoteTransportException;
3130
import org.elasticsearch.transport.TransportChannel;
@@ -260,10 +259,6 @@ public void testLimitConcurrentShards() {
260259
}
261260
}
262261

263-
@TestIssueLogging(
264-
issueUrl = "https://github.com/elastic/elasticsearch/issues/125947",
265-
value = "logger.org.elasticsearch.cluster.routing.allocation.ShardChangesObserver:TRACE"
266-
)
267262
public void testCancelUnnecessaryRequests() {
268263
assumeTrue("Requires pragmas", canUseQueryPragmas());
269264
internalCluster().ensureAtLeastNumDataNodes(3);
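x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderIT.java

+159
Original file line numberDiff line numberDiff line change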
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
import org.elasticsearch.action.index.IndexRequest;
11+
import org.elasticsearch.action.support.WriteRequest;
12+
import org.elasticsearch.cluster.metadata.IndexMetadata;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.common.util.CollectionUtils;
15+
import org.elasticsearch.compute.operator.exchange.ExchangeService;
16+
import org.elasticsearch.plugins.Plugin;
17+
import org.elasticsearch.test.transport.MockTransportService;
18+
import org.elasticsearch.transport.TransportService;
19+
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
20+
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
21+
22+
import java.util.Collection;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.LongAdder;
25+
26+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
27+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
28+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
29+
import static org.hamcrest.Matchers.greaterThan;
30+
import static org.hamcrest.Matchers.hasSize;
31+
32+
public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase {
33+
34+
@Override
35+
protected Collection<Class<? extends Plugin>> nodePlugins() {
36+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
37+
}
38+
39+
public void testSearchWhileRelocating() throws InterruptedException {
40+
internalCluster().ensureAtLeastNumDataNodes(3);
41+
var primaries = randomIntBetween(1, 10);
42+
var replicas = randomIntBetween(0, 1);
43+
44+
indicesAdmin().prepareCreate("index-1").setSettings(indexSettings(primaries, replicas)).get();
45+
46+
var docs = randomIntBetween(10, 100);
47+
var bulk = client().prepareBulk("index-1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
48+
for (int i = 0; i < docs; i++) {
49+
bulk.add(new IndexRequest().source("key", "value-1"));
50+
}
51+
bulk.get();
52+
53+
// start background searches
54+
var stopped = new AtomicBoolean(false);
55+
var queries = new LongAdder();
56+
var threads = new Thread[randomIntBetween(1, 5)];
57+
for (int i = 0; i < threads.length; i++) {
58+
threads[i] = new Thread(() -> {
59+
while (stopped.get() == false) {
60+
try (EsqlQueryResponse resp = run("FROM index-1")) {
61+
assertThat(getValuesList(resp), hasSize(docs));
62+
}
63+
queries.increment();
64+
}
65+
});
66+
}
67+
for (Thread thread : threads) {
68+
thread.start();
69+
}
70+
71+
// start shard movements
72+
var rounds = randomIntBetween(1, 10);
73+
var names = internalCluster().getNodeNames();
74+
for (int i = 0; i < rounds; i++) {
75+
for (String name : names) {
76+
client().admin()
77+
.cluster()
78+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
79+
.setPersistentSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", name))
80+
.get();
81+
ensureGreen("index-1");
82+
Thread.yield();
83+
}
84+
}
85+
86+
stopped.set(true);
87+
for (Thread thread : threads) {
88+
thread.join(10_000);
89+
}
90+
91+
client().admin()
92+
.cluster()
93+
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
94+
.setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.exclude._name"))
95+
.get();
96+
assertThat(queries.sum(), greaterThan((long) threads.length));
97+
}
98+
99+
public void testRetryOnShardMovement() {
100+
internalCluster().ensureAtLeastNumDataNodes(2);
101+
102+
assertAcked(
103+
client().admin()
104+
.indices()
105+
.prepareCreate("index-1")
106+
.setSettings(
107+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
108+
)
109+
);
110+
assertAcked(
111+
client().admin()
112+
.indices()
113+
.prepareCreate("index-2")
114+
.setSettings(
115+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
116+
)
117+
);
118+
client().prepareBulk("index-1")
119+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
120+
.add(new IndexRequest().source("key", "value-1"))
121+
.get();
122+
client().prepareBulk("index-2")
123+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
124+
.add(new IndexRequest().source("key", "value-2"))
125+
.get();
126+
127+
var shouldMove = new AtomicBoolean(true);
128+
129+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
130+
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
131+
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
132+
(handler, request, channel, task) -> {
133+
// move index shard
134+
if (shouldMove.compareAndSet(true, false)) {
135+
var currentShardNodeId = clusterService().state()
136+
.routingTable()
137+
.index("index-1")
138+
.shard(0)
139+
.primaryShard()
140+
.currentNodeId();
141+
assertAcked(
142+
client().admin()
143+
.indices()
144+
.prepareUpdateSettings("index-1")
145+
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", currentShardNodeId))
146+
);
147+
ensureGreen("index-1");
148+
}
149+
// execute data node request
150+
handler.messageReceived(request, channel, task);
151+
}
152+
);
153+
}
154+
155+
try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) {
156+
assertThat(getValuesList(resp), hasSize(2));
157+
}
158+
}
159+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.search.SearchRequest;
1313
import org.elasticsearch.action.search.ShardSearchFailure;
1414
import org.elasticsearch.cluster.RemoteException;
15+
import org.elasticsearch.cluster.project.ProjectResolver;
1516
import org.elasticsearch.cluster.service.ClusterService;
1617
import org.elasticsearch.common.util.BigArrays;
1718
import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -130,6 +131,7 @@ public class ComputeService {
130131
private final LookupFromIndexService lookupFromIndexService;
131132
private final InferenceRunner inferenceRunner;
132133
private final ClusterService clusterService;
134+
private final ProjectResolver projectResolver;
133135
private final AtomicLong childSessionIdGenerator = new AtomicLong();
134136
private final DataNodeComputeHandler dataNodeComputeHandler;
135137
private final ClusterComputeHandler clusterComputeHandler;
@@ -157,7 +159,16 @@ public ComputeService(
157159
this.lookupFromIndexService = lookupFromIndexService;
158160
this.inferenceRunner = transportActionServices.inferenceRunner();
159161
this.clusterService = transportActionServices.clusterService();
160-
this.dataNodeComputeHandler = new DataNodeComputeHandler(this, searchService, transportService, exchangeService, esqlExecutor);
162+
this.projectResolver = transportActionServices.projectResolver();
163+
this.dataNodeComputeHandler = new DataNodeComputeHandler(
164+
this,
165+
clusterService,
166+
projectResolver,
167+
searchService,
168+
transportService,
169+
exchangeService,
170+
esqlExecutor
171+
);
161172
this.clusterComputeHandler = new ClusterComputeHandler(
162173
this,
163174
exchangeService,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.action.support.ChannelActionListener;
1717
import org.elasticsearch.action.support.RefCountingRunnable;
1818
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.cluster.project.ProjectResolver;
20+
import org.elasticsearch.cluster.service.ClusterService;
1921
import org.elasticsearch.compute.operator.DriverCompletionInfo;
2022
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2123
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
@@ -66,19 +68,25 @@
6668
final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRequest> {
6769
private final ComputeService computeService;
6870
private final SearchService searchService;
71+
private final ClusterService clusterService;
72+
private final ProjectResolver projectResolver;
6973
private final TransportService transportService;
7074
private final ExchangeService exchangeService;
7175
private final Executor esqlExecutor;
7276
private final ThreadPool threadPool;
7377

7478
DataNodeComputeHandler(
7579
ComputeService computeService,
80+
ClusterService clusterService,
81+
ProjectResolver projectResolver,
7682
SearchService searchService,
7783
TransportService transportService,
7884
ExchangeService exchangeService,
7985
Executor esqlExecutor
8086
) {
8187
this.computeService = computeService;
88+
this.clusterService = clusterService;
89+
this.projectResolver = projectResolver;
8290
this.searchService = searchService;
8391
this.transportService = transportService;
8492
this.exchangeService = exchangeService;
@@ -102,12 +110,17 @@ void startComputeOnDataNodes(
102110
Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);
103111

104112
new DataNodeRequestSender(
113+
clusterService,
114+
projectResolver,
105115
transportService,
106116
esqlExecutor,
107-
clusterAlias,
108117
parentTask,
118+
originalIndices,
119+
PlannerUtils.canMatchFilter(dataNodePlan),
120+
clusterAlias,
109121
configuration.allowPartialResults(),
110-
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
122+
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster,
123+
configuration.pragmas().unavailableShardResolutionAttempts()
111124
) {
112125
@Override
113126
protected void sendRequest(
@@ -200,8 +213,6 @@ protected void sendRequest(
200213
}
201214
}.startComputeOnDataNodes(
202215
concreteIndices,
203-
originalIndices,
204-
PlannerUtils.canMatchFilter(dataNodePlan),
205216
runOnTaskFailure,
206217
ActionListener.releaseAfter(outListener, exchangeSource.addEmptySink())
207218
);

0 commit comments

Comments
 (0)