Retry shard movements during ESQL query #126653

Conversation
Hi @idegtiarenko, I've created a changelog YAML for you.
private void maybeScheduleRetry(ShardId shardId, Exception e) {
    if (targetShards.getShard(shardId).remainingNodes.isEmpty()
        && unwrapFailure(e) instanceof NoShardAvailableActionException) {
This relies on org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender#unwrapFailure, which uses:

server/src/main/java/org/elasticsearch/action/support/TransportActions.java (lines 22 to 30 in a59c182)

public static boolean isShardNotAvailableException(final Throwable e) {
    final Throwable actual = ExceptionsHelper.unwrapCause(e);
    return (actual instanceof ShardNotFoundException
        || actual instanceof IndexNotFoundException
        || actual instanceof IllegalIndexShardStateException
        || actual instanceof NoShardAvailableActionException
        || actual instanceof UnavailableShardsException
        || actual instanceof AlreadyClosedException);
}
Probably this should only retry ShardNotFoundException/NoShardAvailableActionException/UnavailableShardsException. Not sure. Please let me know what you think.
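For illustration, a minimal sketch of that narrower check, assuming the three exception types above are the ones worth retrying; the helper name is hypothetical, not part of this PR:

private static boolean isRetryableMovementFailure(Exception e) {
    // Unwrap the failure as unwrapFailure does, then retry only exceptions
    // that indicate the shard moved or is temporarily unassigned.
    Throwable actual = ExceptionsHelper.unwrapCause(e);
    return actual instanceof ShardNotFoundException
        || actual instanceof NoShardAvailableActionException
        || actual instanceof UnavailableShardsException;
}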
# Conflicts:
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
#	x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
.index(index)
.shard(0)
.primaryShard()
.currentNodeId();
I need to check if this could be replaced by a proper API call instead.
Pinging @elastic/es-analytical-engine (Team:Analytics)
}

try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) {
    assertThat(getValuesList(resp), hasSize(2));
Do we send back the node id that we ran the driver on when we profile? Could we add that as a double-check?
I would like to avoid it. We do not really know the source of each row, nor where the shards are currently allocated.
We would also need to exclude the coordinating node, as it participates in the query and might or might not contain (or used to contain) shards participating in the query.
👍
I left two comments. Thanks @idegtiarenko
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.hasSize;

public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase {
Can we make this test similar to SearchWhileRelocatingIT? We would keep running ES|QL on one thread while moving shards back and forth between two sets of nodes.
Added
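For reference, a minimal sketch of what such a test could look like, assuming the AbstractEsqlIntegTestCase helpers (run, getValuesList) used elsewhere in this PR; the index names, document counts, and iteration count are illustrative, and a real test would also collect failures from the query thread:

public void testEsqlWhileRelocating() throws Exception {
    List<String> nodes = internalCluster().startDataOnlyNodes(4);
    client().prepareIndex("index-1").setSource("v", 1).get();
    client().prepareIndex("index-2").setSource("v", 2).get();
    refresh("index-*");

    AtomicBoolean stopped = new AtomicBoolean();
    Thread queries = new Thread(() -> {
        // Keep querying while shards relocate; every run must still see both rows.
        while (stopped.get() == false) {
            try (EsqlQueryResponse resp = run("FROM index-*")) {
                assertThat(getValuesList(resp), hasSize(2));
            }
        }
    });
    queries.start();

    // Move shards back and forth between the two halves of the data nodes.
    for (int i = 0; i < 10; i++) {
        String excluded = String.join(",", i % 2 == 0 ? nodes.subList(0, 2) : nodes.subList(2, 4));
        updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", excluded));
        ensureGreen("index-1", "index-2");
    }

    stopped.set(true);
    queries.join();
}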
if (pendingRetries.isEmpty() == false && remainingTargetShardSearchAttempts.decrementAndGet() > 0) {
    ongoingTargetShardResolutionAttempts.incrementAndGet();
    var indices = pendingRetries.stream().map(ShardId::getIndexName).distinct().toArray(String[]::new);
    searchShards(indices, pendingRetries::contains, computeListener.acquireAvoid().delegateFailure((l, newSearchShards) -> {
I think we need to make changes to the search_shards API to allow bypassing can_match in this case. We only need the up-to-date routing table here.
Good point. I think we can have a separate action for it to make it simpler.
}

try (EsqlQueryResponse resp = run("FROM " + randomFrom("index-1,index-2", "index-*"))) {
    assertThat(getValuesList(resp), hasSize(2));
👍
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", name))
.get();
Fun
I have some more comments. Thanks @idegtiarenko.
@@ -131,11 +131,8 @@ public void searchShards(Task task, SearchShardsRequest searchShardsRequest, Act
    listener.delegateFailureAndWrap((delegate, searchRequest) -> {
        Index[] concreteIndices = resolvedIndices.getConcreteLocalIndices();
        final Set<ResolvedExpression> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(
Leftover?
Yeap, looks like a leftover after merging 2adb36e#diff-e9bf1f63e5fb1069f6fd3e4a7fb3b1fa44ff67a60c10dd9bb5f74caa40f2f3e3
shardId,
project.routingTable()
    .shardRoutingTable(shardId)
    .allShards()
I think we should return only shards with the search role here. Do we need an action for this? Maybe just a helper method in DataNodeRequestSender?
I guess it is possible, although it would require a bit more dependency management to bring ClusterService and ProjectResolver into DataNodeRequestSender.
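For illustration, one hypothetical shape of such a helper: read node assignments straight from the coordinator's cluster state and keep only search-capable copies. It assumes a ClusterService handle is wired in; none of the names are final:

Map<ShardId, List<DiscoveryNode>> doResolveShards(Set<ShardId> shardIds) {
    ClusterState state = clusterService.state();
    Map<ShardId, List<DiscoveryNode>> resolutions = new HashMap<>();
    for (ShardId shardId : shardIds) {
        List<DiscoveryNode> nodes = new ArrayList<>();
        for (ShardRouting routing : state.routingTable().shardRoutingTable(shardId).activeShards()) {
            // Keep only copies that can serve searches, per the comment above.
            if (routing.role().isSearchable()) {
                nodes.add(state.nodes().get(routing.currentNodeId()));
            }
        }
        resolutions.put(shardId, nodes);
    }
    return resolutions;
}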
    trySendingRequestsForPendingShards(targetShards, computeListener);
    l.onResponse(null);
}));
}
Should the other branch be in an else?
Not necessary. We can make progress without waiting for the moved shard(s) to be resolved when there are other shards in the queue.
ongoingTargetShardResolutionAttempts.incrementAndGet();
resolveShards(pendingRetries, computeListener.acquireAvoid().delegateFailure((l, resolutions) -> {
    for (var entry : resolutions.entrySet()) {
        targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
Couldn't we end up querying the same target node up to 10 times here, if we keep failing on the same node with an unavailable-shard exception?
I do not follow. If the shard is unavailable, it should no longer be in the routing table and should not be passed here.
Here we are using the cluster state on the coordinator, which might not be up to date.
This is generally okay, as we are going to retry more than once.
We could make it a transport action again and run it on the elected master by extending TransportMasterNodeAction,
but even then there is always a possibility of the state changing while we are waiting for the response.
@@ -433,7 +461,7 @@ void searchShards(
            skippedShards++;
            continue;
        }
-       List<DiscoveryNode> allocatedNodes = new ArrayList<>(group.allocatedNodes().size());
+       List<DiscoveryNode> allocatedNodes = Collections.synchronizedList(new ArrayList<>(group.allocatedNodes().size()));
I wonder if we should make the logic that resolves the new target nodes a helper method and call it under sendingLock to avoid handling concurrency.
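For illustration, a rough sketch of that suggestion, reusing names from the diffs above; how sendingLock is acquired and how pendingShardIds is repopulated are assumptions, not the final code:

private void addResolutionsUnderLock(Map<ShardId, List<DiscoveryNode>> resolutions) {
    // Mutate remainingNodes only while holding sendingLock, so it never races
    // with the sending loop and the per-shard node lists need no extra synchronization.
    sendingLock.lock();
    try {
        for (var entry : resolutions.entrySet()) {
            targetShards.getShard(entry.getKey()).remainingNodes.addAll(entry.getValue());
            pendingShardIds.add(entry.getKey());
        }
    } finally {
        sendingLock.unlock();
    }
}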
# Conflicts:
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
#	x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
#	x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
Force-pushed from 39ada69 to b2819b6
Thanks @idegtiarenko.
@@ -272,6 +301,7 @@ public void onFailure(Exception e, boolean receivedData) {
        for (ShardId shardId : request.shardIds) {
            trackShardLevelFailure(shardId, receivedData, e);
            pendingShardIds.add(shardId);
+           maybeScheduleRetry(shardId, e);
I think we can retry only if receivedData is false.
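A minimal sketch of that guard, assuming the onFailure loop from the diff above; the rationale in the comment is one plausible reading of the suggestion:

for (ShardId shardId : request.shardIds) {
    trackShardLevelFailure(shardId, receivedData, e);
    pendingShardIds.add(shardId);
    // Only retry shards that produced no data yet; pages already received
    // from the failed node could otherwise be computed twice.
    if (receivedData == false) {
        maybeScheduleRetry(shardId, e);
    }
}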
@@ -466,4 +509,25 @@ void searchShards(
            new ActionListenerResponseHandler<>(searchShardsListener, SearchShardsResponse::new, esqlExecutor)
        );
    }

+   void resolveShards(Set<ShardId> shardIds, ActionListener<Map<ShardId, List<DiscoveryNode>>> listener) {
+       ActionListener.completeWith(listener, () -> doResolveShards(shardIds));
Can we just return this without a listener?
See the comment below.
if (pendingRetries.isEmpty() == false && remainingTargetShardSearchAttempts.getAndDecrement() > 0) {
    ongoingTargetShardResolutionAttempts.incrementAndGet();
    resolveShards(pendingRetries, computeListener.acquireAvoid().delegateFailure((l, resolutions) -> {
Can we move this retry to trySendingRequestsForPendingShards under sendingLock, to avoid handling concurrency?
This is possible in principle, but it would block sending more requests while the failure is being resolved, and we do not necessarily need to do that.
It also depends on whether this has to be async (for example, if we want to move the resolution to the elected master).
ongoingTargetShardResolutionAttempts.incrementAndGet();
resolveShards(pendingRetries, computeListener.acquireAvoid().delegateFailure((l, resolutions) -> {
    for (var entry : resolutions.entrySet()) {
        targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
Here we are using the cluster state on the coordinator, which might not be up to date.
Talked to @idegtiarenko offline. He will make changes to address the race condition between retry and sendingRequests. With the upcoming changes, the PR should be good to go. Thank you for all the iterations!
Force-pushed from b53d64c to ef0ffef
Today we fail the entire query if a shard is unavailable (because it has moved to another node) and there are no other shard copies to retry.
This change schedules another round of shard location resolution in such cases, so the query does not fail.
Closes: #125947
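Putting the pieces from the diffs above together, a condensed, non-authoritative sketch of the retry flow; control flow is simplified and the pendingRetries bookkeeping is abbreviated:

private void maybeScheduleRetry(ShardId shardId, Exception e) {
    // If every known copy of the shard has been tried and the failure says the
    // shard is simply not where we thought it was, queue it for re-resolution.
    if (targetShards.getShard(shardId).remainingNodes.isEmpty()
        && unwrapFailure(e) instanceof NoShardAvailableActionException) {
        pendingRetries.add(shardId);
    }
}

// Later, while attempts remain, re-resolve the queued shards from the current
// routing table and feed the new nodes back into the sending loop.
if (pendingRetries.isEmpty() == false && remainingTargetShardSearchAttempts.getAndDecrement() > 0) {
    resolveShards(pendingRetries, computeListener.acquireAvoid().delegateFailure((l, resolutions) -> {
        for (var entry : resolutions.entrySet()) {
            targetShards.shards.get(entry.getKey()).remainingNodes.addAll(entry.getValue());
        }
        trySendingRequestsForPendingShards(targetShards, computeListener);
        l.onResponse(null);
    }));
}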