mcp: add retry and replay to the Streamable HTTP implementation #86


Closed

Conversation

@samthanawalla (Contributor) commented Jul 2, 2025

Adds exponential backoff and jitter to the client-side POST. Implements replay using Last-Event-ID for resumability.

Since these concepts are intertwined, I have added this in a single CL.

For #10

@samthanawalla samthanawalla requested review from jba and findleyr July 2, 2025 16:02
@samthanawalla samthanawalla marked this pull request as ready for review July 2, 2025 16:02
"fmt"
"io"
"math/rand"

Use math/rand/v2
(did an LLM help write this? We need to teach them about math/rand/v2!)
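
A minimal sketch of what the jitter computation could look like on math/rand/v2 (the function and parameter names here are illustrative, not the PR's actual code):

// Go 1.22+: math/rand/v2 needs no manual seeding and no shared locked source.
import (
	"math/rand/v2"
	"time"
)

// jitteredBackoff returns base*2^attempt plus up to half that value of jitter.
// rand.N is generic over integer types, so it applies directly to time.Duration.
func jitteredBackoff(base time.Duration, attempt int) time.Duration {
	d := base << attempt
	if half := d / 2; half > 0 { // rand.N panics on a non-positive argument
		d += rand.N(half)
	}
	return d
}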

@@ -602,6 +615,13 @@ func NewStreamableClientTransport(url string, opts *StreamableClientTransportOpt
t := &StreamableClientTransport{url: url}
if opts != nil {
t.opts = *opts
} else {

No need for this: opts are already the zero value.
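
For illustration only (the type and function names are made up, not the SDK's): an unassigned struct field already holds its zero value, so the else branch adds nothing.

type options struct{ MaxRetries int }

type transport struct {
	url  string
	opts options
}

func newTransport(url string, opts *options) *transport {
	t := &transport{url: url}
	if opts != nil {
		t.opts = *opts
	}
	// No else needed: t.opts is already the zero value of options.
	return t
}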

s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
return nil, s.err // Return explicit error if connection closed due to error

This looks like a logically distinct change. Given the size and complexity of this PR, it would be good to separate out those changes that are unrelated to replay support into one or more separate CLs, with tests.

if currentSessionID == "" && gotSessionID != "" {
s.sessionID.Store(gotSessionID)
}
// Undefined behavior when currentSessionID != gotSessionID

What is the rationale for undefined behavior here? Why isn't this an error?

// Continue
}

gotSessionID, sendErr := s.postMessage(ctx, currentSessionID, msgToSend)

I think there's a fundamental problem here (fixable, but fundamental): the spec says we can resume an SSE stream by issuing a subsequent GET request, but it does not say that we should retry POST requests.

Imagine that the server is stateful (rare but possible): we don't want to perform duplicate actions. Instead, we should only resume hanging requests once we've received the response header as well as a nonempty event ID. Then we can issue a GET with that Last-Event-ID to resume the streaming of responses.
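
A rough sketch of the resume path being described, assuming the Mcp-Session-Id and Last-Event-ID headers from the Streamable HTTP spec; the function and its parameters are illustrative, not this PR's code:

import (
	"context"
	"net/http"
)

// resumeStream issues a GET that asks the server to replay the SSE stream
// starting after lastEventID, rather than re-sending (and re-executing) the POST.
func resumeStream(ctx context.Context, url, sessionID, lastEventID string) (*http.Response, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	if sessionID != "" {
		req.Header.Set("Mcp-Session-Id", sessionID)
	}
	if lastEventID != "" {
		req.Header.Set("Last-Event-ID", lastEventID)
	}
	return http.DefaultClient.Do(req)
}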

case <-s.done:
return // Connection is closed
case msg := <-s.pendingMessages:
// Use a new context for each send attempt to allow individual retries to be cancelled

I don't understand this sentence. If the overall connection is cancelled, don't we want all the retries to be cancelled too? Why retain the ability to cancel each one separately?


// Apply exponential backoff with jitter
backoffDuration := s.initialBackoff * time.Duration(1<<uint(i))
jitter := time.Duration(s.randSource.Int63n(int64(backoffDuration / 2))) // Jitter up to half of backoff

I've also seen jitter implemented as picking the delay randomly between zero and backoffDuration. I wonder what the tradeoffs are of each way. It probably doesn't matter, but the other one is slightly easier to implement.
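
The two variants side by side, sketched with math/rand/v2 (illustrative; assumes backoff is positive and large enough that backoff/2 > 0):

import (
	"math/rand/v2"
	"time"
)

// fullJitter picks the whole delay at random in [0, backoff),
// which spreads retries out the most but has no lower bound on the delay.
func fullJitter(backoff time.Duration) time.Duration {
	return rand.N(backoff)
}

// additiveJitter (what the PR appears to do) keeps the full backoff and adds
// up to half again, guaranteeing a minimum delay at the cost of a longer tail.
func additiveJitter(backoff time.Duration) time.Duration {
	return backoff + rand.N(backoff/2)
}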

case <-s.done:
return // Connection is closed.
default:
// Continue

omit comment

if sessionID == "" {
// Session ID not yet established (first POST hasn't completed).
// Wait and retry.
time.Sleep(100 * time.Millisecond) // Avoid busy-waiting

I think you could avoid this if you kept sessionID in a chan string with capacity 1. This code would receive from the channel, blocking until it was ready. Other code that didn't want to block could do a select. Everyone who reads it would immediately put it back in the channel.


Not sure if it's worth it.
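
A sketch of the suggested pattern, with illustrative names: the channel holds the session ID once it is known, and every reader puts it straight back.

import "context"

// sessionIDBox holds the session ID in a buffered channel of capacity 1.
// set must be called exactly once, when the first POST establishes the session.
type sessionIDBox struct {
	ch chan string
}

func newSessionIDBox() *sessionIDBox {
	return &sessionIDBox{ch: make(chan string, 1)}
}

func (b *sessionIDBox) set(id string) {
	b.ch <- id
}

// get blocks until the session ID is available (or ctx is done), then puts it back.
func (b *sessionIDBox) get(ctx context.Context) (string, error) {
	select {
	case id := <-b.ch:
		b.ch <- id
		return id, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// tryGet is the non-blocking variant for code that cannot wait.
func (b *sessionIDBox) tryGet() (string, bool) {
	select {
	case id := <-b.ch:
		b.ch <- id
		return id, true
	default:
		return "", false
	}
}

The sleep-and-retry loop above could then become a single blocking call to get.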

case <-time.After(delay):
retries++
backoffDuration *= 2 // Exponential increase
if backoffDuration > 30*time.Second { // Cap backoff duration

Shouldn't you cap it on the other side too?

case <-time.After(delay):
retries++
backoffDuration *= 2 // Exponential increase
if backoffDuration > 30*time.Second { // Cap backoff duration

Why are these backoff delay computations so different between the client and server sides? Can they be pulled out into something more general?
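
One possible shape for a shared helper that both sides could use, with the delay capped at a configurable maximum (the names and fields are illustrative, not the PR's code):

import (
	"math/rand/v2"
	"time"
)

// backoff is a retry-delay policy the client and server loops could share.
type backoff struct {
	Initial time.Duration // delay before the first retry
	Max     time.Duration // upper cap on any single delay
}

// delay returns the jittered delay for the given retry attempt.
func (b backoff) delay(attempt int) time.Duration {
	d := b.Initial << attempt
	if d <= 0 || d > b.Max { // d <= 0 guards against shift overflow
		d = b.Max
	}
	return d/2 + rand.N(d/2+1) // keep at least half the delay, randomize the rest
}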

// Message successfully sent to incoming channel
case <-s.done:
// Connection closed while trying to send incoming message
return io.EOF

Should this be io.ErrUnexpectedEOF?

var httpErr *httpStatusError
if errors.As(err, &httpErr) {
switch httpErr.StatusCode {
case http.StatusRequestTimeout, // 408

OOC, where did this list come from?

}()

// Wait for all messages to be received, or timeout
allMessages := []string{}

var allMessages []string

@samthanawalla (Contributor, Author) commented Jul 8, 2025

My approach was wrong, oops.

As Rob pointed out, we shouldn't retry the POST. I will try again and send a separate PR.

@samthanawalla samthanawalla deleted the retryStreamable branch July 8, 2025 16:55