Skip to content

Commit 55257b9

Browse files
authored
feat(agent): support remote MCP transports
Support SSE and streamable HTTP MCP servers, harden remote tool retries, and move registry construction off the async session loop.
1 parent 5237ce3 commit 55257b9

15 files changed

Lines changed: 1057 additions & 52 deletions

README.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -780,14 +780,19 @@ vibe-trading run "use my-server to do X"
780780

781781
| Field | Type | Default | Description |
782782
|-------|------|---------|-------------|
783-
| `command` | string | required | Executable to spawn |
784-
| `args` | array | `[]` | Command-line arguments |
785-
| `env` | object | `{}` | Extra environment variables merged into the subprocess env |
783+
| `type` | string | inferred for stdio; required for HTTP | Omit for stdio, or set to `sse` / `streamableHttp` for URL-based servers. |
784+
| `command` | string | required for stdio | Executable to spawn for stdio servers. Invalid for `sse` / `streamableHttp` servers. |
785+
| `args` | array | `[]` | Command-line arguments for stdio servers only. |
786+
| `env` | object | `{}` | Extra environment variables merged into the subprocess env for stdio servers only. |
787+
| `url` | string | required for `sse` / `streamableHttp` | Remote SSE / streamable HTTP endpoint URL. Not used for stdio servers. |
788+
| `headers` | object | `{}` | Extra HTTP headers for `sse` / `streamableHttp` servers only. |
786789
| `toolTimeout` | number | `30` | Per-tool call timeout in seconds |
787790
| `enabledTools` | array | `["*"]` | Tool allowlist. Use `["*"]` to expose all tools from the server |
788791

789792
Config file location: `~/.vibe-trading/agent.json` (JSON or YAML).
790793

794+
For URL-based transports, `type` is required. The agent no longer guesses between SSE and streamable HTTP from the URL suffix.
795+
791796
### Per-session overrides (API)
792797

793798
When creating a session via the API you can pass `mcpServers` inside `session.config` to extend or override the global config for that session only:
@@ -822,7 +827,7 @@ tool names unique. Rename the server in agent config if you want a different pre
822827

823828
| Limit | Detail |
824829
|-------|--------|
825-
| Transport | stdio only (SSE / streamable HTTP excluded in v1) |
830+
| Transport | stdio, SSE, and streamable HTTP |
826831
| Execution | serial only — MCP tools never enter the parallel readonly path |
827832
| Surfaces | tools only (resources and prompts excluded in v1) |
828833
| Hot reload | not supported — restart the process to pick up config changes |

agent/SKILL.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,17 @@ Remote tools appear automatically in every `vibe-trading run` / `vibe-trading ch
187187

188188
| Field | Required | Default | Description |
189189
|-------|----------|---------|-------------|
190-
| `command` | yes || Executable to launch |
190+
| `type` | stdio: no, HTTP: yes | inferred only for stdio | Transport type. Use `sse` or `streamableHttp` for URL-based servers. |
191+
| `command` | stdio: yes || Executable to launch |
191192
| `args` | no | `[]` | Command arguments |
192193
| `env` | no | `{}` | Extra env vars for the subprocess |
194+
| `url` | HTTP: yes || Remote SSE / streamable HTTP endpoint URL |
195+
| `headers` | no | `{}` | Extra HTTP headers for SSE / streamable HTTP servers |
193196
| `toolTimeout` | no | `30` | Seconds before a tool call is cancelled |
194197
| `enabledTools` | no | `["*"]` | Allowlist of remote tool names. `["*"]` enables all |
195198

199+
For URL-based transports, `type` is required. The agent no longer guesses between SSE and streamable HTTP from the URL suffix.
200+
196201
### Per-session override (API)
197202

198203
> **Security — disabled by default.** `mcpServers` defines subprocess `command`/`args`/`env` and is therefore restricted to operator-level trust. API callers **cannot** inject MCP server definitions through `POST /sessions` unless the server operator explicitly opts in.
@@ -223,7 +228,7 @@ Without `ALLOW_SESSION_MCP_SERVERS=1`, any `mcpServers` key in `session.config`
223228

224229
### v1 limits
225230

226-
- **Transport:** stdio only. SSE and HTTP transports are rejected.
231+
- **Transport:** stdio, SSE, and streamable HTTP.
227232
- **Execution:** serial only. MCP tools never enter the parallel readonly path.
228233
- **Surfaces:** tools only. Resources and prompts are not exposed.
229234
- **Swarm:** MCP tools are excluded from Swarm worker registries in v1.

agent/src/config/loader.py

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from pydantic import ValidationError
1212

1313
from src.config.paths import get_config_path
14-
from src.config.schema import AgentConfig, AgentConfigOverride
14+
from src.config.schema import AgentConfig, AgentConfigOverride, MCPServerConfig
1515

1616
logger = logging.getLogger(__name__)
1717

@@ -80,11 +80,19 @@ def merge_agent_config_overrides(
8080
)
8181
return config
8282

83-
merged = _merge_dicts(
83+
merged = _merge_agent_config_dicts(
8484
config.model_dump(mode="json"),
8585
override_model.model_dump(mode="json", exclude_unset=True),
8686
)
87-
return AgentConfig.model_validate(merged)
87+
try:
88+
return AgentConfig.model_validate(merged)
89+
except ValidationError as exc:
90+
logger.warning(
91+
"Ignoring merged agent config overrides after validation failure (%s): %s — using base config",
92+
type(exc).__name__,
93+
[str(e["loc"]) for e in exc.errors()],
94+
)
95+
return config
8896

8997

9098
# Keys in session overrides that carry subprocess definitions and therefore
@@ -169,6 +177,70 @@ def _read_config_file(path: Path) -> dict[str, Any]:
169177
return data
170178

171179

180+
def _merge_agent_config_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
181+
"""Merge top-level agent config payloads with MCP-aware server replacement."""
182+
non_mcp_override = {key: value for key, value in override.items() if key != "mcp_servers"}
183+
merged = _merge_dicts(base, non_mcp_override)
184+
185+
override_servers = override.get("mcp_servers")
186+
if not isinstance(override_servers, dict):
187+
if "mcp_servers" in override:
188+
merged["mcp_servers"] = override_servers
189+
return merged
190+
191+
merged_servers = dict(base.get("mcp_servers", {}))
192+
for server_name, server_override in override_servers.items():
193+
current_server = merged_servers.get(server_name)
194+
if isinstance(current_server, dict) and isinstance(server_override, dict):
195+
merged_servers[server_name] = _merge_mcp_server_dicts(current_server, server_override)
196+
else:
197+
merged_servers[server_name] = server_override
198+
199+
merged["mcp_servers"] = merged_servers
200+
return merged
201+
202+
203+
def _merge_mcp_server_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
204+
"""Merge one MCP server payload, resetting incompatible transport fields when needed."""
205+
if _override_switches_transport(base, override):
206+
return _merge_dicts(_default_mcp_server_payload(base), override)
207+
return _merge_dicts(base, override)
208+
209+
210+
def _override_switches_transport(base: dict[str, Any], override: dict[str, Any]) -> bool:
211+
"""Return whether a partial override changes the server transport family."""
212+
override_transport = _resolve_override_transport(override)
213+
if override_transport is None:
214+
return False
215+
base_transport = MCPServerConfig.model_validate(base).resolved_transport()
216+
return override_transport != base_transport
217+
218+
219+
def _resolve_override_transport(override: dict[str, Any]) -> str | None:
220+
"""Infer transport intent from a partial MCP server override."""
221+
explicit_type = override.get("type")
222+
if explicit_type in {"stdio", "sse", "streamableHttp"}:
223+
return str(explicit_type)
224+
if any(key in override for key in ("command", "args", "env")):
225+
return "stdio"
226+
return None
227+
228+
229+
def _default_mcp_server_payload(base: dict[str, Any]) -> dict[str, Any]:
230+
"""Return a transport-neutral MCP server payload preserving non-transport defaults."""
231+
enabled_tools = base.get("enabled_tools")
232+
return {
233+
"type": None,
234+
"command": "",
235+
"args": [],
236+
"env": {},
237+
"url": "",
238+
"headers": {},
239+
"tool_timeout": base.get("tool_timeout", 30.0),
240+
"enabled_tools": list(enabled_tools) if isinstance(enabled_tools, list) else ["*"],
241+
}
242+
243+
172244
def _merge_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
173245
"""Recursively merge two plain dictionaries.
174246

agent/src/config/schema.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,30 +38,40 @@ class MCPServerConfig(ConfigBase):
3838
tool_timeout: float = Field(default=30.0, ge=0.1)
3939
enabled_tools: list[str] = Field(default_factory=lambda: ["*"])
4040

41+
def resolved_transport(self) -> Literal["stdio", "sse", "streamableHttp"]:
42+
"""Resolve the effective transport from explicit type or implied fields."""
43+
if self.type is not None:
44+
return self.type
45+
if self.command.strip() or self.args or self.env:
46+
return "stdio"
47+
if self.url.strip():
48+
raise ValueError("HTTP MCP servers require an explicit type of 'sse' or 'streamableHttp'")
49+
return "stdio"
50+
4151
@model_validator(mode="after")
42-
def validate_v1_stdio_only(self) -> "MCPServerConfig":
43-
"""Reject non-stdio transports for the first release.
52+
def validate_transport_config(self) -> "MCPServerConfig":
53+
"""Validate transport-specific MCP server configuration.
4454
4555
Returns:
4656
The validated MCP server config instance.
4757
4858
Raises:
49-
ValueError: If the config implies a non-stdio transport, uses
50-
HTTP-only fields, or omits the command required for stdio.
59+
ValueError: If required fields are missing for the resolved
60+
transport or conflicting fields are provided.
5161
"""
52-
transport = self.type
53-
if transport is None:
54-
if self.command:
55-
transport = "stdio"
56-
elif self.url:
57-
transport = "sse" if self.url.rstrip("/").endswith("/sse") else "streamableHttp"
58-
59-
if transport and transport != "stdio":
60-
raise ValueError("Only stdio MCP servers are supported in v1")
61-
if self.url or self.headers:
62-
raise ValueError("HTTP MCP transports are not supported in v1")
63-
if not self.command.strip():
64-
raise ValueError("stdio MCP servers require a command")
62+
transport = self.resolved_transport()
63+
64+
if transport == "stdio":
65+
if not self.command.strip():
66+
raise ValueError("stdio MCP servers require a command")
67+
if self.url.strip() or self.headers:
68+
raise ValueError("stdio MCP servers do not accept url/headers")
69+
return self
70+
71+
if not self.url.strip():
72+
raise ValueError(f"{transport} MCP servers require a url")
73+
if self.command.strip() or self.args or self.env:
74+
raise ValueError(f"{transport} MCP servers do not accept command/args/env")
6575
return self
6676

6777

agent/src/session/service.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ async def _run_with_agent(
265265

266266
session_id = attempt.session_id
267267
attempt_id = attempt.attempt_id
268+
loop = asyncio.get_running_loop()
268269

269270
safe_overrides = sanitize_session_overrides(session_config) if session_config else session_config
270271
agent_config = load_runtime_agent_config(overrides=safe_overrides)
@@ -278,13 +279,18 @@ def _mcp_collision_warn(msg: str) -> None:
278279
"""Forward MCP server-name collision warnings to the operator event channel."""
279280
self.event_bus.emit(session_id, "mcp.warning", {"attempt_id": attempt_id, "message": msg})
280281

281-
agent = AgentLoop(
282-
registry=build_registry(
282+
registry = await loop.run_in_executor(
283+
_AGENT_EXECUTOR,
284+
lambda: build_registry(
283285
persistent_memory=pm,
284286
include_shell_tools=include_shell_tools,
285287
agent_config=agent_config,
286288
warn_callback=_mcp_collision_warn,
287289
),
290+
)
291+
292+
agent = AgentLoop(
293+
registry=registry,
288294
llm=llm,
289295
event_callback=event_callback,
290296
max_iterations=50,
@@ -296,7 +302,6 @@ def _mcp_collision_warn(msg: str) -> None:
296302
history = self._convert_messages_to_history(messages) if messages else None
297303

298304
try:
299-
loop = asyncio.get_running_loop()
300305
result = await loop.run_in_executor(
301306
_AGENT_EXECUTOR,
302307
lambda: agent.run(

agent/src/tools/mcp.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
from fastmcp.client import Client
1717
from fastmcp.client.client import CallToolResult
18+
from fastmcp.client.transports.http import StreamableHttpTransport
19+
from fastmcp.client.transports.sse import SSETransport
1820
from fastmcp.client.transports.stdio import StdioTransport
1921
from fastmcp.exceptions import McpError, ToolError
2022
from mcp import types as mcp_types
@@ -343,19 +345,33 @@ def call_tool(
343345
}
344346

345347
def _build_client(self) -> AsyncMCPClient:
346-
"""Create the default FastMCP stdio client.
348+
"""Create the default FastMCP client from MCP transport config.
347349
348350
Returns:
349351
Configured async FastMCP client.
350352
"""
351-
env = os.environ.copy()
352-
env.update(self.server_config.env)
353-
transport = StdioTransport(
354-
command=self.server_config.command,
355-
args=list(self.server_config.args),
356-
env=env,
357-
keep_alive=False,
358-
)
353+
transport_type = self.server_config.resolved_transport()
354+
355+
if transport_type == "stdio":
356+
env = os.environ.copy()
357+
env.update(self.server_config.env)
358+
transport = StdioTransport(
359+
command=self.server_config.command,
360+
args=list(self.server_config.args),
361+
env=env,
362+
keep_alive=False,
363+
)
364+
elif transport_type == "sse":
365+
transport = SSETransport(
366+
url=self.server_config.url,
367+
headers=dict(self.server_config.headers) or None,
368+
)
369+
else:
370+
transport = StreamableHttpTransport(
371+
url=self.server_config.url,
372+
headers=dict(self.server_config.headers) or None,
373+
)
374+
359375
# Use a minimum of 30 s for init_timeout so cold-start servers (pip
360376
# install, docker pull, slow imports) do not trip the same short
361377
# deadline as a per-call tool_timeout.
@@ -388,7 +404,7 @@ async def _list_tools_once(self) -> list[mcp_types.Tool]:
388404
return await client.list_tools()
389405

390406
async def _call_tool(self, remote_name: str, arguments: dict[str, Any]) -> CallToolResult:
391-
"""Call a remote tool with timeout and a single transient retry.
407+
"""Call a remote tool with timeout and no automatic retry.
392408
393409
Args:
394410
remote_name: Remote tool name.
@@ -409,7 +425,10 @@ async def _invoke() -> CallToolResult:
409425
raise_on_error=False,
410426
)
411427

412-
return await self._run_with_retry(f"call_tool:{remote_name}", _invoke)
428+
# Remote MCP tools are arbitrary and may mutate external state. If a
429+
# timeout / connection drop happens after the server has already
430+
# committed the action, retrying here would duplicate the side effect.
431+
return await _invoke()
413432

414433
async def _run_with_retry(
415434
self,
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""Minimal FastMCP SSE server for integration tests.
2+
3+
Launched as an HTTP subprocess by test_mcp_sse_integration.py.
4+
Exposes two tools:
5+
- echo(message: str) -> str — returns "echo: <message>"
6+
- add(a: int, b: int) -> int — returns a + b
7+
8+
Usage (called by pytest, not directly):
9+
python agent/tests/fixtures/fake_mcp_sse_server.py --port 0 --port-file /tmp/fake-mcp-sse.port
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import argparse
15+
import asyncio
16+
from pathlib import Path
17+
18+
from fastmcp import FastMCP
19+
import uvicorn
20+
21+
mcp = FastMCP("fake-mcp-sse-server")
22+
23+
24+
@mcp.tool()
25+
def echo(message: str) -> str:
26+
"""Echo a message back with a prefix."""
27+
return f"echo: {message}"
28+
29+
30+
@mcp.tool()
31+
def add(a: int, b: int) -> int:
32+
"""Add two integers."""
33+
return a + b
34+
35+
36+
def main() -> None:
37+
parser = argparse.ArgumentParser(description="Minimal FastMCP SSE test server")
38+
parser.add_argument("--port", type=int, default=18900, help="SSE port")
39+
parser.add_argument("--path", default="/sse", help="SSE path")
40+
parser.add_argument("--port-file", default="", help="Optional file to write the bound port")
41+
args = parser.parse_args()
42+
43+
app = mcp.http_app(path=args.path, transport="sse")
44+
config = uvicorn.Config(app, host="127.0.0.1", port=args.port, log_level="warning")
45+
server = uvicorn.Server(config)
46+
sock = config.bind_socket()
47+
if args.port_file:
48+
Path(args.port_file).write_text(str(sock.getsockname()[1]), encoding="utf-8")
49+
asyncio.run(server.serve(sockets=[sock]))
50+
51+
52+
if __name__ == "__main__":
53+
main()

0 commit comments

Comments
 (0)