Fixing executor and flow control issues in subscriber #1915
user code
- Separated the handling of batched received messages from the per-message dispatching of messages to the user code. To accomplish this, the batch is kept in memory and each message is drawn from the in-memory batch until it is completely depleted.
- Flow controller permits are now drawn on a per-message basis. Previously the subscriber tried to draw permits for the whole batch, potentially deadlocking the whole subscriber; this addresses the deadlock condition raised in googleapis#1868.
- No longer using a blocking flow controller; instead, pulls/streamed messages are paused and resumed based on the flow controller feedback, resuming when new permits become available.
- A separate executor for alarms (2 threads have been shown to scale pretty well with many subscribers, given that our ack operations are pretty lightweight).
- Setting the maximum number of messages to pull per request based on the number requested by the user in the flow controller (if any); on a best-effort basis, this addresses googleapis#1868.

Fixes googleapis#1868, fixes googleapis#1865 and fixes googleapis#1855
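To make the per-message permit idea concrete, here is a minimal sketch of how a batch might be held in memory and drained one message at a time without blocking. It is not the code from this pull request: the class `PerMessageDispatcher`, its methods, and the use of plain strings as messages are all hypothetical, and flow control is assumed to limit only the count of outstanding messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch; none of these names come from the pull request. */
class PerMessageDispatcher {
  private final int maxOutstanding; // assumed flow control limit (message count only)
  private int outstanding = 0; // permits currently held by in-flight messages
  private final Queue<String> pendingBatch = new ArrayDeque<>();
  private final List<String> delivered = new ArrayList<>();

  PerMessageDispatcher(int maxOutstanding) {
    this.maxOutstanding = maxOutstanding;
  }

  /** Keeps the whole batch in memory and dispatches only what permits allow. */
  synchronized void onBatchReceived(List<String> batch) {
    pendingBatch.addAll(batch);
    dispatchAvailable();
  }

  /** Called when user code finishes a message: releases one permit and resumes. */
  synchronized void onMessageDone() {
    outstanding--;
    dispatchAvailable();
  }

  /** While the batch is not depleted, the caller should hold off on further pulls. */
  synchronized boolean isPaused() {
    return !pendingBatch.isEmpty();
  }

  synchronized List<String> delivered() {
    return new ArrayList<>(delivered);
  }

  // Draws one permit per message; never blocks waiting for a whole-batch reservation.
  private void dispatchAvailable() {
    while (!pendingBatch.isEmpty() && outstanding < maxOutstanding) {
      outstanding++;
      delivered.add(pendingBatch.poll()); // stands in for handing the message to user code
    }
  }
}
```

Because permits are taken one message at a time, a batch larger than the limit simply waits in memory instead of requesting more permits than can ever be granted, which is the kind of deadlock the description refers to.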
@pongad @garrettjonesgoogle all yours to review, thanks
I will have @pongad review it first.
return messages;
}

public void done() {
@@ -57,6 +61,9 @@
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m

private static final ScheduledExecutorService alarmsExecutor =
And some other minor changes addressing feedback
Looks like there is some fallout from the merge that is causing the build to fail.
builder
.flowControlSettings
.toBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
}
}

public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
by the user, this allows for proper unit testing plus adds the ability to the user to override the executor.
LGTM
The current subscriber logic has a few concurrency issues explained as follows:
Lastly, I have also addressed a minor issue: the inability to choose the number of messages to pull per pull request. That number is now taken from the flow controller settings.
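As a rough illustration of deriving the per-pull size from the flow control settings, the sketch below uses the user's limit when one is set and falls back to a default otherwise. The class `PullSizing`, the method name, and the default value of 1000 are assumptions for illustration only, not taken from this change.

```java
import java.util.Optional;

/** Hypothetical helper; the names and the default value are assumptions. */
class PullSizing {
  // Assumed fallback when the user did not configure a flow control limit.
  static final int DEFAULT_MAX_MESSAGES_PER_PULL = 1000;

  /** Returns the user's outstanding-message limit if present, else the default. */
  static int maxMessagesPerPull(Optional<Integer> maxOutstandingMessages) {
    return maxOutstandingMessages.orElse(DEFAULT_MAX_MESSAGES_PER_PULL);
  }
}
```

With a limit of 50 configured, each pull would then ask for at most 50 messages, so a single response cannot exceed what the flow controller is able to admit.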
This change fixes #1868, fixes #1865 and fixes #1855