Retry shard movements during ESQL query #126653

Merged

Commits (30)

63b19d6  Test shard movements during query (idegtiarenko, Apr 8, 2025)
90f33b8  improve test (idegtiarenko, Apr 10, 2025)
b1a04ff  cleanup (idegtiarenko, Apr 10, 2025)
545c50d  fmt (idegtiarenko, Apr 10, 2025)
8b5f590  make searchShards reusable (idegtiarenko, Apr 10, 2025)
406df8d  implement retry (idegtiarenko, Apr 10, 2025)
1e3e92d  minor cleanups (idegtiarenko, Apr 11, 2025)
f7916f7  Update docs/changelog/126653.yaml (idegtiarenko, Apr 11, 2025)
068b6be  request only relevant indices (idegtiarenko, Apr 11, 2025)
a031270  limit retry attempts (idegtiarenko, Apr 11, 2025)
1453368  Merge branch 'main' into retry_shard_movements_during_query (idegtiarenko, Apr 11, 2025)
4220622  make it possible to inject shard resolution logic (idegtiarenko, Apr 11, 2025)
2ea3292  select random pattern (idegtiarenko, Apr 11, 2025)
782c595  add unit tests (idegtiarenko, Apr 14, 2025)
8d2293e  Merge branch 'main' into retry_shard_movements_during_query (idegtiarenko, Apr 14, 2025)
54c0656  separate test for retrying only relevant shards (idegtiarenko, Apr 14, 2025)
811d8fc  remove todo (idegtiarenko, Apr 14, 2025)
c20d53c  Merge branch 'main' into retry_shard_movements_during_query (idegtiarenko, Apr 14, 2025)
56128ee  do can match only once (idegtiarenko, Apr 15, 2025)
9e2bb85  testSearchWhileRelocating (idegtiarenko, Apr 15, 2025)
333ed60  Merge branch 'main' into retry_shard_movements_during_query (idegtiarenko, Apr 16, 2025)
b8b4a95  move logic to DataNodeComputeHandler (idegtiarenko, Apr 16, 2025)
503f13e  update remaining nodes under the lock (idegtiarenko, Apr 16, 2025)
e660001  upd (idegtiarenko, Apr 16, 2025)
0908649  Merge branch 'main' into retry_shard_movements_during_query (idegtiarenko, Apr 16, 2025)
b2819b6  Merge branch 'main' into retry_shard_movements_during_query (idegtiarenko, Apr 16, 2025)
e899a10  retry only when not received data (idegtiarenko, Apr 17, 2025)
e298aaf  make resolution sync (idegtiarenko, Apr 22, 2025)
bb67124  limit retry attempts (idegtiarenko, Apr 22, 2025)
ef0ffef  Merge branch 'main' into retry_shard_movements_during_query (idegtiarenko, Apr 22, 2025)

5 changes: 5 additions & 0 deletions docs/changelog/126653.yaml
@@ -0,0 +1,5 @@
pr: 126653
summary: Retry shard movements during ESQL query
area: ES|QL
type: enhancement
issues: []
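
For context on the enhancement summarized above: the diffs below route shard resolution through DataNodeRequestSender with a bounded number of re-resolution attempts (the new configuration.pragmas().unavailableShardResolutionAttempts() argument), and the commit history notes that shards are retried only while no data has been received from them. The sketch below is a minimal illustration of that idea under those assumptions, not the actual DataNodeRequestSender code; ShardTarget, ShardResolver, and DataNodeClient are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Illustrative sketch only: hypothetical types and names, not the actual DataNodeRequestSender.
final class ShardRetrySketch {

    // A shard and the data node it is currently believed to live on (hypothetical).
    record ShardTarget(String index, int shardId, String nodeId) {}

    interface ShardResolver {
        // Re-resolves current locations, asked only for the shards that still need data
        // (mirrors the "request only relevant indices" commit).
        List<ShardTarget> resolve(Collection<ShardTarget> unresolved);
    }

    interface DataNodeClient {
        // Returns true if the node produced data for the shard, false if the shard had moved away.
        boolean execute(ShardTarget target);
    }

    static void runWithRetries(List<ShardTarget> initial, ShardResolver resolver, DataNodeClient client, int maxResolutionAttempts) {
        List<ShardTarget> pending = new ArrayList<>(initial);
        int attempts = 0;
        while (pending.isEmpty() == false) {
            List<ShardTarget> moved = new ArrayList<>();
            for (ShardTarget target : pending) {
                if (client.execute(target) == false) {
                    // Retry only shards that have produced no data yet ("retry only when not received data").
                    moved.add(target);
                }
            }
            if (moved.isEmpty()) {
                return; // every shard produced data
            }
            if (++attempts > maxResolutionAttempts) {
                // Attempts are bounded ("limit retry attempts"); give up once the budget is spent.
                throw new IllegalStateException("shards still unavailable after " + attempts + " resolution attempts: " + moved);
            }
            pending = resolver.resolve(moved);
        }
    }
}
```
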

@@ -131,11 +131,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
listener.delegateFailureAndWrap((delegate, searchRequest) -> {
Index[] concreteIndices = resolvedIndices.getConcreteLocalIndices();
final Set<ResolvedExpression> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(

Review comment (Member): left over?

project.metadata(),
searchRequest.indices()
);
final Map<String, AliasFilter> aliasFilters = transportSearchAction.buildIndexAliasFilters(
project,

@@ -15,6 +15,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -383,6 +384,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {
mock(SearchService.class),
null,
mock(ClusterService.class),
mock(ProjectResolver.class),
mock(IndexNameExpressionResolver.class),
null,
mockInferenceRunner()

@@ -25,7 +25,6 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel;
@@ -260,10 +259,6 @@ public void testLimitConcurrentShards() {
}
}

@TestIssueLogging(
issueUrl = "https://github.com/elastic/elasticsearch/issues/125947",
value = "logger.org.elasticsearch.cluster.routing.allocation.ShardChangesObserver:TRACE"
)
public void testCancelUnnecessaryRequests() {
assumeTrue("Requires pragmas", canUseQueryPragmas());
internalCluster().ensureAtLeastNumDataNodes(3);
@@ -0,0 +1,159 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;

public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase {

Review comment (Member): Can we make this test similar to SearchWhileRelocatingIT? We continue running ES|QL on one thread while moving shards back and forth between two sets of nodes.

Review comment (Contributor Author): Added

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

public void testSearchWhileRelocating() throws InterruptedException {
internalCluster().ensureAtLeastNumDataNodes(3);
var primaries = randomIntBetween(1, 10);
var replicas = randomIntBetween(0, 1);

indicesAdmin().prepareCreate("index-1").setSettings(indexSettings(primaries, replicas)).get();

var docs = randomIntBetween(10, 100);
var bulk = client().prepareBulk("index-1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < docs; i++) {
bulk.add(new IndexRequest().source("key", "value-1"));
}
bulk.get();

// start background searches
var stopped = new AtomicBoolean(false);
var queries = new LongAdder();
var threads = new Thread[randomIntBetween(1, 5)];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (stopped.get() == false) {
try (EsqlQueryResponse resp = run("FROM index-1")) {
assertThat(getValuesList(resp), hasSize(docs));
}
queries.increment();
}
});
}
for (Thread thread : threads) {
thread.start();
}

// start shard movements
var rounds = randomIntBetween(1, 10);
var names = internalCluster().getNodeNames();
for (int i = 0; i < rounds; i++) {
for (String name : names) {
client().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", name))
.get();

Review comment (Member): Fun

ensureGreen("index-1");
Thread.yield();
}
}

stopped.set(true);
for (Thread thread : threads) {
thread.join(10_000);
}

client().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.exclude._name"))
.get();
assertThat(queries.sum(), greaterThan((long) threads.length));
}

public void testRetryOnShardMovement() {
internalCluster().ensureAtLeastNumDataNodes(2);

assertAcked(
client().admin()
.indices()
.prepareCreate("index-1")
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);
assertAcked(
client().admin()
.indices()
.prepareCreate("index-2")
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);
client().prepareBulk("index-1")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("key", "value-1"))
.get();
client().prepareBulk("index-2")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("key", "value-2"))
.get();

var shouldMove = new AtomicBoolean(true);

for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
(handler, request, channel, task) -> {
// move index shard
if (shouldMove.compareAndSet(true, false)) {
var currentShardNodeId = clusterService().state()
.routingTable()
.index("index-1")
.shard(0)
.primaryShard()
.currentNodeId();

Review comment (Contributor Author): I need to check if this could be replaced by a proper API call instead (see the sketch after this file's diff).

assertAcked(
client().admin()
.indices()
.prepareUpdateSettings("index-1")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", currentShardNodeId))
);
ensureGreen("index-1");
}
// execute data node request
handler.messageReceived(request, channel, task);
}
);
}

try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) {
assertThat(getValuesList(resp), hasSize(2));

Review comment (Member): Do we send back the node id that we ran the driver on when we profile? Could we add that as a double check?

Review comment (Contributor Author @idegtiarenko, Apr 15, 2025): I would like to avoid it. We do not really know the source of each row, nor where the shards are currently allocated. We would also need to exclude the coordinating node, since it participates in the query and might or might not contain (or used to contain) shards participating in the query.

Review comment (Member): 👍

}
}
}
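
Regarding the author's note above about replacing the routingTable() lookup in testRetryOnShardMovement with a proper API call: below is a minimal sketch of one possible alternative that goes through the cluster state API instead of the local ClusterService. It assumes a ClusterStateRequestBuilder-style client call; the exact prepareState signature (and whether it takes a master-node timeout such as TEST_REQUEST_TIMEOUT) varies between Elasticsearch versions, so treat this as an assumption rather than a verified drop-in replacement.

```java
// Sketch of an alternative to the clusterService().state() lookup inside testRetryOnShardMovement.
// Assumes the ClusterStateRequestBuilder API; prepareState(...) may have a different signature per version.
String currentShardNodeId = client().admin()
    .cluster()
    .prepareState(TEST_REQUEST_TIMEOUT)
    .clear()
    .setRoutingTable(true)
    .setIndices("index-1")
    .get()
    .getState()
    .routingTable()
    .index("index-1")
    .shard(0)
    .primaryShard()
    .currentNodeId();
```
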

@@ -12,6 +12,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.RemoteException;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -130,6 +131,7 @@ public class ComputeService {
private final LookupFromIndexService lookupFromIndexService;
private final InferenceRunner inferenceRunner;
private final ClusterService clusterService;
private final ProjectResolver projectResolver;
private final AtomicLong childSessionIdGenerator = new AtomicLong();
private final DataNodeComputeHandler dataNodeComputeHandler;
private final ClusterComputeHandler clusterComputeHandler;
@@ -157,7 +159,16 @@ public ComputeService(
this.lookupFromIndexService = lookupFromIndexService;
this.inferenceRunner = transportActionServices.inferenceRunner();
this.clusterService = transportActionServices.clusterService();
this.dataNodeComputeHandler = new DataNodeComputeHandler(this, searchService, transportService, exchangeService, esqlExecutor);
this.projectResolver = transportActionServices.projectResolver();
this.dataNodeComputeHandler = new DataNodeComputeHandler(
this,
clusterService,
projectResolver,
searchService,
transportService,
exchangeService,
esqlExecutor
);
this.clusterComputeHandler = new ClusterComputeHandler(
this,
exchangeService,

@@ -16,6 +16,8 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
@@ -66,19 +68,25 @@
final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRequest> {
private final ComputeService computeService;
private final SearchService searchService;
private final ClusterService clusterService;
private final ProjectResolver projectResolver;
private final TransportService transportService;
private final ExchangeService exchangeService;
private final Executor esqlExecutor;
private final ThreadPool threadPool;

DataNodeComputeHandler(
ComputeService computeService,
ClusterService clusterService,
ProjectResolver projectResolver,
SearchService searchService,
TransportService transportService,
ExchangeService exchangeService,
Executor esqlExecutor
) {
this.computeService = computeService;
this.clusterService = clusterService;
this.projectResolver = projectResolver;
this.searchService = searchService;
this.transportService = transportService;
this.exchangeService = exchangeService;
@@ -102,12 +110,17 @@ void startComputeOnDataNodes(
Integer maxConcurrentNodesPerCluster = PlanConcurrencyCalculator.INSTANCE.calculateNodesConcurrency(dataNodePlan, configuration);

new DataNodeRequestSender(
clusterService,
projectResolver,
transportService,
esqlExecutor,
clusterAlias,
parentTask,
originalIndices,
PlannerUtils.canMatchFilter(dataNodePlan),
clusterAlias,
configuration.allowPartialResults(),
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster,
configuration.pragmas().unavailableShardResolutionAttempts()
) {
@Override
protected void sendRequest(
@@ -200,8 +213,6 @@ protected void sendRequest(
}
}.startComputeOnDataNodes(
concreteIndices,
originalIndices,
PlannerUtils.canMatchFilter(dataNodePlan),
runOnTaskFailure,
ActionListener.releaseAfter(outListener, exchangeSource.addEmptySink())
);