
cannot get response from await session.call_tool() #262


Open

herrrX opened this issue Mar 12, 2025 · 7 comments


herrrX commented Mar 12, 2025

Hello, sorry to bother you. I've recently run into a blocking issue with the MCP client.
I've set up an MCP server, and it runs successfully in Claude. However, with the client I wrote, the call result = await session.call_tool() gets stuck and never returns a response, while tools = await session.list_tools() works and returns the list of tools.

Through logging inside the client file I can confirm that the tool call is initiated and executed successfully and produces a result, but the client side never receives it. It looks like a problem in the protocol's transport layer. I've been struggling with this for two days and haven't found a solution, even after upgrading the system.

I removed the other logic and kept only the tool invocation. The code is as follows.

Looking forward to your reply.


from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client


server_params = StdioServerParameters(  # Create server parameters for stdio connection
    command="uvx",
    args=[
        "--from",
        "git+ssh://xxxx.xx.xxx",
        "server-name"
    ],
    env=None
)

# Optional: create a sampling callback
async def handle_sampling_message(message: types.CreateMessageRequestParams) -> types.CreateMessageResult:
    return types.CreateMessageResult(
        role="assistant",
        content=types.TextContent(
            type="text",
            text="Hello, world! from model",
        ),
        model="gpt-3.5-turbo",
        stopReason="endTurn",
    )

async def run():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write, sampling_callback=handle_sampling_message) as session:
            # Initialize the connection
            await session.initialize()
            # List available tools
            tools = await session.list_tools()
            # Call a tool
            result = await session.call_tool("query-api-infos", arguments={"api_info_id": "8768555"})
            print(result)

if __name__ == "__main__":
    import asyncio
    asyncio.run(run())
@taha-yassine

I'm experiencing the same issue.
When stepping into session.call_tool() with a debugger and going through BaseSession.send_request() line by line, it seems to work, but I have no idea why.
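
Not a fix, but wrapping the call in a timeout at least turns the silent hang into a visible error. A minimal standard-library sketch (the helper name and the 30 s value are just placeholders; drop it into the repro script from the first post):

import asyncio

async def call_tool_with_timeout(session, name, arguments, timeout=30):
    # Wrap the hanging await so a missing response surfaces as a TimeoutError
    # instead of blocking forever; useful for confirming where things stall.
    try:
        return await asyncio.wait_for(session.call_tool(name, arguments=arguments), timeout)
    except asyncio.TimeoutError:
        print(f"call_tool({name!r}) did not return within {timeout}s")
        raise

# inside run(), instead of the bare call:
# result = await call_tool_with_timeout(session, "query-api-infos", {"api_info_id": "8768555"})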


STZhuang commented Apr 3, 2025

Hey, I’m running into the same problem here! I’m on WSL (Ubuntu 24.04 on Windows) and can’t get any server replies when using the MCP client. Super annoying. But here’s the weird thing—it works totally fine on native Windows. I’m using the same uv setup for both environments, with this config:

[project]
name = "mcp-demo"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
    "jupyter>=1.1.1",
    "mcp[cli]>=1.6.0",
]

For example, when I try building a tool with this code:

from mcp.server.fastmcp import Context,FastMCP
import httpx
from bs4 import BeautifulSoup
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
import urllib.parse
import sys
import traceback
import asyncio
from datetime import datetime, timedelta
import time
import re


@dataclass
class SearchResult:
    title: str
    link: str
    snippet: str
    position: int


class RateLimiter:
    def __init__(self, requests_per_minute: int = 30):
        self.requests_per_minute = requests_per_minute
        self.requests = []

    async def acquire(self):
        now = datetime.now()
        # Remove requests older than 1 minute
        self.requests = [
            req for req in self.requests if now - req < timedelta(minutes=1)
        ]

        if len(self.requests) >= self.requests_per_minute:
            # Wait until we can make another request
            wait_time = 60 - (now - self.requests[0]).total_seconds()
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        self.requests.append(now)


class DuckDuckGoSearcher:
    BASE_URL = "https://html.duckduckgo.com/html"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    def __init__(self):
        self.rate_limiter = RateLimiter()

    def format_results_for_llm(self, results: List[SearchResult]) -> str:
        """Format results in a natural language style that's easier for LLMs to process"""
        if not results:
            return "No results were found for your search query. This could be due to DuckDuckGo's bot detection or the query returned no matches. Please try rephrasing your search or try again in a few minutes."

        output = []
        output.append(f"Found {len(results)} search results:\n")

        for result in results:
            output.append(f"{result.position}. {result.title}")
            output.append(f"   URL: {result.link}")
            output.append(f"   Summary: {result.snippet}")
            output.append("")  # Empty line between results

        return "\n".join(output)

    async def search(
        self, query: str, ctx: Context, max_results: int = 10
    ) -> List[SearchResult]:
        try:
            # Apply rate limiting
            await self.rate_limiter.acquire()

            # Create form data for POST request
            data = {
                "q": query,
                "b": "",
                "kl": "",
            }

            await ctx.info(f"Searching DuckDuckGo for: {query}")

            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.BASE_URL, data=data, headers=self.HEADERS, timeout=30.0
                )
                response.raise_for_status()

            # Parse HTML response
            soup = BeautifulSoup(response.text, "html.parser")
            if not soup:
                await ctx.error("Failed to parse HTML response")
                return []

            results = []
            for result in soup.select(".result"):
                title_elem = result.select_one(".result__title")
                if not title_elem:
                    continue

                link_elem = title_elem.find("a")
                if not link_elem:
                    continue

                title = link_elem.get_text(strip=True)
                link = link_elem.get("href", "")

                # Skip ad results
                if "y.js" in link:
                    continue

                # Clean up DuckDuckGo redirect URLs
                if link.startswith("//duckduckgo.com/l/?uddg="):
                    link = urllib.parse.unquote(link.split("uddg=")[1].split("&")[0])

                snippet_elem = result.select_one(".result__snippet")
                snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""

                results.append(
                    SearchResult(
                        title=title,
                        link=link,
                        snippet=snippet,
                        position=len(results) + 1,
                    )
                )

                if len(results) >= max_results:
                    break

            await ctx.info(f"Successfully found {len(results)} results")
            return results

        except httpx.TimeoutException:
            await ctx.error("Search request timed out")
            return []
        except httpx.HTTPError as e:
            await ctx.error(f"HTTP error occurred: {str(e)}")
            return []
        except Exception as e:
            await ctx.error(f"Unexpected error during search: {str(e)}")
            traceback.print_exc(file=sys.stderr)
            return []


class WebContentFetcher:
    def __init__(self):
        self.rate_limiter = RateLimiter(requests_per_minute=20)

    async def fetch_and_parse(self, url: str, ctx: Context) -> str:
        """Fetch and parse content from a webpage"""
        try:
            await self.rate_limiter.acquire()

            await ctx.info(f"Fetching content from: {url}")

            async with httpx.AsyncClient() as client:
                response = await client.get(
                    url,
                    headers={
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                    },
                    follow_redirects=True,
                    timeout=30.0,
                )
                response.raise_for_status()

            # Parse the HTML
            soup = BeautifulSoup(response.text, "html.parser")

            # Remove script and style elements
            for element in soup(["script", "style", "nav", "header", "footer"]):
                element.decompose()

            # Get the text content
            text = soup.get_text()

            # Clean up the text
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = " ".join(chunk for chunk in chunks if chunk)

            # Remove extra whitespace
            text = re.sub(r"\s+", " ", text).strip()

            # Truncate if too long
            if len(text) > 8000:
                text = text[:8000] + "... [content truncated]"

            await ctx.info(
                f"Successfully fetched and parsed content ({len(text)} characters)"
            )
            return text

        except httpx.TimeoutException:
            await ctx.error(f"Request timed out for URL: {url}")
            return "Error: The request timed out while trying to fetch the webpage."
        except httpx.HTTPError as e:
            await ctx.error(f"HTTP error occurred while fetching {url}: {str(e)}")
            return f"Error: Could not access the webpage ({str(e)})"
        except Exception as e:
            await ctx.error(f"Error fetching content from {url}: {str(e)}")
            return f"Error: An unexpected error occurred while fetching the webpage ({str(e)})"


# Initialize FastMCP server
mcp = FastMCP("ddg-search",port=7120)
searcher = DuckDuckGoSearcher()
fetcher = WebContentFetcher()


@mcp.tool()
async def search(query: str, ctx: Context, max_results: int = 10) -> str:
    """
    Search DuckDuckGo and return formatted results.

    Args:
        query: The search query string
        max_results: Maximum number of results to return (default: 10)
        ctx: MCP context for logging
    """
    try:
        results = await searcher.search(query, ctx, max_results)
        return searcher.format_results_for_llm(results)
    except Exception as e:
        traceback.print_exc(file=sys.stderr)
        return f"An error occurred while searching: {str(e)}"


@mcp.tool()
async def fetch_content(url: str, ctx: Context) -> str:
    """
    Fetch and parse content from a webpage URL.

    Args:
        url: The webpage URL to fetch content from
        ctx: MCP context for logging
    """
    return await fetcher.fetch_and_parse(url, ctx)


def main():
    mcp.run(transport = "sse")


if __name__ == "__main__":
    main()

And then test the tool_call result like this:

from mcp.client.sse import sse_client
from mcp import ClientSession
async def main():
    async with sse_client("http://localhost:7120/sse") as sse_transport:
        read, write = sse_transport
        async with ClientSession(read, write) as session:
            await session.initialize()
            print(await session.list_tools())
            result = await session.call_tool(name="search", arguments={"query": "langgraph", "max_results": 10})
            print(result)

if __name__ == "__main__":
    import asyncio 
    asyncio.run(main())

It just hangs on WSL—no response, nothing. Anyone else seeing this? Or maybe have a workaround? Works like a charm on Windows, though!
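
For anyone else debugging this on WSL, a rough first check is whether the SSE endpoint is even reachable and streaming from inside WSL. A quick diagnostic sketch using httpx (already a dependency of the server above; port 7120 matches the FastMCP config):

import asyncio
import httpx

async def probe_sse(url: str = "http://localhost:7120/sse") -> None:
    # Open the SSE stream and print the first line received, just to see
    # whether the transport connects at all from WSL.
    async with httpx.AsyncClient(timeout=10.0) as client:
        async with client.stream("GET", url) as response:
            print("status:", response.status_code)
            async for line in response.aiter_lines():
                print("first event line:", line)
                break

if __name__ == "__main__":
    asyncio.run(probe_sse())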


WikiHkz commented Apr 9, 2025

I'm experiencing the same issue. Did you solve it?

@STZhuang

> I'm experiencing the same issue. Did you solve it?

Solved: just use the WebSocket transport instead of the SSE transport for remote communication:

from pydantic import BaseModel
import logging
from pathlib import Path
from mcp.server import Server
from mcp.server.session import ServerSession
from mcp.server.stdio import stdio_server
from searcher_manager import SearchManager
from mcp.server.websocket import websocket_server
from server import DuckDuckGoSearchEngine,WebContentFetcher,format_results_for_llm,HybridSearch,SearchEngine,BingSearchEngine,BaiduSearchEngine
from mcp.types import (
    ClientCapabilities,
    TextContent,
    Tool,
    ListRootsResult,
    RootsCapability,
)
from typing import Literal,List
from mcp.server.fastmcp import Context
import traceback
import sys
import asyncio  # needed by batch_search below
from pydantic import BaseModel , Field,model_validator
from enum import Enum

from config import MAX_RETRIES,CONCURRENCY,SEARCH_ENGINE


class Search(BaseModel):
    query: str = Field(description="Search engine query keywords")
    max_results: int = Field(description="Maximum number of results to return (default: 10)")

class BatchSearch(BaseModel):
    # Field name matches batch_search()'s parameter so call_tool(**arguments) works
    query_list: List[str] = Field(description="List of search engine query keywords")
    max_results: int = Field(description="Maximum number of results to return (default: 10)")


class Fetcher(BaseModel):
    url: str = Field(description="URL of the web page to fetch")

class WebSearchTools(str,Enum):
    SEARCH = "searcher"
    FETCH = "fetcher"
    BATCH_SEARCH = 'batch_searcher'



class WebsearchEngine(BaseModel):
    
    searcher_engine: Literal["BING", "DDG", "hybrid", "Baidu"] = Field(description="Search engine type")

    def get_searcher(self) -> SearchEngine:

        match self.searcher_engine:
            case "BING":
                return BingSearchEngine()
            case "DDG":
                return DuckDuckGoSearchEngine()
            case "Baidu":
                return BaiduSearchEngine()
            case "hybrid":
                return HybridSearch(
                    engines=[
                        DuckDuckGoSearchEngine(),BingSearchEngine()
                        ],
                    max_retries=int(MAX_RETRIES),
                    concurrency=int(CONCURRENCY)
                    )
            case _:
                raise ValueError("暂时不支持的搜索引擎模式") 
    


fetcher = WebContentFetcher()


async def search(query: str, max_results: int = 10) -> str:
    """
    WebSearch  and return formatted results.
    Args:
        query: The search query string
        max_results: Maximum number of results to return (default: 10)
    """
    try:

        searcher = WebsearchEngine(searcher_engine=SEARCH_ENGINE).get_searcher()
        combined_results = await searcher.search(query=query, max_results=max_results)
        return format_results_for_llm(combined_results)
    
    except Exception as e:
        traceback.print_exc(file=sys.stderr)
        return f"An error occurred while searching: {str(e)}"


async def batch_search(query_list: List[str], max_results: int = 10) -> str:
    """
    WebSearch  and return formatted results.
    Args:
        query: The search query string
        max_results: Maximum number of results to return (default: 10)
    """
    try:

        searcher = WebsearchEngine(searcher_engine=SEARCH_ENGINE).get_searcher()
        task = [ searcher.search(query=query, max_results=max_results) for query in query_list]

        result = await asyncio.gather(*task)
        # Merge the per-query result lists into one
        combined_results = [res for sublist in result for res in sublist]
        return format_results_for_llm(combined_results)
    
    except Exception as e:
        traceback.print_exc(file=sys.stderr)
        return f"An error occurred while searching: {str(e)}"


async def fetch_content(url: str) -> str:
    """
    Fetch and parse content from a webpage URL.

    Args:
        url: The webpage URL to fetch content from
    """
    return await fetcher.fetch_and_parse(url)


async def serve(port: int, transport: str) -> None:
    logger = logging.getLogger(__name__)
    server = Server("websearch")
    @server.list_tools()
    async def list_tools() -> list[Tool]:
        return [
            Tool(
                name=WebSearchTools.SEARCH,
                description="Search DuckDuckGo and return formatted results.",
                inputSchema=Search.schema()),
            Tool(
                name=WebSearchTools.FETCH,
                description="Fetch and parse content from a webpage URL.",
                inputSchema=Fetcher.schema()
                ),
            Tool(
                name=WebSearchTools.BATCH_SEARCH,
                description="当有多个关键词列表时,使用批量搜索",
                inputSchema=BatchSearch.schema()
                )
            ]
    @server.call_tool()
    async def call_tool(name: str, arguments: dict) -> list[TextContent]:
        match name:
            case WebSearchTools.SEARCH:
                status = await search(**arguments)
                return [TextContent(
                    type="text",
                    text=f"Search results:\n{status}"
                )]

            case WebSearchTools.FETCH:
                # fetch_content() wraps fetcher.fetch_and_parse(); the fetcher
                # instance itself is not callable.
                diff = await fetch_content(**arguments)
                return [TextContent(
                    type="text",
                    text=f"Web page content:\n{diff}"
                )]
            case WebSearchTools.BATCH_SEARCH:
                status = await batch_search(**arguments)
                return [TextContent(
                    type="text",
                    text=f"Search results:\n{status}"
                )]
    options = server.create_initialization_options()
    if transport == "stdio":
        async with stdio_server() as (read_stream, write_stream):
            await server.run(read_stream, write_stream, options, raise_exceptions=True)
        
    elif transport == "sse":
        from mcp.server.sse import SseServerTransport
        from starlette.applications import Starlette
        from starlette.routing import Mount, Route
        
        sse = SseServerTransport("/messages/")
        async def handle_sse(request):
            async with sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                await server.run(
                    streams[0], streams[1], server.create_initialization_options()
                )

        starlette_app = Starlette(
            debug=True,
            routes=[
                Route("/sse", endpoint=handle_sse),
                Mount("/messages/", app=sse.handle_post_message),
            ],
        )

        import uvicorn

        config = uvicorn.Config(
            starlette_app,
            host='0.0.0.0',
            port=port,
        )
        ser = uvicorn.Server(config)
        await ser.serve()

    else :
        from starlette.routing import WebSocketRoute
        from starlette.applications import Starlette
        async def handle_ws(request):
            async with websocket_server(
                request.scope, request.receive, request._send
            ) as streams:
                await server.run(
                    streams[0], streams[1], server.create_initialization_options()
                )
                
        starlette_app = Starlette(
            routes=[
                WebSocketRoute("/ws", endpoint=handle_ws)
            ],
        )
                
        import uvicorn

        config = uvicorn.Config(
            starlette_app,
            host='0.0.0.0',
            port=port,
        )
        ser = uvicorn.Server(config)
        await ser.serve()


if __name__ == "__main__":
    import click
    import asyncio
    
    @click.command()
    @click.option("--port", default=7920, help="Port to listen on for SSE/Websocket")
    @click.option(
        "--transport",
        type=click.Choice(["stdio", "sse","websocket"]),
        default="websocket",
        help="Transport type",
    )
    def main(port:int,transport:str):
        asyncio.run(serve(port,transport))
        
    main()

You just need to add WebSocket support on the client side to make it work; a sketch is below.
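
On the client side it's roughly this (an untested sketch; it assumes your mcp version ships mcp.client.websocket.websocket_client, which may require the websockets extra, and it uses the /ws route and port 7920 from the server code above):

import asyncio

from mcp import ClientSession
from mcp.client.websocket import websocket_client  # assumes the ws extra is installed

async def main():
    async with websocket_client("ws://localhost:7920/ws") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            print(await session.list_tools())
            # "searcher" is the tool name registered by the server above
            result = await session.call_tool(
                name="searcher",
                arguments={"query": "langgraph", "max_results": 10},
            )
            print(result)

if __name__ == "__main__":
    asyncio.run(main())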

@anticomputer

There does appear to be some sort of race condition in the handling of stdio tool call results at the JSON-RPC transport layer. As in the other reports in this issue, I can trace my calls back to the tool actually returning a result, yet at the client session await level that result is never surfaced, resulting in a deadlock. It happens non-deterministically, and timing seems to influence when it occurs, which smells like a race somewhere in the JSON-RPC event handling that misses the output coming from the tool call. For me it mostly happens with longer-running tool calls in sequence: by the third or fourth call the JSON-RPC stdio wrapper no longer surfaces results that the FastMCP tool method itself has returned.


loadingvx commented Apr 29, 2025

I encountered the same issue. To make the problem easier to isolate, I extracted the code into a separate file, and there the client returned normally. Within the project architecture, however, it was strange: call_tool could send requests but never received the results. Eventually I used a separate event loop to handle the awaited call_tool, and then it worked properly. The exact cause remains unclear, but the issue was resolved.

            llm_responses = self.agent.llm_client.response(self.dialogue.get_llm_dialogue())

            using_tools = False
            for content in llm_responses:
                self.logger.debug(f"llm response.iter.value =>  {content}")
                self.dialogue.put(Message(role="assistant", content=content))

                # If interrupted midway, stop generating
                if self.client_abort:
                    break

                if self.agent.need_call_tool(content):
                    # ---> this works
                    tool_output = asyncio.run_coroutine_threadsafe(self.agent.call_tool(content), self.loop)
                    self.dialogue.put(Message(role="assistant", content=tool_output.result()))
                    # ---> this hangs forever
                    # tool_output = await self.agent.call_tool(content)
                    # self.dialogue.put(Message(role="assistant", content=tool_output))

                    using_tools = True
                    break

Maybe it's a Python async issue when LLM streaming and tool calls run on the same thread. Maybe...
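
For reference, the pattern that works for me boils down to something like this. A stripped-down, hypothetical sketch (not my actual project code; server parameters are placeholders): the MCP session lives on its own event loop in a background thread, and other threads submit tool calls to it with run_coroutine_threadsafe.

import asyncio
import threading

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Hypothetical server parameters, for illustration only.
server_params = StdioServerParameters(command="uvx", args=["some-mcp-server"], env=None)

class McpWorker:
    """Owns the MCP session on a dedicated event loop in a background thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.ready = threading.Event()
        self.session = None
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self._serve())

    async def _serve(self):
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                self.session = session
                self.ready.set()
                await asyncio.Event().wait()  # keep the session open

    def call_tool(self, name, arguments, timeout=60):
        # Safe to call from another thread (e.g. the one consuming the LLM stream).
        self.ready.wait()
        future = asyncio.run_coroutine_threadsafe(
            self.session.call_tool(name, arguments=arguments), self.loop
        )
        return future.result(timeout=timeout)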
