Retry shard movements during ESQL query #126653

Merged

Conversation

idegtiarenko
Contributor

@idegtiarenko idegtiarenko commented Apr 11, 2025

Today we fail the entire query if a shard is unavailable (because it has moved to another node) and there are no other shard copies to retry the query on.

This change schedules another round of shard location resolution in such cases so that the query does not fail.

Closes: #125947

@idegtiarenko idegtiarenko added >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL v9.1.0 labels Apr 11, 2025
@idegtiarenko idegtiarenko requested review from nik9000 and dnhatn April 11, 2025 06:47
@idegtiarenko idegtiarenko marked this pull request as draft April 11, 2025 06:48
@elasticsearchmachine
Collaborator

Hi @idegtiarenko, I've created a changelog YAML for you.


private void maybeScheduleRetry(ShardId shardId, Exception e) {
    if (targetShards.getShard(shardId).remainingNodes.isEmpty()
        && unwrapFailure(e) instanceof NoShardAvailableActionException) {
Contributor Author

This relies on org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender#unwrapFailure, which uses

public static boolean isShardNotAvailableException(final Throwable e) {
    final Throwable actual = ExceptionsHelper.unwrapCause(e);
    return (actual instanceof ShardNotFoundException
        || actual instanceof IndexNotFoundException
        || actual instanceof IllegalIndexShardStateException
        || actual instanceof NoShardAvailableActionException
        || actual instanceof UnavailableShardsException
        || actual instanceof AlreadyClosedException);
}

Probably this should only retry ShardNotFoundException/NoShardAvailableActionException/UnavailableShardsException. Not sure. Please let me know what you think.
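To make the narrower option concrete, here is a minimal sketch of a predicate that accepts only the three exception types suggested above. The exception classes are hypothetical stand-ins declared locally so the snippet compiles on its own, `RetryableFailures` and `isShardMovedFailure` are illustrative names that do not exist in the PR, and `rootCause` is a simplification that walks the whole cause chain rather than reproducing what `ExceptionsHelper.unwrapCause` does.

```java
// Hypothetical stand-ins for the Elasticsearch exception types named in the
// comment above; they exist here only to keep the sketch self-contained.
class ShardNotFoundException extends RuntimeException {}
class NoShardAvailableActionException extends RuntimeException {}
class UnavailableShardsException extends RuntimeException {}
class IndexNotFoundException extends RuntimeException {}

final class RetryableFailures {
    // Simplified unwrapping: follow getCause() to the innermost exception.
    static Throwable rootCause(Throwable e) {
        Throwable current = e;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // Accept only failures that plausibly mean "the shard is no longer here".
    static boolean isShardMovedFailure(Throwable e) {
        Throwable actual = rootCause(e);
        return actual instanceof ShardNotFoundException
            || actual instanceof NoShardAvailableActionException
            || actual instanceof UnavailableShardsException;
    }

    public static void main(String[] args) {
        System.out.println(isShardMovedFailure(new RuntimeException(new ShardNotFoundException())));
        System.out.println(isShardMovedFailure(new IndexNotFoundException()));
    }
}
```

With a predicate like this, a deleted index (`IndexNotFoundException`) would not trigger another resolution round, while a relocated shard would.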

# Conflicts:
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
#	x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
.index(index)
.shard(0)
.primaryShard()
.currentNodeId();
Contributor Author

I need to check whether this could be replaced by a proper API call instead.

@idegtiarenko idegtiarenko marked this pull request as ready for review April 14, 2025 07:41
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

}

try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) {
assertThat(getValuesList(resp), hasSize(2));
Member

Do we send back the node id that we ran the driver on when we profile? Could we add that as a double check?

Contributor Author

@idegtiarenko idegtiarenko Apr 15, 2025

I would like to avoid it. We do not really know the source of each row, nor where the shards are currently allocated.
We would also need to exclude the coordinating node, as it participates in the query and might or might not contain (or have contained) shards participating in the query.

Member

👍

Member

@dnhatn dnhatn left a comment

I left two comments. Thanks @idegtiarenko

import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.hasSize;

public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase {
Member

Can we make this test similar to SearchWhileRelocatingIT? We continue running ES|QL on one thread while moving shards back and forth between two sets of nodes.

Contributor Author

Added

if (pendingRetries.isEmpty() == false && remainingTargetShardSearchAttempts.decrementAndGet() > 0) {
ongoingTargetShardResolutionAttempts.incrementAndGet();
var indices = pendingRetries.stream().map(ShardId::getIndexName).distinct().toArray(String[]::new);
searchShards(indices, pendingRetries::contains, computeListener.acquireAvoid().delegateFailure((l, newSearchShards) -> {
Member

I think we need to make changes to the search_shards API to allow bypassing can_match in this case. We only need the up-to-date routing table here.

Contributor Author

Good point. I think we can have a separate action for it to make it simpler.

@idegtiarenko idegtiarenko requested review from dnhatn and nik9000 April 15, 2025 14:13
}

try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) {
assertThat(getValuesList(resp), hasSize(2));
Member

👍

.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", name))
.get();
Member

Fun

Member

@dnhatn dnhatn left a comment

I have some more comments. Thanks @idegtiarenko.

@@ -131,11 +131,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
listener.delegateFailureAndWrap((delegate, searchRequest) -> {
Index[] concreteIndices = resolvedIndices.getConcreteLocalIndices();
final Set<ResolvedExpression> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(

Member

Leftover?

Contributor Author

shardId,
project.routingTable()
.shardRoutingTable(shardId)
.allShards()
Member

I think we should return only shards with the search role here. Do we need an action for this? Maybe just a helper method in DataNodeRequestSender?

Contributor Author

I guess it is possible, although it would require a bit more dependency management to bring ClusterService and ProjectResolver into DataNodeRequestSender.

trySendingRequestsForPendingShards(targetShards, computeListener);
l.onResponse(null);
}));
}
Member

The other branch should be in else?

Contributor Author

Not necessary. We could make progress without waiting for the moved shard(s) resolution in case there are other shards in queue.

ongoingTargetShardResolutionAttempts.incrementAndGet();
resolveShards(pendingRetries, computeListener.acquireAvoid().delegateFailure((l, resolutions) -> {
for (var entry : resolutions.entrySet()) {
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
Member

@dnhatn dnhatn Apr 15, 2025

Could we end up executing on the same target node up to 10 times here if it keeps failing with a shard-unavailable error?

Contributor Author

I do not follow. If the shard is unavailable it should no longer be in the routing table and should not be passed here.

Member

Here we are using the cluster state on the coordinator, which might not be up to date.

Contributor Author

This is generally okay as we are going to retry more than once.
We could make it a transport action again and run it on the elected master by extending TransportMasterNodeAction, but even then there is always a possibility of the state changing while we are waiting for the response.
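The argument that a stale coordinator view is tolerable because resolution is attempted more than once can be illustrated with a toy model. This is only a sketch under stated assumptions: `StaleRoutingRetry`, the list of successive "views", and the string node names are all hypothetical and do not correspond to anything in the PR.

```java
import java.util.List;
import java.util.Optional;

final class StaleRoutingRetry {
    /**
     * Toy model: views.get(i) is the node the coordinator believes hosts the
     * shard on resolution attempt i + 1 (the view may lag behind reality).
     * Returns the 1-based attempt on which the resolved node matches the
     * actual node, or empty if the attempt budget runs out first.
     */
    static Optional<Integer> firstSuccessfulAttempt(List<String> views, String actualNode, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // After the last recorded view, the coordinator keeps seeing that view.
            String resolved = views.get(Math.min(attempt, views.size()) - 1);
            if (resolved.equals(actualNode)) {
                return Optional.of(attempt);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> views = List.of("node-1", "node-1", "node-2");
        System.out.println(firstSuccessfulAttempt(views, "node-2", 10));
        System.out.println(firstSuccessfulAttempt(views, "node-2", 2));
    }
}
```

In this model the query succeeds as long as the coordinator's cluster state catches up within the attempt budget, and fails only if the budget is exhausted while the view is still stale.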

@@ -433,7 +461,7 @@ void searchShards(
skippedShards++;
continue;
}
-List<DiscoveryNode> allocatedNodes = new ArrayList<>(group.allocatedNodes().size());
+List<DiscoveryNode> allocatedNodes = Collections.synchronizedList(new ArrayList<>(group.allocatedNodes().size()));
Member

I wonder if we should make the logic that resolves the new target nodes a helper method and call it under sendingLock to avoid handling concurrency.
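As a rough illustration of this suggestion, the sketch below guards both "add newly resolved nodes" and "pick the next node to send to" with a single lock, so the per-shard lists can stay plain `ArrayList`s instead of synchronized lists. `LockedTargets`, its method names, and the use of strings for shard and node ids are assumptions made for the example; only the name `sendingLock` is taken from the comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

final class LockedTargets {
    private final ReentrantLock sendingLock = new ReentrantLock();
    // Plain collections: every access happens while holding sendingLock.
    private final Map<String, List<String>> remainingNodes = new HashMap<>();

    // Called when a resolution round returns new locations for a shard.
    void addResolvedNodes(String shardId, List<String> nodes) {
        sendingLock.lock();
        try {
            remainingNodes.computeIfAbsent(shardId, k -> new ArrayList<>()).addAll(nodes);
        } finally {
            sendingLock.unlock();
        }
    }

    // Called by the sending path; returns null when no candidate node is left.
    String pollNextNode(String shardId) {
        sendingLock.lock();
        try {
            List<String> nodes = remainingNodes.get(shardId);
            return (nodes == null || nodes.isEmpty()) ? null : nodes.remove(0);
        } finally {
            sendingLock.unlock();
        }
    }
}
```

The trade-off raised later in the thread still applies: if the resolution itself runs while the lock is held, sending to other shards is blocked for that duration.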

# Conflicts:
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
#	x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
@idegtiarenko idegtiarenko force-pushed the retry_shard_movements_during_query branch from 39ada69 to b2819b6 Compare April 16, 2025 16:03
Member

@dnhatn dnhatn left a comment

Thanks @idegtiarenko.

@@ -272,6 +301,7 @@ public void onFailure(Exception e, boolean receivedData) {
for (ShardId shardId : request.shardIds) {
trackShardLevelFailure(shardId, receivedData, e);
pendingShardIds.add(shardId);
maybeScheduleRetry(shardId, e);
Member

I think we can retry only if receivedData is false.
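A minimal sketch of how this guard could combine with the two conditions quoted earlier from `maybeScheduleRetry` (no remaining nodes and a shard-unavailable failure). `RetryDecision` and its boolean parameters are hypothetical. The comment does not state a rationale; one plausible reading, which is an assumption here, is that once some data has been received from the node, re-running the shard elsewhere could produce duplicate rows.

```java
final class RetryDecision {
    // Retry only when nothing has been received yet, the shard has no other
    // copies left to try, and the failure indicates the shard is unavailable.
    static boolean shouldScheduleRetry(boolean receivedData, boolean noRemainingNodes, boolean shardUnavailable) {
        return receivedData == false && noRemainingNodes && shardUnavailable;
    }

    public static void main(String[] args) {
        System.out.println(shouldScheduleRetry(false, true, true));
        System.out.println(shouldScheduleRetry(true, true, true));
    }
}
```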

@@ -466,4 +509,25 @@ void searchShards(
new ActionListenerResponseHandler<>(searchShardsListener, SearchShardsResponse::new, esqlExecutor)
);
}

void resolveShards(Set<ShardId> shardIds, ActionListener<Map<ShardId, List<DiscoveryNode>>> listener) {
ActionListener.completeWith(listener, () -> doResolveShards(shardIds));
Member

Can we just return this without listener?

Contributor Author

See the comment below


if (pendingRetries.isEmpty() == false && remainingTargetShardSearchAttempts.getAndDecrement() > 0) {
ongoingTargetShardResolutionAttempts.incrementAndGet();
resolveShards(pendingRetries, computeListener.acquireAvoid().delegateFailure((l, resolutions) -> {
Member

Can we move this retry to trySendingRequestsForPendingShards under sendingLock to avoid handling concurrency?

Contributor Author

@idegtiarenko idegtiarenko Apr 17, 2025

This is possible in principle, but it would block sending more requests while resolving the failure. We do not necessarily need to do that.
This also depends on whether this has to be async (for example, if we want to move resolution to the elected master).
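The guard on the attempt counter appears in this thread in two forms, `decrementAndGet() > 0` in an earlier snippet and `getAndDecrement() > 0` in a later one. The sketch below shows how the two differ for the same starting budget. `RetryBudget` is a hypothetical helper, and the budget of 10 is an assumption borrowed from the "up to 10 times" remark in the review, not a value confirmed by the code shown.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RetryBudget {
    // Number of times the guard passes when it tests decrementAndGet() > 0.
    static int grantedWithDecrementAndGet(int budget, int checks) {
        AtomicInteger remaining = new AtomicInteger(budget);
        int granted = 0;
        for (int i = 0; i < checks; i++) {
            if (remaining.decrementAndGet() > 0) {
                granted++;
            }
        }
        return granted;
    }

    // Number of times the guard passes when it tests getAndDecrement() > 0.
    static int grantedWithGetAndDecrement(int budget, int checks) {
        AtomicInteger remaining = new AtomicInteger(budget);
        int granted = 0;
        for (int i = 0; i < checks; i++) {
            if (remaining.getAndDecrement() > 0) {
                granted++;
            }
        }
        return granted;
    }

    public static void main(String[] args) {
        System.out.println(grantedWithDecrementAndGet(10, 20)); // 9
        System.out.println(grantedWithGetAndDecrement(10, 20)); // 10
    }
}
```

With a budget of N, the `decrementAndGet` form allows N - 1 resolution rounds and the `getAndDecrement` form allows N, because the latter compares the value before decrementing.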

ongoingTargetShardResolutionAttempts.incrementAndGet();
resolveShards(pendingRetries, computeListener.acquireAvoid().delegateFailure((l, resolutions) -> {
for (var entry : resolutions.entrySet()) {
targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
Member

Here we are using the cluster state on the coordinator, which might not be up to date.

Member

@dnhatn dnhatn left a comment

Talked to @idegtiarenko offline. He will make changes to address the race condition between retry and sendingRequests. With the upcoming changes, the PR should be good to go. Thank you for all the iterations!

@idegtiarenko idegtiarenko force-pushed the retry_shard_movements_during_query branch from b53d64c to ef0ffef Compare April 22, 2025 09:40
@idegtiarenko idegtiarenko merged commit 3a6963a into elastic:main Apr 22, 2025
17 checks passed
@idegtiarenko idegtiarenko deleted the retry_shard_movements_during_query branch April 22, 2025 11:15
Labels
:Analytics/ES|QL AKA ESQL >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.1.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[CI] ManyShardsIT testCancelUnnecessaryRequests failing
4 participants