Skip to content

Prevent double continuation resumption in client #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 40 additions & 19 deletions Sources/MCP/Client/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,12 @@ public actor Client {
throw MCPError.internalError("Client connection not initialized")
}

// Use the actor's encoder
let requestData = try encoder.encode(request)

// Store the pending request first
return try await withCheckedThrowingContinuation { continuation in
Task {
// Add the pending request before attempting to send
self.addPendingRequest(
id: request.id,
continuation: continuation,
Expand All @@ -284,9 +284,15 @@ public actor Client {
// Use the existing connection send
try await connection.send(requestData)
} catch {
// If send fails immediately, resume continuation and remove pending request
continuation.resume(throwing: error)
self.removePendingRequest(id: request.id) // Ensure cleanup on send error
// If send fails, try to remove the pending request.
// Resume with the send error only if we successfully removed the request,
// indicating the response handler hasn't processed it yet.
if self.removePendingRequest(id: request.id) != nil {
continuation.resume(throwing: error)
}
// Otherwise, the request was already removed by the response handler
// or by disconnect, so the continuation was already resumed.
// Do nothing here.
}
}
}
Expand All @@ -300,8 +306,8 @@ public actor Client {
pendingRequests[id] = AnyPendingRequest(PendingRequest(continuation: continuation))
}

private func removePendingRequest(id: ID) {
pendingRequests.removeValue(forKey: id)
private func removePendingRequest(id: ID) -> AnyPendingRequest? {
return pendingRequests.removeValue(forKey: id)
}

// MARK: - Batching
Expand Down Expand Up @@ -562,14 +568,23 @@ public actor Client {
"Processing response",
metadata: ["id": "\(response.id)"])

switch response.result {
case .success(let value):
request.resume(returning: value)
case .failure(let error):
request.resume(throwing: error)
// Attempt to remove the pending request.
// Resume with the response only if it hadn't yet been removed.
if self.removePendingRequest(id: response.id) != nil {
switch response.result {
case .success(let value):
request.resume(returning: value)
case .failure(let error):
request.resume(throwing: error)
}
} else {
// Request was already removed
// (e.g., by send error handler or disconnect).
await logger?.warning(
"Attempted to handle response for already removed request",
metadata: ["id": "\(response.id)"]
)
}

removePendingRequest(id: response.id)
}

private func handleMessage(_ message: Message<AnyNotification>) async {
Expand Down Expand Up @@ -619,14 +634,20 @@ public actor Client {
private func handleBatchResponse(_ responses: [AnyResponse]) async {
await logger?.debug("Processing batch response", metadata: ["count": "\(responses.count)"])
for response in responses {
// Look up the pending request for this specific ID within the batch
if let request = pendingRequests[response.id] {
// Reuse the existing single response handler logic
await handleResponse(response, for: request)
// Attempt to remove the pending request. If successful, removedRequestBox will contain the request.
if let removedRequestBox = self.removePendingRequest(id: response.id) {
// If we successfully removed it, handle the response using the removed request box.
switch response.result {
case .success(let value):
removedRequestBox.resume(returning: value)
case .failure(let error):
removedRequestBox.resume(throwing: error)
}
} else {
// Log if a response ID doesn't match any pending request
// If removal failed, it means the request ID was not found (or already handled).
// Log a warning.
await logger?.warning(
"Received response in batch for unknown request ID",
"Received response in batch for unknown or already handled request ID",
metadata: ["id": "\(response.id)"]
)
}
Expand Down