Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -723,14 +723,19 @@ vibe-trading run "use my-server to do X"

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `type` | string | inferred only for stdio | Transport type. Use `sse` or `streamableHttp` for URL-based servers. |
| `command` | string | required | Executable to spawn |
| `args` | array | `[]` | Command-line arguments |
| `env` | object | `{}` | Extra environment variables merged into the subprocess env |
| `url` | string | — | Remote SSE / streamable HTTP endpoint URL |
| `headers` | object | `{}` | Extra HTTP headers for SSE / streamable HTTP servers |
Comment thread
shadowinlife marked this conversation as resolved.
Outdated
| `toolTimeout` | number | `30` | Per-tool call timeout in seconds |
| `enabledTools` | array | `["*"]` | Tool allowlist. Use `["*"]` to expose all tools from the server |

Config file location: `~/.vibe-trading/agent.json` (JSON or YAML).

For URL-based transports, `type` is required. The agent no longer guesses between SSE and streamable HTTP from the URL suffix.

### Per-session overrides (API)

When creating a session via the API you can pass `mcpServers` inside `session.config` to extend or override the global config for that session only:
Expand Down Expand Up @@ -765,7 +770,7 @@ tool names unique. Rename the server in agent config if you want a different pre

| Limit | Detail |
|-------|--------|
| Transport | stdio only (SSE / streamable HTTP excluded in v1) |
| Transport | stdio, SSE, and streamable HTTP |
| Execution | serial only — MCP tools never enter the parallel readonly path |
| Surfaces | tools only (resources and prompts excluded in v1) |
| Hot reload | not supported — restart the process to pick up config changes |
Expand Down
9 changes: 7 additions & 2 deletions agent/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,17 @@ Remote tools appear automatically in every `vibe-trading run` / `vibe-trading ch

| Field | Required | Default | Description |
|-------|----------|---------|-------------|
| `command` | yes | — | Executable to launch |
| `type` | stdio: no, HTTP: yes | inferred only for stdio | Transport type. Use `sse` or `streamableHttp` for URL-based servers. |
| `command` | stdio: yes | — | Executable to launch |
| `args` | no | `[]` | Command arguments |
| `env` | no | `{}` | Extra env vars for the subprocess |
| `url` | HTTP: yes | — | Remote SSE / streamable HTTP endpoint URL |
| `headers` | no | `{}` | Extra HTTP headers for SSE / streamable HTTP servers |
| `toolTimeout` | no | `30` | Seconds before a tool call is cancelled |
| `enabledTools` | no | `["*"]` | Allowlist of remote tool names. `["*"]` enables all |

For URL-based transports, `type` is required. The agent no longer guesses between SSE and streamable HTTP from the URL suffix.

### Per-session override (API)

> **Security — disabled by default.** `mcpServers` defines subprocess `command`/`args`/`env` and is therefore restricted to operator-level trust. API callers **cannot** inject MCP server definitions through `POST /sessions` unless the server operator explicitly opts in.
Expand Down Expand Up @@ -214,7 +219,7 @@ Without `ALLOW_SESSION_MCP_SERVERS=1`, any `mcpServers` key in `session.config`

### v1 limits

- **Transport:** stdio only. SSE and HTTP transports are rejected.
- **Transport:** stdio, SSE, and streamable HTTP.
- **Execution:** serial only. MCP tools never enter the parallel readonly path.
- **Surfaces:** tools only. Resources and prompts are not exposed.
- **Swarm:** MCP tools are excluded from Swarm worker registries in v1.
Expand Down
78 changes: 75 additions & 3 deletions agent/src/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pydantic import ValidationError

from src.config.paths import get_config_path
from src.config.schema import AgentConfig, AgentConfigOverride
from src.config.schema import AgentConfig, AgentConfigOverride, MCPServerConfig

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -80,11 +80,19 @@ def merge_agent_config_overrides(
)
return config

merged = _merge_dicts(
merged = _merge_agent_config_dicts(
config.model_dump(mode="json"),
override_model.model_dump(mode="json", exclude_unset=True),
)
return AgentConfig.model_validate(merged)
try:
return AgentConfig.model_validate(merged)
except ValidationError as exc:
logger.warning(
"Ignoring merged agent config overrides after validation failure (%s): %s — using base config",
type(exc).__name__,
[str(e["loc"]) for e in exc.errors()],
)
return config


# Keys in session overrides that carry subprocess definitions and therefore
Expand Down Expand Up @@ -169,6 +177,70 @@ def _read_config_file(path: Path) -> dict[str, Any]:
return data


def _merge_agent_config_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
"""Merge top-level agent config payloads with MCP-aware server replacement."""
non_mcp_override = {key: value for key, value in override.items() if key != "mcp_servers"}
merged = _merge_dicts(base, non_mcp_override)

override_servers = override.get("mcp_servers")
if not isinstance(override_servers, dict):
if "mcp_servers" in override:
merged["mcp_servers"] = override_servers
return merged

merged_servers = dict(base.get("mcp_servers", {}))
for server_name, server_override in override_servers.items():
current_server = merged_servers.get(server_name)
if isinstance(current_server, dict) and isinstance(server_override, dict):
merged_servers[server_name] = _merge_mcp_server_dicts(current_server, server_override)
else:
merged_servers[server_name] = server_override

merged["mcp_servers"] = merged_servers
return merged


def _merge_mcp_server_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
"""Merge one MCP server payload, resetting incompatible transport fields when needed."""
if _override_switches_transport(base, override):
return _merge_dicts(_default_mcp_server_payload(base), override)
return _merge_dicts(base, override)


def _override_switches_transport(base: dict[str, Any], override: dict[str, Any]) -> bool:
"""Return whether a partial override changes the server transport family."""
override_transport = _resolve_override_transport(override)
if override_transport is None:
return False
base_transport = MCPServerConfig.model_validate(base)._resolved_transport()
return override_transport != base_transport


def _resolve_override_transport(override: dict[str, Any]) -> str | None:
"""Infer transport intent from a partial MCP server override."""
explicit_type = override.get("type")
if explicit_type in {"stdio", "sse", "streamableHttp"}:
return str(explicit_type)
if any(key in override for key in ("command", "args", "env")):
return "stdio"
return None


def _default_mcp_server_payload(base: dict[str, Any]) -> dict[str, Any]:
"""Return a transport-neutral MCP server payload preserving non-transport defaults."""
enabled_tools = base.get("enabled_tools")
return {
"type": None,
"command": "",
"args": [],
"env": {},
"url": "",
"headers": {},
"tool_timeout": base.get("tool_timeout", 30.0),
"enabled_tools": list(enabled_tools) if isinstance(enabled_tools, list) else ["*"],
}


def _merge_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
"""Recursively merge two plain dictionaries.

Expand Down
44 changes: 27 additions & 17 deletions agent/src/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,40 @@ class MCPServerConfig(ConfigBase):
tool_timeout: float = Field(default=30.0, ge=0.1)
enabled_tools: list[str] = Field(default_factory=lambda: ["*"])

def _resolved_transport(self) -> Literal["stdio", "sse", "streamableHttp"]:
"""Resolve the effective transport from explicit type or implied fields."""
if self.type is not None:
return self.type
if self.command.strip() or self.args or self.env:
return "stdio"
if self.url.strip():
raise ValueError("HTTP MCP servers require an explicit type of 'sse' or 'streamableHttp'")
return "stdio"

@model_validator(mode="after")
def validate_v1_stdio_only(self) -> "MCPServerConfig":
"""Reject non-stdio transports for the first release.
def validate_transport_config(self) -> "MCPServerConfig":
"""Validate transport-specific MCP server configuration.

Returns:
The validated MCP server config instance.

Raises:
ValueError: If the config implies a non-stdio transport, uses
HTTP-only fields, or omits the command required for stdio.
ValueError: If required fields are missing for the resolved
transport or conflicting fields are provided.
"""
transport = self.type
if transport is None:
if self.command:
transport = "stdio"
elif self.url:
transport = "sse" if self.url.rstrip("/").endswith("/sse") else "streamableHttp"

if transport and transport != "stdio":
raise ValueError("Only stdio MCP servers are supported in v1")
if self.url or self.headers:
raise ValueError("HTTP MCP transports are not supported in v1")
if not self.command.strip():
raise ValueError("stdio MCP servers require a command")
transport = self._resolved_transport()

if transport == "stdio":
if not self.command.strip():
raise ValueError("stdio MCP servers require a command")
if self.url.strip() or self.headers:
raise ValueError("stdio MCP servers do not accept url/headers")
return self

if not self.url.strip():
raise ValueError(f"{transport} MCP servers require a url")
if self.command.strip() or self.args or self.env:
raise ValueError(f"{transport} MCP servers do not accept command/args/env")
return self


Expand Down
11 changes: 8 additions & 3 deletions agent/src/session/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ async def _run_with_agent(

session_id = attempt.session_id
attempt_id = attempt.attempt_id
loop = asyncio.get_running_loop()

safe_overrides = sanitize_session_overrides(session_config) if session_config else session_config
agent_config = load_runtime_agent_config(overrides=safe_overrides)
Expand All @@ -278,13 +279,18 @@ def _mcp_collision_warn(msg: str) -> None:
"""Forward MCP server-name collision warnings to the operator event channel."""
self.event_bus.emit(session_id, "mcp.warning", {"attempt_id": attempt_id, "message": msg})

agent = AgentLoop(
registry=build_registry(
registry = await loop.run_in_executor(
_AGENT_EXECUTOR,
lambda: build_registry(
persistent_memory=pm,
include_shell_tools=include_shell_tools,
agent_config=agent_config,
warn_callback=_mcp_collision_warn,
),
)

agent = AgentLoop(
registry=registry,
llm=llm,
event_callback=event_callback,
max_iterations=50,
Expand All @@ -296,7 +302,6 @@ def _mcp_collision_warn(msg: str) -> None:
history = self._convert_messages_to_history(messages) if messages else None

try:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
_AGENT_EXECUTOR,
lambda: agent.run(
Expand Down
48 changes: 37 additions & 11 deletions agent/src/tools/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from fastmcp.client import Client
from fastmcp.client.client import CallToolResult
from fastmcp.client.transports.http import StreamableHttpTransport
from fastmcp.client.transports.sse import SSETransport
from fastmcp.client.transports.stdio import StdioTransport
from fastmcp.exceptions import McpError, ToolError
from mcp import types as mcp_types
Expand Down Expand Up @@ -343,19 +345,40 @@ def call_tool(
}

def _build_client(self) -> AsyncMCPClient:
"""Create the default FastMCP stdio client.
"""Create the default FastMCP client from MCP transport config.

Returns:
Configured async FastMCP client.
"""
env = os.environ.copy()
env.update(self.server_config.env)
transport = StdioTransport(
command=self.server_config.command,
args=list(self.server_config.args),
env=env,
keep_alive=False,
)
transport_type = self.server_config.type
if transport_type is None:
if self.server_config.command.strip():
transport_type = "stdio"
elif self.server_config.url.strip():
transport_type = "sse" if self.server_config.url.rstrip("/").endswith("/sse") else "streamableHttp"
else:
transport_type = "stdio"
Comment thread
shadowinlife marked this conversation as resolved.
Outdated

if transport_type == "stdio":
env = os.environ.copy()
env.update(self.server_config.env)
transport = StdioTransport(
command=self.server_config.command,
args=list(self.server_config.args),
env=env,
keep_alive=False,
)
elif transport_type == "sse":
transport = SSETransport(
url=self.server_config.url,
headers=dict(self.server_config.headers) or None,
)
else:
transport = StreamableHttpTransport(
url=self.server_config.url,
headers=dict(self.server_config.headers) or None,
)

# Use a minimum of 30 s for init_timeout so cold-start servers (pip
# install, docker pull, slow imports) do not trip the same short
# deadline as a per-call tool_timeout.
Expand Down Expand Up @@ -388,7 +411,7 @@ async def _list_tools_once(self) -> list[mcp_types.Tool]:
return await client.list_tools()

async def _call_tool(self, remote_name: str, arguments: dict[str, Any]) -> CallToolResult:
"""Call a remote tool with timeout and a single transient retry.
"""Call a remote tool with timeout and no automatic retry.

Args:
remote_name: Remote tool name.
Expand All @@ -409,7 +432,10 @@ async def _invoke() -> CallToolResult:
raise_on_error=False,
)

return await self._run_with_retry(f"call_tool:{remote_name}", _invoke)
# Remote MCP tools are arbitrary and may mutate external state. If a
# timeout / connection drop happens after the server has already
# committed the action, retrying here would duplicate the side effect.
return await _invoke()

async def _run_with_retry(
self,
Expand Down
53 changes: 53 additions & 0 deletions agent/tests/fixtures/fake_mcp_sse_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Minimal FastMCP SSE server for integration tests.

Launched as an HTTP subprocess by test_mcp_sse_integration.py.
Exposes two tools:
- echo(message: str) -> str — returns "echo: <message>"
- add(a: int, b: int) -> int — returns a + b

Usage (called by pytest, not directly):
python agent/tests/fixtures/fake_mcp_sse_server.py --port 0 --port-file /tmp/fake-mcp-sse.port
"""

from __future__ import annotations

import argparse
import asyncio
from pathlib import Path

from fastmcp import FastMCP
import uvicorn

mcp = FastMCP("fake-mcp-sse-server")


@mcp.tool()
def echo(message: str) -> str:
"""Echo a message back with a prefix."""
return f"echo: {message}"


@mcp.tool()
def add(a: int, b: int) -> int:
"""Add two integers."""
return a + b


def main() -> None:
parser = argparse.ArgumentParser(description="Minimal FastMCP SSE test server")
parser.add_argument("--port", type=int, default=18900, help="SSE port")
parser.add_argument("--path", default="/sse", help="SSE path")
parser.add_argument("--port-file", default="", help="Optional file to write the bound port")
args = parser.parse_args()

app = mcp.http_app(path=args.path, transport="sse")
config = uvicorn.Config(app, host="127.0.0.1", port=args.port, log_level="warning")
server = uvicorn.Server(config)
sock = config.bind_socket()
if args.port_file:
Path(args.port_file).write_text(str(sock.getsockname()[1]), encoding="utf-8")
asyncio.run(server.serve(sockets=[sock]))


if __name__ == "__main__":
main()
Loading
Loading