Skip to content

Commit 3e0c4eb

Browse files
feat: streamable HTTP (#2424)
Co-authored-by: Wendong-Fan <[email protected]>
1 parent bc0ff39 commit 3e0c4eb

File tree

2 files changed

+60
-22
lines changed

2 files changed

+60
-22
lines changed

camel/toolkits/mcp_toolkit.py

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
if TYPE_CHECKING:
3535
from mcp import ClientSession, ListToolsResult, Tool
3636

37+
3738
from camel.logger import get_logger
3839
from camel.toolkits import BaseToolkit, FunctionTool
3940

@@ -84,7 +85,6 @@ class MCPClient(BaseToolkit):
8485
await client.disconnect()
8586
```
8687
87-
8888
Attributes:
8989
command_or_url (str): URL for SSE mode or command executable for stdio
9090
mode. (default: :obj:`None`)
@@ -96,6 +96,10 @@ class MCPClient(BaseToolkit):
9696
(default: :obj:`None`)
9797
headers (Dict[str, str]): Headers for the HTTP request.
9898
(default: :obj:`None`)
99+
mode (Optional[str]): Connection mode. Can be "sse" for Server-Sent
100+
Events, "streamable-http" for streaming HTTP,
101+
or None for stdio mode.
102+
(default: :obj:`None`)
99103
strict (Optional[bool]): Whether to enforce strict mode for the
100104
function call. (default: :obj:`False`)
101105
"""
@@ -107,6 +111,7 @@ def __init__(
107111
env: Optional[Dict[str, str]] = None,
108112
timeout: Optional[float] = None,
109113
headers: Optional[Dict[str, str]] = None,
114+
mode: Optional[str] = None,
110115
strict: Optional[bool] = False,
111116
):
112117
from mcp import Tool
@@ -118,6 +123,7 @@ def __init__(
118123
self.env = env or {}
119124
self.headers = headers or {}
120125
self.strict = strict
126+
self.mode = mode
121127

122128
self._mcp_tools: List[Tool] = []
123129
self._session: Optional['ClientSession'] = None
@@ -133,23 +139,45 @@ async def connect(self):
133139
from mcp.client.session import ClientSession
134140
from mcp.client.sse import sse_client
135141
from mcp.client.stdio import StdioServerParameters, stdio_client
142+
from mcp.client.streamable_http import streamablehttp_client
136143

137144
if self._is_connected:
138145
logger.warning("Server is already connected")
139146
return self
140147

141148
try:
142149
if urlparse(self.command_or_url).scheme in ("http", "https"):
143-
(
144-
read_stream,
145-
write_stream,
146-
) = await self._exit_stack.enter_async_context(
147-
sse_client(
148-
self.command_or_url,
149-
headers=self.headers,
150-
timeout=self.timeout,
150+
if self.mode == "sse" or self.mode is None:
151+
(
152+
read_stream,
153+
write_stream,
154+
) = await self._exit_stack.enter_async_context(
155+
sse_client(
156+
self.command_or_url,
157+
headers=self.headers,
158+
timeout=self.timeout,
159+
)
160+
)
161+
elif self.mode == "streamable-http":
162+
try:
163+
(
164+
read_stream,
165+
write_stream,
166+
_,
167+
) = await self._exit_stack.enter_async_context(
168+
streamablehttp_client(
169+
self.command_or_url,
170+
headers=self.headers,
171+
timeout=timedelta(seconds=self.timeout),
172+
)
173+
)
174+
except Exception as e:
175+
# Handle anyio task group errors
176+
logger.error(f"Streamable HTTP client error: {e}")
177+
else:
178+
raise ValueError(
179+
f"Invalid mode '{self.mode}' for HTTP URL"
151180
)
152-
)
153181
else:
154182
command = self.command_or_url
155183
arguments = self.args
@@ -198,10 +226,14 @@ async def disconnect(self):
198226
if not self._is_connected:
199227
return
200228
self._is_connected = False
201-
await self._exit_stack.aclose()
202-
# Reset the exit stack and session for future reuse purposes
203-
self._exit_stack = AsyncExitStack()
204-
self._session = None
229+
230+
try:
231+
await self._exit_stack.aclose()
232+
except Exception as e:
233+
logger.warning(f"{e}")
234+
finally:
235+
self._exit_stack = AsyncExitStack()
236+
self._session = None
205237

206238
@asynccontextmanager
207239
async def connection(self):
@@ -217,7 +249,10 @@ async def connection(self):
217249
await self.connect()
218250
yield self
219251
finally:
220-
await self.disconnect()
252+
try:
253+
await self.disconnect()
254+
except Exception as e:
255+
logger.warning(f"Error: {e}")
221256

222257
async def list_mcp_tools(self) -> Union[str, "ListToolsResult"]:
223258
r"""Retrieves the list of available tools from the connected MCP
@@ -440,6 +475,7 @@ async def create(
440475
env: Optional[Dict[str, str]] = None,
441476
timeout: Optional[float] = None,
442477
headers: Optional[Dict[str, str]] = None,
478+
mode: Optional[str] = None,
443479
) -> "MCPClient":
444480
r"""Factory method that creates and connects to the MCP server.
445481
@@ -457,6 +493,10 @@ async def create(
457493
(default: :obj:`None`)
458494
headers (Optional[Dict[str, str]]): Headers for the HTTP request.
459495
(default: :obj:`None`)
496+
mode (Optional[str]): Connection mode. Can be "sse" for
497+
Server-Sent Events, "streamable-http" for
498+
streaming HTTP, or None for stdio mode.
499+
(default: :obj:`None`)
460500
461501
Returns:
462502
MCPClient: A fully initialized and connected MCPClient instance.
@@ -470,6 +510,7 @@ async def create(
470510
env=env,
471511
timeout=timeout,
472512
headers=headers,
513+
mode=mode,
473514
)
474515
try:
475516
await client.connect()
@@ -679,6 +720,7 @@ def _load_servers_from_dict(
679720
env={**os.environ, **cfg.get("env", {})},
680721
timeout=cfg.get("timeout", None),
681722
headers=headers,
723+
mode=cfg.get("mode", None),
682724
strict=strict,
683725
)
684726
all_servers.append(server)
Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
{
22
"mcpServers": {
33
"arxiv_toolkit": {
4-
"command": "python",
5-
"args": [
6-
"-m",
7-
"examples.mcp_arxiv_toolkit.arxiv_toolkit_server",
8-
"--timeout",
9-
"30"
10-
]
4+
"url": "http://localhost:8000/mcp/sse",
5+
"timeout": 30,
6+
"mode": "streamable-http"
117
}
128
}
139
}

0 commit comments

Comments
 (0)