-
Notifications
You must be signed in to change notification settings - Fork 2.6k
feat(mcp): fix CPU leak with mcp close #1456
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,7 @@ | ||||||||||||||||||||||||||||||
| # -*- coding: utf-8 -*- | ||||||||||||||||||||||||||||||
| """The base MCP stateful client class in AgentScope, that provides basic | ||||||||||||||||||||||||||||||
| functionality for stateful MCP clients.""" | ||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||
| from abc import ABC | ||||||||||||||||||||||||||||||
| from contextlib import AsyncExitStack | ||||||||||||||||||||||||||||||
| from typing import List | ||||||||||||||||||||||||||||||
|
|
@@ -19,6 +20,13 @@ class StatefulClientBase(MCPClientBase, ABC): | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| The developers should use `connect()` and `close()` methods to manage | ||||||||||||||||||||||||||||||
| the client lifecycle. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| The context manager lifecycle (``AsyncExitStack`` enter/exit) is run | ||||||||||||||||||||||||||||||
| inside a dedicated background task so that ``connect()`` and ``close()`` | ||||||||||||||||||||||||||||||
| can safely be called from different asyncio tasks — this avoids the | ||||||||||||||||||||||||||||||
| ``anyio.CancelScope`` "exit in a different task" error that occurs in | ||||||||||||||||||||||||||||||
| frameworks like uvicorn/FastAPI where startup and shutdown may run in | ||||||||||||||||||||||||||||||
| separate tasks. | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| is_connected: bool | ||||||||||||||||||||||||||||||
|
|
@@ -43,17 +51,29 @@ def __init__(self, name: str) -> None: | |||||||||||||||||||||||||||||
| # Cache the tools to avoid fetching them multiple times | ||||||||||||||||||||||||||||||
| self._cached_tools = None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| async def connect(self) -> None: | ||||||||||||||||||||||||||||||
| """Connect to MCP server.""" | ||||||||||||||||||||||||||||||
| if self.is_connected: | ||||||||||||||||||||||||||||||
| raise RuntimeError( | ||||||||||||||||||||||||||||||
| "The MCP server is already connected. Call close() " | ||||||||||||||||||||||||||||||
| "before connecting again.", | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| self.stack = AsyncExitStack() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Cross-task lifecycle management: a dedicated background task | ||||||||||||||||||||||||||||||
| # owns the AsyncExitStack so that __aenter__/__aexit__ always | ||||||||||||||||||||||||||||||
| # execute in the same task, satisfying anyio.CancelScope. | ||||||||||||||||||||||||||||||
| self._lifecycle_task: asyncio.Task | None = None | ||||||||||||||||||||||||||||||
| self._stop_event: asyncio.Event | None = None | ||||||||||||||||||||||||||||||
| self._ready_event: asyncio.Event | None = None | ||||||||||||||||||||||||||||||
| self._init_error: BaseException | None = None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||||||||||||||||||||
| # Lifecycle worker | ||||||||||||||||||||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| async def _lifecycle_worker(self) -> None: | ||||||||||||||||||||||||||||||
| """Run the full context-manager lifecycle in one task. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| This method is spawned by ``connect()`` as a background task. | ||||||||||||||||||||||||||||||
| It enters the ``AsyncExitStack``, signals readiness, then blocks | ||||||||||||||||||||||||||||||
| until ``close()`` sets the stop event. On exit the stack is | ||||||||||||||||||||||||||||||
| closed **in the same task** that entered it, which is the key | ||||||||||||||||||||||||||||||
| requirement of ``anyio.CancelScope``. | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| self.stack = AsyncExitStack() | ||||||||||||||||||||||||||||||
| context = await self.stack.enter_async_context( | ||||||||||||||||||||||||||||||
| self.client, | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
@@ -63,36 +83,106 @@ async def connect(self) -> None: | |||||||||||||||||||||||||||||
| await self.session.initialize() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| self.is_connected = True | ||||||||||||||||||||||||||||||
| self._ready_event.set() | ||||||||||||||||||||||||||||||
| logger.info("MCP client connected.") | ||||||||||||||||||||||||||||||
| except Exception: | ||||||||||||||||||||||||||||||
| await self.stack.aclose() | ||||||||||||||||||||||||||||||
| self.stack = None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Block until close() signals. The wait may also be | ||||||||||||||||||||||||||||||
| # interrupted by CancelledError if the session's internal | ||||||||||||||||||||||||||||||
| # anyio cancel scope is torn down; treat that as a stop. | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| await self._stop_event.wait() | ||||||||||||||||||||||||||||||
| except (asyncio.CancelledError, Exception): | ||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||
| self._init_error = e | ||||||||||||||||||||||||||||||
| self._ready_event.set() | ||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||
| self.session = None | ||||||||||||||||||||||||||||||
| self.is_connected = False | ||||||||||||||||||||||||||||||
| self._cached_tools = None | ||||||||||||||||||||||||||||||
| if self.stack: | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| await self.stack.aclose() | ||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||
| "Error during MCP client cleanup: %s", | ||||||||||||||||||||||||||||||
| e, | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| finally: | ||||||||||||||||||||||||||||||
| self.stack = None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||||||||||||||||||||
| # Public API | ||||||||||||||||||||||||||||||
| # ------------------------------------------------------------------ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| async def connect(self) -> None: | ||||||||||||||||||||||||||||||
| """Connect to MCP server. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Spawns a background task that owns the full context-manager | ||||||||||||||||||||||||||||||
| lifecycle so that ``close()`` can be called from any task. | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| if self.is_connected: | ||||||||||||||||||||||||||||||
| raise RuntimeError( | ||||||||||||||||||||||||||||||
| "The MCP server is already connected. Call close() " | ||||||||||||||||||||||||||||||
| "before connecting again.", | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| self._stop_event = asyncio.Event() | ||||||||||||||||||||||||||||||
| self._ready_event = asyncio.Event() | ||||||||||||||||||||||||||||||
| self._init_error = None | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| self._lifecycle_task = asyncio.create_task( | ||||||||||||||||||||||||||||||
| self._lifecycle_worker(), | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
Comment on lines
+120
to
+141
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| await self._ready_event.wait() | ||||||||||||||||||||||||||||||
| except BaseException: | ||||||||||||||||||||||||||||||
| # If connect() is cancelled externally (e.g. asyncio.wait_for | ||||||||||||||||||||||||||||||
| # timeout), ensure the lifecycle worker is stopped. We must | ||||||||||||||||||||||||||||||
| # cancel the task (not just set _stop_event) because the worker | ||||||||||||||||||||||||||||||
| # may still be blocked inside enter_async_context(). | ||||||||||||||||||||||||||||||
| self._lifecycle_task.cancel() | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| await self._lifecycle_task | ||||||||||||||||||||||||||||||
| except (asyncio.CancelledError, Exception): | ||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||
| self._lifecycle_task = None | ||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if self._init_error is not None: | ||||||||||||||||||||||||||||||
| await self._lifecycle_task | ||||||||||||||||||||||||||||||
| self._lifecycle_task = None | ||||||||||||||||||||||||||||||
| raise self._init_error | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
| if ( | |
| not self.is_connected | |
| or self._stop_event.is_set() | |
| or self._lifecycle_task is None | |
| or self._lifecycle_task.done() | |
| ): | |
| if self._lifecycle_task is not None: | |
| await self._lifecycle_task | |
| self._lifecycle_task = None | |
| raise RuntimeError( | |
| "The MCP server was closed during connect().", | |
| ) |
Copilot
AI
Apr 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New/updated docstrings (_lifecycle_worker(), connect(), close()) don’t follow the repository’s docstring template (description + explicit Args/Returns sections with typed backticks). Please align these docstrings with the project’s standard so generated docs stay consistent.
Copilot
AI
Apr 15, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
close() doesn’t reset _lifecycle_task (and related lifecycle fields) if await self._lifecycle_task raises. That can leave the instance stuck in a state where future close() calls keep re-awaiting a failed task. Consider moving _lifecycle_task = None (and clearing _stop_event/_ready_event) into a finally block so state is consistent even on errors/cancellation.
| self._lifecycle_task = None | |
| except Exception as e: | |
| if not ignore_errors: | |
| raise e | |
| logger.warning("Error during MCP client cleanup: %s", e) | |
| except Exception as e: | |
| if not ignore_errors: | |
| raise e | |
| logger.warning("Error during MCP client cleanup: %s", e) | |
| finally: | |
| self._lifecycle_task = None | |
| self._stop_event = None | |
| self._ready_event = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_lifecycle_worker()swallows all exceptions fromawait self._stop_event.wait()viaexcept (asyncio.CancelledError, Exception): pass. This can hide real programming errors (e.g.,_stop_eventbeingNone) and make unexpected shutdowns silent. It’s safer to only treat cancellation as a stop signal and let unexpected exceptions propagate/log explicitly.