diff --git a/src/agentscope/mcp/_stateful_client_base.py b/src/agentscope/mcp/_stateful_client_base.py index 03c96db416..4557253c21 100644 --- a/src/agentscope/mcp/_stateful_client_base.py +++ b/src/agentscope/mcp/_stateful_client_base.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- """The base MCP stateful client class in AgentScope, that provides basic functionality for stateful MCP clients.""" +import asyncio from abc import ABC from contextlib import AsyncExitStack from typing import List @@ -19,6 +20,13 @@ class StatefulClientBase(MCPClientBase, ABC): The developers should use `connect()` and `close()` methods to manage the client lifecycle. + + The context manager lifecycle (``AsyncExitStack`` enter/exit) is run + inside a dedicated background task so that ``connect()`` and ``close()`` + can safely be called from different asyncio tasks — this avoids the + ``anyio.CancelScope`` "exit in a different task" error that occurs in + frameworks like uvicorn/FastAPI where startup and shutdown may run in + separate tasks. """ is_connected: bool @@ -43,17 +51,29 @@ def __init__(self, name: str) -> None: # Cache the tools to avoid fetching them multiple times self._cached_tools = None - async def connect(self) -> None: - """Connect to MCP server.""" - if self.is_connected: - raise RuntimeError( - "The MCP server is already connected. Call close() " - "before connecting again.", - ) - - self.stack = AsyncExitStack() - + # Cross-task lifecycle management: a dedicated background task + # owns the AsyncExitStack so that __aenter__/__aexit__ always + # execute in the same task, satisfying anyio.CancelScope. + self._lifecycle_task: asyncio.Task | None = None + self._stop_event: asyncio.Event | None = None + self._ready_event: asyncio.Event | None = None + self._init_error: BaseException | None = None + + # ------------------------------------------------------------------ + # Lifecycle worker + # ------------------------------------------------------------------ + + async def _lifecycle_worker(self) -> None: + """Run the full context-manager lifecycle in one task. + + This method is spawned by ``connect()`` as a background task. + It enters the ``AsyncExitStack``, signals readiness, then blocks + until ``close()`` sets the stop event. On exit the stack is + closed **in the same task** that entered it, which is the key + requirement of ``anyio.CancelScope``. + """ try: + self.stack = AsyncExitStack() context = await self.stack.enter_async_context( self.client, ) @@ -63,36 +83,114 @@ async def connect(self) -> None: await self.session.initialize() self.is_connected = True + self._ready_event.set() logger.info("MCP client connected.") - except Exception: - await self.stack.aclose() - self.stack = None + + # Block until close() signals. The wait may also be + # interrupted by CancelledError if the session's internal + # anyio cancel scope is torn down; treat only cancellation + # as a stop signal and let unexpected errors propagate. + try: + await self._stop_event.wait() + except asyncio.CancelledError: + pass + + except Exception as e: + self._init_error = e + self._ready_event.set() + finally: + self.session = None + self.is_connected = False + self._cached_tools = None + if self.stack: + try: + await self.stack.aclose() + except Exception as e: + logger.warning( + "Error during MCP client cleanup: %s", + e, + ) + finally: + self.stack = None + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def connect(self) -> None: + """Connect to MCP server. + + Spawns a background task that owns the full context-manager + lifecycle so that ``close()`` can be called from any task. + """ + if self.is_connected or ( + self._lifecycle_task is not None + and not self._lifecycle_task.done() + ): + raise RuntimeError( + "The MCP server is already connected. Call close() " + "before connecting again.", + ) + + self._stop_event = asyncio.Event() + self._ready_event = asyncio.Event() + self._init_error = None + + self._lifecycle_task = asyncio.create_task( + self._lifecycle_worker(), + ) + + try: + await self._ready_event.wait() + except BaseException: + # If connect() is cancelled externally (e.g. asyncio.wait_for + # timeout), ensure the lifecycle worker is stopped. We must + # cancel the task (not just set _stop_event) because the worker + # may still be blocked inside enter_async_context(). + self._lifecycle_task.cancel() + try: + await self._lifecycle_task + except (asyncio.CancelledError, Exception): + pass + self._lifecycle_task = None raise + if self._init_error is not None: + await self._lifecycle_task + self._lifecycle_task = None + raise self._init_error + async def close(self, ignore_errors: bool = True) -> None: """Clean up the MCP client resources. You must call this method when your application is done. + Signals the background lifecycle task to exit and waits for full + cleanup. + Args: ignore_errors (`bool`): Whether to ignore errors during cleanup. Defaults to `True`. """ - if not self.is_connected: + if not self.is_connected and self._lifecycle_task is None: raise RuntimeError( "The MCP server is not connected. Call connect() before " "closing.", ) try: - await self.stack.aclose() + if self._stop_event: + self._stop_event.set() + if self._lifecycle_task: + await self._lifecycle_task except Exception as e: if not ignore_errors: raise e logger.warning("Error during MCP client cleanup: %s", e) finally: - self.stack = None - self.session = None - self.is_connected = False + if self._lifecycle_task is None or self._lifecycle_task.done(): + self._lifecycle_task = None + self._stop_event = None + self._ready_event = None async def list_tools(self) -> List[mcp.types.Tool]: """Get all available tools from the server.