feat(mcp): fix CPU leak with mcp close #1456

qbc2016 wants to merge 2 commits into agentscope-ai:main
Conversation
Pull request overview
This PR updates the MCP stateful client lifecycle management to avoid cross-task context-manager cleanup issues (notably anyio CancelScope “exit in a different task”) that can surface during uvicorn/FastAPI startup/shutdown and reload flows.
Changes:
- Introduces a dedicated background lifecycle task that owns `AsyncExitStack` enter/exit for the MCP client session.
- Reworks `connect()`/`close()` to coordinate the lifecycle via asyncio events and await full cleanup through the background task.
- Adds explanatory documentation in the class docstring about cross-task lifecycle handling.
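To make the pattern concrete, here is a minimal, self-contained sketch of the idea described above: a single background task owns both entry and exit of the `AsyncExitStack`, so `close()` can be awaited from any task without tripping anyio's "cancel scope exited in a different task" check. The `Client` and `fake_session` names are illustrative only, not the actual AgentScope API.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def fake_session():
    # Stand-in for the real MCP transport/session context managers.
    yield "session"


class Client:
    def __init__(self) -> None:
        self.is_connected = False
        self._stop_event: asyncio.Event | None = None
        self._ready_event: asyncio.Event | None = None
        self._task: asyncio.Task | None = None

    async def _worker(self) -> None:
        # The stack is entered AND exited inside this one task.
        async with AsyncExitStack() as stack:
            await stack.enter_async_context(fake_session())
            self.is_connected = True
            self._ready_event.set()
            await self._stop_event.wait()  # block until close() signals
        self.is_connected = False  # cleanup ran in the same task

    async def connect(self) -> None:
        self._stop_event = asyncio.Event()
        self._ready_event = asyncio.Event()
        self._task = asyncio.create_task(self._worker())
        await self._ready_event.wait()

    async def close(self) -> None:
        self._stop_event.set()
        await self._task  # wait for full cleanup
        self._task = None


async def main() -> None:
    client = Client()
    await client.connect()
    assert client.is_connected
    # close() may be awaited from a completely different task:
    await asyncio.create_task(client.close())
    assert not client.is_connected


asyncio.run(main())
```

The key property is that `close()` never touches the exit stack itself; it only signals the owning task and waits for it to finish.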
```python
    async def connect(self) -> None:
        """Connect to MCP server.

        Spawns a background task that owns the full context-manager
        lifecycle so that ``close()`` can be called from any task.
        """
        if self.is_connected:
            raise RuntimeError(
                "The MCP server is already connected. Call close() "
                "before connecting again.",
            )

        self._stop_event = asyncio.Event()
        self._ready_event = asyncio.Event()
        self._init_error = None

        self._lifecycle_task = asyncio.create_task(
            self._lifecycle_worker(),
        )
```
This change introduces new cross-task lifecycle semantics (background task ownership + event signaling) intended to prevent the anyio CancelScope “exit in a different task” issue. Existing MCP tests cover basic connect()/close() but don’t cover calling close() from a different task or overlapping connect()/close(). Please add unit tests to lock in the new behavior and prevent regressions (e.g., connect() then asyncio.create_task(client.close()) from another task, ensuring no hang and is_connected transitions correctly).
```python
        if self.is_connected:
            raise RuntimeError(
                "The MCP server is already connected. Call close() "
                "before connecting again.",
            )

        self._stop_event = asyncio.Event()
        self._ready_event = asyncio.Event()
        self._init_error = None

        self._lifecycle_task = asyncio.create_task(
            self._lifecycle_worker(),
        )
```
connect() only checks is_connected, so two concurrent connect() calls can both proceed before _lifecycle_worker flips is_connected=True. The second call overwrites _stop_event/_ready_event/_lifecycle_task, leaving the first lifecycle task orphaned and potentially leaking resources. Consider guarding with _lifecycle_task is not None and not _lifecycle_task.done() (or an asyncio.Lock) and raising a clear error (or awaiting the in-flight connect).
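One way to close that race is to treat a live lifecycle task as "connecting or connected" rather than relying on the `is_connected` flag alone. The runnable sketch below demonstrates the guard with a toy client; `GuardedClient` and its worker are illustrative, not the merged code.

```python
import asyncio


class GuardedClient:
    # Toy client; only the connect() guard matters here.
    def __init__(self) -> None:
        self._lifecycle_task: asyncio.Task | None = None

    async def _lifecycle_worker(self) -> None:
        await asyncio.sleep(0.05)  # simulates slow connection setup

    async def connect(self) -> None:
        # A live (not-done) task means a connect is already in flight or
        # completed, even while is_connected is still False.
        if self._lifecycle_task is not None and not self._lifecycle_task.done():
            raise RuntimeError(
                "connect() already in progress or connected; "
                "call close() first.",
            )
        self._lifecycle_task = asyncio.create_task(self._lifecycle_worker())


async def main() -> bool:
    client = GuardedClient()
    await client.connect()          # first connect spawns the worker
    rejected = False
    try:
        await client.connect()      # overlapping connect is rejected
    except RuntimeError:
        rejected = True
    await client._lifecycle_task    # let the worker finish cleanly
    return rejected


assert asyncio.run(main())
```

An `asyncio.Lock` around the whole connect body works too; the task-liveness check is just the cheaper option when only one connect should ever be in flight.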
```python
            await self._lifecycle_task
            self._lifecycle_task = None
            raise self._init_error
```
There’s a race where close() can be called before _ready_event is set: the worker will set _ready_event, immediately observe _stop_event already set, then exit and set is_connected=False. In that case connect() returns successfully even though the client is already closed. After waiting on _ready_event, consider validating that is_connected is still True (and/or _stop_event is not set / _lifecycle_task still running) and raise an error if shutdown happened during connect.
Suggested change:

```python
        if (
            not self.is_connected
            or self._stop_event.is_set()
            or self._lifecycle_task is None
            or self._lifecycle_task.done()
        ):
            if self._lifecycle_task is not None:
                await self._lifecycle_task
            self._lifecycle_task = None
            raise RuntimeError(
                "The MCP server was closed during connect().",
            )
```
```python
            # Block until close() signals. The wait may also be
            # interrupted by CancelledError if the session's internal
            # anyio cancel scope is torn down; treat that as a stop.
            try:
                await self._stop_event.wait()
            except (asyncio.CancelledError, Exception):
```
_lifecycle_worker() swallows all exceptions from await self._stop_event.wait() via except (asyncio.CancelledError, Exception): pass. This can hide real programming errors (e.g., _stop_event being None) and make unexpected shutdowns silent. It’s safer to only treat cancellation as a stop signal and let unexpected exceptions propagate/log explicitly.
Suggested change:

```diff
-            # Block until close() signals. The wait may also be
-            # interrupted by CancelledError if the session's internal
-            # anyio cancel scope is torn down; treat that as a stop.
-            try:
-                await self._stop_event.wait()
-            except (asyncio.CancelledError, Exception):
+            # Block until close() signals. The wait may also be
+            # interrupted by CancelledError if the session's internal
+            # anyio cancel scope is torn down; treat only cancellation
+            # as a stop signal and let unexpected errors propagate.
+            try:
+                await self._stop_event.wait()
+            except asyncio.CancelledError:
+                # Cancellation during shutdown is expected here.
```
```python
            self._lifecycle_task = None
        except Exception as e:
            if not ignore_errors:
                raise e
            logger.warning("Error during MCP client cleanup: %s", e)
```
close() doesn’t reset _lifecycle_task (and related lifecycle fields) if await self._lifecycle_task raises. That can leave the instance stuck in a state where future close() calls keep re-awaiting a failed task. Consider moving _lifecycle_task = None (and clearing _stop_event/_ready_event) into a finally block so state is consistent even on errors/cancellation.
Suggested change:

```diff
-            self._lifecycle_task = None
-        except Exception as e:
-            if not ignore_errors:
-                raise e
-            logger.warning("Error during MCP client cleanup: %s", e)
+        except Exception as e:
+            if not ignore_errors:
+                raise e
+            logger.warning("Error during MCP client cleanup: %s", e)
+        finally:
+            self._lifecycle_task = None
+            self._stop_event = None
+            self._ready_event = None
```
```diff
@@ -63,36 +83,106 @@ async def connect(self) -> None:
             await self.session.initialize()

             self.is_connected = True
             self._ready_event.set()
             logger.info("MCP client connected.")
         except Exception:
             await self.stack.aclose()
             self.stack = None

             # Block until close() signals. The wait may also be
             # interrupted by CancelledError if the session's internal
             # anyio cancel scope is torn down; treat that as a stop.
             try:
                 await self._stop_event.wait()
             except (asyncio.CancelledError, Exception):
                 pass

         except Exception as e:
             self._init_error = e
             self._ready_event.set()
         finally:
             self.session = None
             self.is_connected = False
             self._cached_tools = None
             if self.stack:
                 try:
                     await self.stack.aclose()
                 except Exception as e:
                     logger.warning(
                         "Error during MCP client cleanup: %s",
                         e,
                     )
                 finally:
                     self.stack = None

     # ------------------------------------------------------------------
     # Public API
     # ------------------------------------------------------------------

     async def connect(self) -> None:
         """Connect to MCP server.

         Spawns a background task that owns the full context-manager
         lifecycle so that ``close()`` can be called from any task.
         """
         if self.is_connected:
             raise RuntimeError(
                 "The MCP server is already connected. Call close() "
                 "before connecting again.",
             )

         self._stop_event = asyncio.Event()
         self._ready_event = asyncio.Event()
         self._init_error = None

         self._lifecycle_task = asyncio.create_task(
             self._lifecycle_worker(),
         )

         try:
             await self._ready_event.wait()
         except BaseException:
             # If connect() is cancelled externally (e.g. asyncio.wait_for
             # timeout), ensure the lifecycle worker is stopped. We must
             # cancel the task (not just set _stop_event) because the worker
             # may still be blocked inside enter_async_context().
             self._lifecycle_task.cancel()
             try:
                 await self._lifecycle_task
             except (asyncio.CancelledError, Exception):
                 pass
             self._lifecycle_task = None
             raise

         if self._init_error is not None:
             await self._lifecycle_task
             self._lifecycle_task = None
             raise self._init_error

     async def close(self, ignore_errors: bool = True) -> None:
         """Clean up the MCP client resources. You must call this method when
         your application is done.

         Signals the background lifecycle task to exit and waits for full
         cleanup.

         Args:
             ignore_errors (`bool`):
                 Whether to ignore errors during cleanup. Defaults to `True`.
         """
-        if not self.is_connected:
+        if not self.is_connected and self._lifecycle_task is None:
             raise RuntimeError(
                 "The MCP server is not connected. Call connect() before "
                 "closing.",
             )
```
New/updated docstrings (_lifecycle_worker(), connect(), close()) don’t follow the repository’s docstring template (description + explicit Args/Returns sections with typed backticks). Please align these docstrings with the project’s standard so generated docs stay consistent.
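As a rough sketch of the shape this comment asks for, here is the `close()` docstring laid out with the typed-backtick `Args` style already visible elsewhere in the diff (the exact repository template may add `Returns`/`Raises` sections as well):

```python
async def close(self, ignore_errors: bool = True) -> None:
    """Clean up the MCP client resources. You must call this method when
    your application is done.

    Signals the background lifecycle task to exit and waits for full
    cleanup.

    Args:
        ignore_errors (`bool`):
            Whether to ignore errors during cleanup. Defaults to `True`.
    """
```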
AgentScope Version
1.0.19dev
Description
This PR fixes a critical CPU leak issue in MCP (Model Context Protocol) client management that occurs during workspace reload operations in uvicorn/FastAPI environments.
Checklist
Please check the following items before code is ready to be reviewed.
Run the `pre-commit run --all-files` command.