mcp: add retry and replay to the Streamable HTTP implementation #86
Conversation
Adds exponential backoff and jitter to the client-side POST. Implements replay using Last-Event-ID for resumability. Since these concepts are intertwined, I have added them in a single CL. For #10
Force-pushed from e28d524 to 807b9c0

"fmt"
"io"
"math/rand"
Use math/rand/v2
(did an LLM help write this? We need to teach them about math/rand/v2!)
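For reference, here is roughly what that line would look like on rand/v2 (a sketch only, not code from this PR; the helper name is made up). rand/v2 seeds itself, so the transport would no longer need to carry a rand.Source:

```go
import (
	"math/rand/v2"
	"time"
)

// jitteredBackoff returns the base delay plus jitter of up to half of it.
// rand.N is generic over integer types, so it accepts time.Duration directly;
// it panics if its argument is <= 0, so this assumes backoff is at least 2ns.
func jitteredBackoff(backoff time.Duration) time.Duration {
	return backoff + rand.N(backoff/2)
}
```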
@@ -602,6 +615,13 @@ func NewStreamableClientTransport(url string, opts *StreamableClientTransportOpt
t := &StreamableClientTransport{url: url}
if opts != nil {
	t.opts = *opts
} else {
No need for this: opts are already the zero value.
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
	return nil, s.err // Return explicit error if connection closed due to error
This looks like a logically distinct change. Given the size and complexity of this PR, it would be good to separate out those changes that are unrelated to replay support into one or more separate CLs, with tests.
if currentSessionID == "" && gotSessionID != "" {
	s.sessionID.Store(gotSessionID)
}
// Undefined behavior when currentSessionID != gotSessionID
What is the rationale for undefined behavior here? Why isn't this an error?
	// Continue
}

gotSessionID, sendErr := s.postMessage(ctx, currentSessionID, msgToSend)
I think there's a fundamental problem here (fixable, but fundamental): the spec says we can resume an SSE stream by issuing a subsequent GET request, but does not say that we should retry POST requests.
Imagine that the server-side is a stateful server (rare but possible): we don't want to perform duplicate actions. Instead, we should only resume hanging requests once we've received the response header as well as a nonempty event ID. Then we can issue a GET with that last-event-id to resume the streaming of responses.
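Roughly, the resume path would look something like this (a sketch only, not code from this PR; resumeSSE and the way the transport tracks sessionID and lastEventID are made up for illustration). The point is that we reconnect with a GET carrying Last-Event-ID rather than re-sending the POST:

```go
import (
	"context"
	"fmt"
	"net/http"
)

// resumeSSE reopens the server's event stream, asking it to replay events
// after lastEventID. The caller reads the replayed events from resp.Body.
func resumeSSE(ctx context.Context, client *http.Client, url, sessionID, lastEventID string) (*http.Response, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	if sessionID != "" {
		req.Header.Set("Mcp-Session-Id", sessionID)
	}
	req.Header.Set("Last-Event-ID", lastEventID)

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("resume GET failed: %s", resp.Status)
	}
	return resp, nil
}
```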
case <-s.done:
	return // Connection is closed
case msg := <-s.pendingMessages:
	// Use a new context for each send attempt to allow individual retries to be cancelled
I don't understand this sentence. If the overall connection is cancelled, don't we want all the retries to be cancelled too? Why retain the ability to cancel each one separately?
// Apply exponential backoff with jitter
backoffDuration := s.initialBackoff * time.Duration(1<<uint(i))
jitter := time.Duration(s.randSource.Int63n(int64(backoffDuration / 2))) // Jitter up to half of backoff
I've also seen jitter implemented as picking the delay randomly between zero and backoffDuration. I wonder what the tradeoffs are of each way. It probably doesn't matter, but the other one is slightly easier to implement.
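For comparison, the two variants side by side (a sketch only, using rand/v2; helper names are made up, and both assume the delay is positive):

```go
import (
	"math/rand/v2"
	"time"
)

// Current approach: base delay plus up to half of it on top, i.e. [d, 1.5d).
// Assumes d >= 2ns, since rand.N panics on a non-positive argument.
func jitterAdditive(d time.Duration) time.Duration {
	return d + rand.N(d/2)
}

// "Full jitter": pick the whole delay uniformly in [0, d); one call, no addition.
func jitterFull(d time.Duration) time.Duration {
	return rand.N(d)
}
```

Full jitter spreads retries over a wider window, which can help de-correlate clients that all failed at the same moment; the additive form guarantees a minimum wait of d.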
case <-s.done:
	return // Connection is closed.
default:
	// Continue
omit comment
if sessionID == "" {
	// Session ID not yet established (first POST hasn't completed).
	// Wait and retry.
	time.Sleep(100 * time.Millisecond) // Avoid busy-waiting
I think you could avoid this if you kept sessionID in a chan string with capacity 1. This code would receive from the channel, blocking until it was ready. Other code that didn't want to block could do a select. Everyone who reads it would immediately put it back in the channel.
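A rough sketch of that pattern (type and method names are made up; assumes the ID is set exactly once, after the first successful POST):

```go
import "context"

// sessionIDHolder keeps the session ID in a channel of capacity 1 so that
// readers can block until it is established, and then put it back for others.
type sessionIDHolder struct {
	ch chan string
}

func newSessionIDHolder() *sessionIDHolder {
	return &sessionIDHolder{ch: make(chan string, 1)}
}

// set stores the session ID; must be called at most once.
func (h *sessionIDHolder) set(id string) { h.ch <- id }

// wait blocks until the session ID is available or ctx is cancelled.
func (h *sessionIDHolder) wait(ctx context.Context) (string, error) {
	select {
	case id := <-h.ch:
		h.ch <- id // put it back for the next reader
		return id, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// peek returns the session ID without blocking, or "" if not yet known
// (it may transiently return "" while another reader holds the value).
func (h *sessionIDHolder) peek() string {
	select {
	case id := <-h.ch:
		h.ch <- id
		return id
	default:
		return ""
	}
}
```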
Not sure if it's worth it.
case <-time.After(delay):
	retries++
	backoffDuration *= 2 // Exponential increase
	if backoffDuration > 30*time.Second { // Cap backoff duration
Shouldn't you cap it on the other side too?
case <-time.After(delay):
	retries++
	backoffDuration *= 2 // Exponential increase
	if backoffDuration > 30*time.Second { // Cap backoff duration
Why are these backoff delay computations so different between the client and server sides? Can they be pulled out into something more general?
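For example, something like this could be shared by both sides (a sketch only; the constants and jitter choice here are illustrative, not taken from this PR):

```go
import (
	"math/rand/v2"
	"time"
)

const (
	initialBackoff = 500 * time.Millisecond
	maxBackoff     = 30 * time.Second
)

// backoffDelay returns the delay before retry number attempt (0-based):
// exponential growth from initialBackoff, capped at maxBackoff, plus jitter
// of up to half the base delay.
func backoffDelay(attempt int) time.Duration {
	d := initialBackoff
	for i := 0; i < attempt && d < maxBackoff; i++ {
		d *= 2
	}
	if d > maxBackoff {
		d = maxBackoff
	}
	return d + rand.N(d/2)
}
```

Both loops could then select on time.After(backoffDelay(retries)) and drop their hand-rolled doubling and capping.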
	// Message successfully sent to incoming channel
case <-s.done:
	// Connection closed while trying to send incoming message
	return io.EOF
Should this be io.ErrUnexpectedEOF?
var httpErr *httpStatusError
if errors.As(err, &httpErr) {
	switch httpErr.StatusCode {
	case http.StatusRequestTimeout, // 408
OOC, where did this list come from?
}()

// Wait for all messages to be received, or timeout
allMessages := []string{}
var allMessages []string
My approach was wrong, oops. As Rob pointed out, we shouldn't retry POST. I will try again and send a separate PR.