Description
Hi all,
We are experiencing issues with GCP Pub/Sub. I'd like to hear your opinion on this so that I can contribute a fix if needed.
Problem Description
After migrating from Pub/Sub push to pull using streaming pull connections, we're experiencing a critical issue where our application becomes completely unresponsive:
- Initial state: 8 streaming pull workers configured and running correctly
- Degradation: Connections drop one by one over time due to gRPC Cancelled responses
- Final state: When all connections are lost, .receive() never terminates, leaving the process stuck indefinitely with:
  - No errors logged
  - No automatic recovery
  - Silent failure requiring manual intervention
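To illustrate the "final state" failure mode, here is a minimal sketch of a supervisor that returns an error once every worker has stopped, instead of waiting forever. It uses only the standard library; `WorkerExit` and `supervise` are hypothetical names for illustration and are not part of the library's API.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical reason a streaming pull worker stopped.
#[derive(Debug, PartialEq, Eq)]
pub enum WorkerExit {
    /// The stream ended with a gRPC Cancelled status.
    Cancelled,
    /// The application asked the worker to stop.
    Shutdown,
}

/// Runs each worker on its own thread and waits for all of them to finish.
/// Returns an error if every worker stopped because of Cancelled, so the
/// caller sees a failure instead of a silent hang.
pub fn supervise<F>(workers: Vec<F>) -> Result<(), String>
where
    F: FnOnce() -> WorkerExit + Send + 'static,
{
    let total = workers.len();
    let (tx, rx) = mpsc::channel();

    for (id, worker) in workers.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            let exit = worker();
            // The receiver may already be gone; ignore the send error.
            let _ = tx.send((id, exit));
        });
    }
    // Drop the original sender so the loop below ends when all workers finish.
    drop(tx);

    let mut unexpected = 0;
    for (id, exit) in rx {
        if exit == WorkerExit::Cancelled {
            eprintln!("worker {id} stopped after Cancelled");
            unexpected += 1;
        }
    }

    if total > 0 && unexpected == total {
        Err(format!("all {total} streaming pull workers stopped unexpectedly"))
    } else {
        Ok(())
    }
}
```

With something along these lines, a caller of .receive() could log the error and restart the subscription rather than requiring manual intervention.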
Root Cause Analysis
I've traced the issue to how Cancelled status codes are handled in the streaming subscriber:
    let stream = match response {
        Ok(r) => r.into_inner(),
        Err(e) => {
            if e.code() == Code::Cancelled {
                tracing::trace!("stop subscriber : {}", subscription);
                break; // <-- Subscriber stops permanently on Cancelled
            } else if retryable_codes.contains(&e.code()) {
                tracing::warn!("failed to start streaming: will reconnect {:?} : {}", e, subscription);
                continue;
            } else {
                tracing::error!("failed to start streaming: will stop {:?} : {}", e, subscription);
                break;
            }
        }
    };
The Cancelled status is treated as a terminal condition rather than a retryable error.
Proposed Solution
I've tested making Cancelled a retryable status code in a fork. Results after several hours:
✅ All connections remain stable
✅ No more silent failures
✅ Automatic recovery from Cancelled responses
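A minimal sketch of the decision logic behind this change, written without dependencies so it can be run on its own. The `Code` enum here is a hypothetical stand-in for `tonic::Code`, and `Action`, `on_stream_error`, and the `shutdown_requested` flag are assumptions for illustration, not the library's API. The flag shows one way to keep a deliberate local shutdown terminal while still retrying a Cancelled that arrives unprompted:

```rust
/// Hypothetical stand-in for the subset of gRPC status codes used here;
/// real code would use `tonic::Code` instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Code {
    Cancelled,
    Unavailable,
    DeadlineExceeded,
    PermissionDenied,
}

/// What the subscriber loop should do after a stream error.
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    /// Corresponds to `continue` in the loop above.
    Reconnect,
    /// Corresponds to `break` in the loop above.
    Stop,
}

/// Decides whether to reconnect after a stream error.
///
/// `shutdown_requested` is an assumed flag that is true only when the
/// application itself asked the subscriber to stop.
pub fn on_stream_error(code: Code, shutdown_requested: bool, retryable_codes: &[Code]) -> Action {
    // A local shutdown always wins, whatever status the stream reported.
    if shutdown_requested {
        return Action::Stop;
    }
    // Otherwise treat Cancelled like any other retryable status.
    if code == Code::Cancelled || retryable_codes.contains(&code) {
        Action::Reconnect
    } else {
        Action::Stop
    }
}
```

Whether the subscriber can reliably tell a local cancellation from a remote one is exactly the open question below; the sketch only shows the shape such a check could take.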
Questions for Maintainers
Is there a specific reason Cancelled is treated as non-retryable? (I noticed the Golang implementation also treats it as non-retryable)
Could there be side effects from making Cancelled retryable that I haven't considered?
If making Cancelled retryable isn't appropriate, what alternative approach would you recommend for handling these connection drops?
I'm happy to submit a PR with either my current fix or an alternative implementation based on your guidance.