
Fixing executor and flow control issues in subscriber #1915


Merged
merged 8 commits into GoogleCloudPlatform:master from davidtorres:master on Apr 21, 2017

Conversation

davidtorres

The current subscriber logic has a few concurrency issues, explained as follows:

  • The flow controller blocks the user's executor threads. If all of those threads become blocked on flow control limits, the subscriber cannot dispatch messages to the user code, and therefore cannot release flow control permits or make forward progress: a deadlock. This change makes flow control non-blocking; instead, it regulates the flow of messages by suspending the streaming, or by not issuing more pulls, until permits are released back to the flow controller.
  • Lease extensions are handled on the user's executor. If the user code monopolizes all the threads in that executor, the subscriber cannot execute any lease extension, which causes leases to expire and increases the chances of message redelivery; if there is ever a time when maintaining lease extensions matters most, it is precisely when the user code is busy working on the messages. This change sets up a separate, small (two-thread) executor, shared across subscribers, dedicated to lease extensions. In my load tests (with >600 subscriber objects) this executor performed well, with no signs of work backing up, mainly because scheduling and sending lease extensions is fairly cheap.
  • Flow control permits were drawn on a per-batch basis while messages were dispatched one at a time, which could deadlock the subscriber whenever it received a batch larger than the flow controller's message limit. This change reserves flow control permits on a per-message basis as messages are dispatched to the user code (see the sketch after this list).
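To make the first and third bullets concrete, here is a minimal, self-contained sketch of non-blocking, per-message flow control. It is illustrative only: the `Semaphore` stands in for the real flow controller, and the names (`MessagePump`, `onBatchReceived`, `onMessageDone`) are hypothetical, not the subscriber's actual API.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Illustrative sketch only; names and structure are hypothetical.
class MessagePump {
  // Stand-in for the flow controller: bounds outstanding messages.
  private final Semaphore permits;
  // Received batches are buffered and drained one message at a time.
  private final Queue<String> buffered = new ArrayDeque<>();
  private boolean pullingPaused = false;

  MessagePump(int maxOutstandingMessages) {
    this.permits = new Semaphore(maxOutstandingMessages);
  }

  synchronized void onBatchReceived(List<String> batch) {
    buffered.addAll(batch);
    drain();
  }

  // Dispatches while permits are available; never blocks an executor thread.
  private synchronized void drain() {
    while (!buffered.isEmpty() && permits.tryAcquire()) {
      dispatchToUserCode(buffered.poll());
    }
    if (!buffered.isEmpty()) {
      // Out of permits: stop issuing pulls instead of blocking a thread.
      pullingPaused = true;
    }
  }

  // Called from the ack/nack path once the user code is done with a message.
  synchronized void onMessageDone() {
    permits.release();
    if (pullingPaused) {
      pullingPaused = false;
      // resume pulls / streaming here
    }
    drain();
  }

  private void dispatchToUserCode(String message) {
    // hand off to the user-provided message receiver
  }
}
```

The key property is that a thread finding no permits available returns immediately rather than blocking, so the user's executor threads are never tied up waiting on flow control.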

Lastly, I have also addressed a minor issue: the inability to choose the number of messages to pull per pull request. That number is now taken from the flow controller settings.
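As a rough illustration of that wiring (a hypothetical helper; the default cap of 1000 is an assumption, not taken from the PR):

```java
// Hypothetical sketch: cap each pull at the flow controller's outstanding
// message limit, when one is configured.
private static final int DEFAULT_MAX_MESSAGES_PER_PULL = 1000; // assumed default

static int maxMessagesPerPull(Long maxOutstandingElementCount) {
  if (maxOutstandingElementCount == null) {
    return DEFAULT_MAX_MESSAGES_PER_PULL;
  }
  return (int) Math.min(maxOutstandingElementCount, DEFAULT_MAX_MESSAGES_PER_PULL);
}
```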

This change fixes #1868, fixes #1865 and fixes #1855


- Separated the handling of received message batches from the per-message dispatching to the user code. To accomplish this I keep the batch in memory, and the processing of each message draws from the in-memory batch until it is completely depleted.
  - Flow controller permits are drawn on a per-message basis (we used to try to draw permits for the whole batch, potentially deadlocking the whole subscriber); this addresses the deadlock condition raised in googleapis#1868
    - No longer using a blocking flow controller; instead, pulls/streamed messages are paused and resumed based on the flow controller's feedback and on new permits becoming available.
- A separate executor for alarms (two threads in it have been shown to scale well with many subscribers, given that our ack operations are fairly lightweight)
- Setting the maximum number of messages to pull per request based on the number requested by the user in the flow controller (if any); on a best-effort basis this addresses googleapis#1868

Fixes googleapis#1868, fixes googleapis#1865 and fixes googleapis#1855
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Apr 13, 2017
@davidtorres (Author)

@pongad @garrettjonesgoogle all yours to review, thanks!

@garrettjonesgoogle (Member)

I will have @pongad review it first.

return messages;
}

public void done() {


@@ -57,6 +61,9 @@
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);
private static final int MAX_ACK_DEADLINE_EXTENSION_SECS = 10 * 60; // 10m

private static final ScheduledExecutorService alarmsExecutor =
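The declaration above is cut off by the diff view. A plausible completion, assuming Guava's `ThreadFactoryBuilder` for naming and daemonizing the threads (the PR's actual initializer may differ), would be:

```java
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Shared across all subscribers; two daemon threads suffice because
// scheduling and sending lease extensions is cheap.
private static final ScheduledExecutorService alarmsExecutor =
    Executors.newScheduledThreadPool(
        2,
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("subscriber-alarms-%d")
            .build());
```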


@garrettjonesgoogle (Member)

Looks like there is some fallout from the merge that is causing the build to fail

builder
.flowControlSettings
.toBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
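Configuring `LimitExceededBehavior.ThrowException` is what enables the non-blocking scheme: when limits are hit, the flow controller throws instead of blocking, and the subscriber can catch the exception and pause pulls. A rough sketch of that pattern, assuming a gax-style `FlowController` with `reserve`/`release` (the helpers `dispatchToUserCode` and `bufferAndPausePulling` are hypothetical):

```java
// Sketch assuming a FlowController configured with ThrowException.
try {
  flowController.reserve(1, message.getSerializedSize()); // one message's worth of permits
  dispatchToUserCode(message);
} catch (FlowController.FlowControlException e) {
  // Limits reached: buffer the message and stop issuing pulls
  // until a release() frees permits.
  bufferAndPausePulling(message);
}
```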


}
}

public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
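One plausible reading of this signature, in line with the commit message's in-memory batch: messages are drained one at a time as permits allow, and `doneCallback` runs once the whole batch has been dispatched. A hypothetical sketch (the helpers `tryReservePermit`, `dispatchToUserCode`, and `awaitPermitsThenResume` are stand-ins, not the PR's actual code):

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
  Queue<ReceivedMessage> batch = new ArrayDeque<>(messages);
  dispatchRemaining(batch, doneCallback);
}

private void dispatchRemaining(Queue<ReceivedMessage> batch, Runnable doneCallback) {
  while (!batch.isEmpty() && tryReservePermit()) { // non-blocking permit draw
    dispatchToUserCode(batch.poll());
  }
  if (batch.isEmpty()) {
    doneCallback.run(); // entire batch dispatched; caller may issue the next pull
  } else {
    // Out of permits: resume draining once the user's ack path releases permits.
    awaitPermitsThenResume(() -> dispatchRemaining(batch, doneCallback));
  }
}
```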


by the user; this allows for proper unit testing and gives the user the ability to override the executor.
@garrettjonesgoogle (Member)

LGTM

@coveralls

Coverage Status: coverage decreased (-0.01%) to 80.886% when pulling f716280 on davidtorres:master into a722e95 on GoogleCloudPlatform:master.
