Fix request processing scheduling #127464

Open. Wants to merge 10 commits into base: main

Conversation

idegtiarenko
Contributor

@idegtiarenko idegtiarenko commented Apr 28, 2025

This change fixes concurrency around handling moved shards.

The test was failing because the shard failures were visible before the retry was processed. To fix this, the error handling is updated to:

  • schedule retries before recording shard failures
  • block request sending as soon as a moved shard is detected (previously, sending was locked only once we had accumulated the list of shards and started resolving their new locations).

Closes: #127168

@idegtiarenko idegtiarenko added the >non-issue, Team:Analytics (meta label for the analytical engine team: ESQL/Aggs/Geo), :Analytics/ES|QL (AKA ESQL), and v9.1.0 labels Apr 28, 2025
@idegtiarenko idegtiarenko requested review from nik9000 and dnhatn April 28, 2025 12:38
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@@ -265,9 +265,8 @@ void onAfter(DriverCompletionInfo info) {
        concurrentRequests.release();
    }

    if (pendingRetries.isEmpty() == false && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
        if (sendingLock.isHeldByCurrentThread()) {
Member

I think isHeldByCurrentThread should be used for assertions or debugging purposes, not in production code.
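
For illustration, a minimal sketch of the assertion-only usage referred to here. The class `HeldByCurrentThreadSketch` and its methods are hypothetical and are not part of this PR; the only real API used is `java.util.concurrent.locks.ReentrantLock`. The lock is acquired and released in one place, and `isHeldByCurrentThread` only documents and checks a precondition instead of steering control flow.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class; not taken from the Elasticsearch code base.
public class HeldByCurrentThreadSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int counter;

    /** Public entry point: acquires the lock, then delegates to the guarded helper. */
    public void increment() {
        lock.lock();
        try {
            incrementLocked();
        } finally {
            lock.unlock();
        }
    }

    /** Must only be called while holding the lock. */
    private void incrementLocked() {
        // The check is a sanity assertion (active only when the JVM runs with -ea);
        // production behavior does not depend on its result.
        assert lock.isHeldByCurrentThread() : "caller must hold the lock";
        counter++;
    }

    /** Reads the counter under the same lock. */
    public int counter() {
        lock.lock();
        try {
            return counter;
        } finally {
            lock.unlock();
        }
    }
}
```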

Contributor Author

Agreed. Let me find a way to replace it.

@idegtiarenko idegtiarenko requested a review from dnhatn April 29, 2025 07:01
pendingRetries.add(shardId);
if (pendingRetries == null && remainingUnavailableShardResolutionAttempts.decrementAndGet() >= 0) {
    pendingRetries = new HashSet<>();
    sendingLock.lock();
Member

I am concerned that we are scattering sendingLock#lock and sendingLock#unlock in two different places. Can we keep them close?

Contributor Author

@idegtiarenko idegtiarenko Apr 30, 2025

They are in the same inner listener, guarding the same structure at the moment.
They were previously in the same method, but that was not enough and caused a bug.
I suspect we could do this by creating a releasable inner wrapper class on top of pendingRetries = new HashSet<>(), but that sounds like overkill that would not really help with readability.

Member

@idegtiarenko Sorry, I should have provided more detail. My concern is that we acquire the sending lock in maybeScheduleRetry and release it in onAfter, which is linked to the status of pendingRetries. While the implementation is technically correct, I think we should stick to the simplest lock pattern unless there is a strong reason to do otherwise:

lock/tryLock
try {
  ...
} finally {
  unlock
}

I think we can follow this lock pattern in the DataNodeRequestSender class.
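
A minimal sketch of the lock pattern described above, assuming a `java.util.concurrent.locks.ReentrantLock`. The class `SendingLockSketch`, its fields, and its methods are hypothetical names chosen for illustration; this is not the `DataNodeRequestSender` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class; not taken from the Elasticsearch code base.
public class SendingLockSketch {
    private final ReentrantLock sendingLock = new ReentrantLock();
    private final List<String> sent = new ArrayList<>();

    /**
     * Tries to "send" the pending items. Returns false without doing any work
     * if another thread currently holds the lock.
     */
    public boolean trySend(List<String> pending) {
        if (sendingLock.tryLock() == false) {
            return false;
        }
        try {
            // Everything guarded by the lock happens inside this block.
            sent.addAll(pending);
            return true;
        } finally {
            // Released in the same method that acquired it, on every exit path.
            sendingLock.unlock();
        }
    }

    /** Returns a snapshot of the items sent so far. */
    public List<String> sent() {
        sendingLock.lock();
        try {
            return List.copyOf(sent);
        } finally {
            sendingLock.unlock();
        }
    }

    /** True if any thread currently holds the lock. */
    public boolean isLocked() {
        return sendingLock.isLocked();
    }
}
```

Keeping acquisition and release in one method makes it easy to check by inspection that the lock cannot leak, which is harder when `lock()` and `unlock()` live in different callbacks.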

Contributor Author

This sounds a lot like the original implementation (within onAfter) that was not correct.
Do you see a way to implement this suggestion while still locking conditionally (only if shard movement is detected) and blocking concurrent requests once it is detected that resolving new shard locations is required?

Contributor Author

As discussed yesterday, I moved retry scheduling to trySendingRequestsForPendingShards in 8a0dcc6

@idegtiarenko idegtiarenko requested a review from dnhatn May 6, 2025 11:31
Member

@dnhatn dnhatn left a comment

LGTM. Thanks @idegtiarenko

Development

Successfully merging this pull request may close these issues.

[CI] DataNodeRequestSenderTests testRetryOnlyMovedShards failing
3 participants