Skip to content

Handle endpoint event for HTTPClientTransport #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

zats
Copy link
Contributor

@zats zats commented May 4, 2025

I noticed that endpoint event is not being handled meaning when server responds with tokenized endpoint and client should switch over to it, we were ignoring it previously resulting in client never connecting.

To fix this we need to listen to the endpoint event and update our endpoint with the new tokenizer path, i.e. for deployed cloudflare starter MCP URL switches from https://remote-mcp-server-authless.sash-508.workers.dev/sse to https://remote-mcp-server-authless.sash-508.workers.dev/sse/message?sessionId=031450d4723c2a13a476a68f1d4ab619c279d38937f878c1f1c10c19ef32dfc2

Motivation and Context

Without it SSE servers fail to connect.
⚠️ it is hard for me to tell if this breaks other flows, i.e. do we always expect endpoint event to arrive before we can consider client connected?
⚠️ I don't think I fully internalized streaming vs non-streaming HTTPClientTransport implementation, would appreciate a more critical look at non-streaming code path

How Has This Been Tested?

I only found a limited number of remote MCP servers listed here https://support.anthropic.com/en/articles/11176164-pre-built-integrations-using-remote-mcp. Tested against:

let config: URLSessionConfiguration = …
config.httpAdditionalHeaders = [
    "Authorization": "Bearer apify_api_xxxxxxxxx"
]
let transport = HTTPClientTransport(
  endpoint: url,
  configuration: config
)

Breaking Changes

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update

Checklist

  • I have read the MCP Documentation
  • My code follows the repository's style guidelines
  • New and existing tests pass locally
  • I have added appropriate error handling
  • I have added or updated documentation as needed

Additional context

There are a few follow ups needed:

  1. For the clients requiring auth, initial connection fails with 401 code, this is currently not handled and we should fallback onto auth provider

I noticed that endpoint event is not being handled meaning when server responds with tokenized endpoint and client should switch over to it, we were ignoring it previously resulting in client never connecting
@zats zats changed the title Handle endpoint event Handle endpoint event for HTTPClientTransport May 4, 2025
@stallent
Copy link
Contributor

stallent commented May 4, 2025

Love it that you are digging deep on this! Couple of comments on this PR, though. First, only the older "HTTP+SSE transport" spec uses the endpoint message. The latest spec has replaced full SSE with the new Streamable Transport where the server has a single URL for both the POSTs and GETs. So any form of this change would be purely for backwards compatibility.

There is a larger issue, though. Because the CURRENT spec does not use the endpoint message, this change actually breaks clients working with the latest servers. Once you replace the internal endpoint property from the endpoint message, subsequent POSTs will go to the wrong place.

There is already an PR in flight to button up proper handling of either text/event-stream or application/json responses from the server (which should help clear up the second q you had above). Once thats in I bet this PR could be tweaked a little, as not to override the single endpoint to support older pure SSE servers.

@zats
Copy link
Contributor Author

zats commented May 4, 2025

Great! Yeah I knew I’m oversimplifying, was looking to unblock myself. Is the other PR working and just needs minor additions or not quite there yet?

is this the PR? #91

@stallent
Copy link
Contributor

stallent commented May 4, 2025

So the other PR is totally working, but again its only for the latest streaming spec vs the older SSE one. The quickest way to unblock is just stay pointed at your fork, or i have a full SSE Transport that i did for the old spec i am more than happy to send you. It uses LDSwiftEventSource and works fine against any SSE servers.

@zats
Copy link
Contributor Author

zats commented May 4, 2025 via email

@stallent
Copy link
Contributor

stallent commented May 4, 2025

So we have a crew on our team building out all kinds of servers which helps have a solid test bed to dev against. We deliberately have some using the older transport spec, some the newer, some using the TS sdk, some using the Python one, etc... Most are behind auth, but we do have one stood up that uses the newest transport spec and set to streaming that you are more than welcome to use. https://hgai-sandbox.aks.stage.mercury.io/streamable/mcp

If you are comfy in ts, spinning up your own based on one of the officials that may be best path. Doing that and then using proxyman or somesuch was the best way for me to deal with any aspect of the spec that was unclear or confusing to me.

@zats
Copy link
Contributor Author

zats commented May 4, 2025 via email

@zats zats closed this May 4, 2025
@stallent
Copy link
Contributor

stallent commented May 4, 2025

Sure. Its simple and clunky but was only going to be used temporarily so i didn't take my time to make it smooth. It does work though and we are using it against our older spec's servers. You can ignore the token param, we just shoved it in for our auth. Be sure to add LDSwiftEventSource via SPM

import MCP
import OSLog
import Logging
import Foundation
import LDSwiftEventSource


public actor SSETransport : Transport {
    
    public var logger: Logging.Logger
    
    public var isConnnected:Bool = false

    // Used specifically for creating the async/await for connect()
    private var connectionContinuation: CheckedContinuation<Void, Swift.Error>?
    
    // These are the AsyncStreams for the MCP Client to use with receive()
    private let messageStream: AsyncThrowingStream<Data, Swift.Error>
    private let messageContinuation: AsyncThrowingStream<Data, Swift.Error>.Continuation
    
    // The SSE client
    private let eventSource: EventSource
    // LDSwiftEventSource doesn't like being an Actor so i created a little delegate class with closures to hook into
    private let myHandler: MyEventHandler
    
    private var messageUrl: URL?
    private let baseUrlRequest: URLRequest
    
    private let token: String
    
    public init(request: URLRequest, token: String) {

        self.baseUrlRequest = request
        self.token = token
        
        // Create message stream
        var continuation: AsyncThrowingStream<Data, Swift.Error>.Continuation!
        self.messageStream = AsyncThrowingStream { continuation = $0 }
        self.messageContinuation = continuation
       
        
        myHandler = MyEventHandler()
        var config = EventSource.Config(handler: myHandler, url: request.url!)
        config.headers = ["Authorization":"Bearer \(token)"]
       
        eventSource = EventSource(config: config)
        
        //don't use
        self.logger = Logging.Logger(label: "SSETransport")
        
        myHandler.myOnError = { [weak self] _ in
            Task {
                await self?.eventSource.stop()
            }
        }
    }
    
    public func connect() async throws {
        Logger.mcp.debug("SSETransport: connect")
        
        myHandler.myOnMessage = { [weak self] eventType, messageEvent in
            switch eventType {
                case "endpoint":
                    Task {
                        await self?.setEndpoint(messageEvent.data)
                    }
                case "message":
                    Task {
                        guard let data = messageEvent.data.data(using:.utf8) else { return }
                        self?.messageContinuation.yield(data)
                    }
                
                
            default: ()
            }
        }
        
        
        
        self.eventSource.start()
        // Wait for connection to be ready
        try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation<Void, Swift.Error>) in
            guard let self = self else { continuation.resume(throwing: MCPError.internalError("Transport deallocated")); return }
            Task {
                await self.setConnectionContinuation(contenuation: continuation)
            }
        }
    }
    
    public func disconnect() async {
        self.eventSource.stop()
    }
    
    public func send(_ message: Data) async throws {
        guard let messageUrl = self.messageUrl else { throw MCPError.internalError("No message URL") }
        
        Logger.mcp.debug("SSETransport: send \(message, privacy: .public)")
        
        var messageRequest = URLRequest(url: messageUrl)
        messageRequest.httpMethod = "POST"
        messageRequest.httpBody = message
        messageRequest.setValue( "application/json", forHTTPHeaderField: "Content-Type")
        messageRequest.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization")
        let (_, _) = try await URLSession.shared.data(for: messageRequest)
    }
    
    public func receive() -> AsyncThrowingStream<Data, Swift.Error> {
        return AsyncThrowingStream { continuation in
            Task {
                do {
                    for try await message in messageStream {
                        continuation.yield(message)
                    }
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
        }
    }
    
    
    public func setEndpoint(_ endpoint: String?) async {
        guard let components = URLComponents(url: self.baseUrlRequest.url!, resolvingAgainstBaseURL: true) else { return }
        guard let ep = endpoint else { return }
        guard let scheme = components.scheme else { return }
        guard let host = components.host else { return }
       
        let portString = components.port.map { ":\($0)"} ?? ""
        
        guard let url = URL(string: "\(scheme)://\(host)\(portString)\(ep)") else { return }
        
        
        self.messageUrl = url
        
        Logger.mcp.debug("SSETransport endpoint set to \(url, privacy: .public)")
        
        self.connectionContinuation?.resume()
        self.connectionContinuation = nil
    }
    
    public func setConnectionContinuation(contenuation: CheckedContinuation<Void, Swift.Error>) async {
        self.connectionContinuation = contenuation
    }
   
}


class MyEventHandler : EventHandler {
    
    init() {
    }
    
    func onOpened() {
        Logger.mcp.debug("SSETransport connection opened")
        self.myOnOpened?()
    }
    
    func onClosed() {
        Logger.mcp.debug("SSETransport onnection closed")
        self.myOnClosed?()
    }
    
    func onMessage(eventType: String, messageEvent: LDSwiftEventSource.MessageEvent) {
        Logger.mcp.debug("SSETransport message: \(eventType, privacy: .public) data: \(messageEvent.data, privacy: .public)")
        self.myOnMessage?(eventType, messageEvent)
    }
    
    func onComment(comment: String) {
        Logger.mcp.debug("SSETransport comment: \(comment, privacy: .public)")
        self.myOnComment?(comment)
    }
    
    func onError(error:  Swift.Error) {
        Logger.mcp.error("SSETransport error: \(error, privacy: .public)")
        self.myOnError?(error)
    }
    
    var myOnOpened: (() -> Void)?
    var myOnClosed: (() -> Void)?
    var myOnMessage: ((String, LDSwiftEventSource.MessageEvent) -> Void)?
    var myOnComment: ((String) -> Void)?
    var myOnError: ((Swift.Error) -> Void)?
}

@mattt
Copy link
Contributor

mattt commented May 6, 2025

Hey, thanks so much for taking a look at this, @zats. And thank you, @stallent, for responding.

I'll spend some time revisiting #91 today with the goal of getting that merged and cutting a new release. And even though SSE is deprecated, I think there's enough latent demand for Swift clients to support old server implementations to warrant adding support.

Also, wanted to share that I spent some time this past week making a nice, modern EventSource package, which I plan to incorporate as a dependency for this project. If you're using LDSwiftEventSource or another library now, I'd love to hear what you think.

@stallent
Copy link
Contributor

stallent commented May 6, 2025

a nice, modern EventSource package. ohhhhhh thats awesome!!! Thank you!

I am all for supporting older servers if it doesn't add too much complexity. Happy to look into that since we have plenty of both stood up.

@zats
Copy link
Contributor Author

zats commented May 6, 2025

Thanks both, excited to see this working ❤️

@mattt
Copy link
Contributor

mattt commented May 6, 2025

@zats @stallent I just opened this PR here: #101

By the way, I just cut a new release (0.8.0, which includes the new streamable HTTP client changes and other improvements for the latest MCP spec version (2025-03-26).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants