Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
3 changes: 3 additions & 0 deletions src/archi/mcp_servers/submit_status/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from src.archi.mcp_servers.submit_status.server import mcp

mcp.run()
165 changes: 165 additions & 0 deletions src/archi/mcp_servers/submit_status/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import asyncio
import os
import shlex

from pathlib import Path
from mcp.server.fastmcp import FastMCP
from src.utils.logging import get_logger


logger = get_logger(__name__)

mcp = FastMCP("submit-status")

LOGIN_NODES = ["submit00", "submit01", "submit02", "submit03", "submit04", "submit05", "submit06", "submit07", "submit08"]
CEPH_NODES = ["submit50", "submit51", "submit52", "submit53", "submit54", "submit55", "submit56", "submit57", "submit58", "submit59"]
SCRATCH_NODES = ["submit30"]
GPU_NODES: list[str] = [] # e.g. ["submitgpu01", "submitgpu02"]
CPU_NODES: list[str] = [] # e.g. ["submitcpu01", "submitcpu02"]
ALL_NODES = LOGIN_NODES + SCRATCH_NODES + CEPH_NODES + GPU_NODES + CPU_NODES

_ALL_NODES_SET = set(ALL_NODES)




# SSH user and key to connect as. The container runs as root but the submit nodes only
# authorize the cluster user's key. Set SUBMIT_SSH_USER and SUBMIT_SSH_KEY in the
# deployment env. The resulting command is: ssh -i <key> -l <user> <host> <cmd>
_SSH_USER: str = os.environ.get("SUBMIT_SSH_USER", "")
_SSH_KEY: str = os.path.expanduser(os.environ.get("SUBMIT_SSH_KEY", ""))

_SSH_TIMEOUT = 30


def _ssh_opts() -> list[str]:
opts = [
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=5",
"-o", "StrictHostKeyChecking=accept-new",
]
if _SSH_KEY:
opts += ["-i", _SSH_KEY]
if _SSH_USER:
opts += ["-l", _SSH_USER]
return opts


def _validate_machine(machine: str, allowed: set[str]) -> str | None:
if machine not in allowed:
return f"Unknown machine '{machine}'. Allowed: {sorted(allowed)}"
return None


async def _ssh(host: str, command: str, timeout: int = _SSH_TIMEOUT) -> str:
fqdn = host if "." in host else f"{host}.mit.edu"
try:
proc = await asyncio.create_subprocess_exec(
"ssh", *_ssh_opts(), fqdn, command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
if proc.returncode == 0:
logger.info("ssh %s: command succeeded: %s", host, command)
return stdout.decode().strip()
error = stderr.decode().strip() or f"ssh exited with code {proc.returncode}"
logger.warning("ssh %s: command failed (rc=%d): %s", host, proc.returncode, error)
return error
except asyncio.TimeoutError:
logger.warning("ssh %s: timed out after %ds running: %s", host, timeout, command)
return f"SSH to {host} timed out after {timeout}s"
except Exception as exc:
logger.error("ssh %s: unexpected error running '%s': %s", host, command, exc)
return f"SSH error connecting to {host}: {exc}"


async def _ssh_multi(hosts: list[str], command: str) -> str:
"""Run the same command on multiple hosts in parallel, return combined results."""
outputs = await asyncio.gather(*[_ssh(h, command) for h in hosts])
return "\n\n".join(f"### {host}\n{out}" for host, out in zip(hosts, outputs))




# ---------------------------------------------------------------------------
# General MCP Tools
# ---------------------------------------------------------------------------

_ALLOWED_COMMANDS: dict[str, set[str] | None] = {
"df": None,
"free": None,
"uptime": None,
"mount": None,
"w": None,
"uname": None,
"condor_q": None,
"condor_status": None,
"condor_history": None,
"ceph": {"-s", "status", "health", "df", "osd", "mon"},
"rados": {"df", "lspools"},
"systemctl": {"status", "list-units", "show", "is-active", "is-enabled"},
"journalctl": None,
}


def _validate_command(command: str, args: list[str]) -> str | None:
allowed_first_args = _ALLOWED_COMMANDS.get(command)
if allowed_first_args is None and command not in _ALLOWED_COMMANDS:
return f"Command '{command}' is not whitelisted. Allowed: {sorted(_ALLOWED_COMMANDS)}"
if allowed_first_args is not None and args and args[0] not in allowed_first_args:
return (
f"Argument '{args[0]}' is not allowed as the first argument to '{command}'. "
f"Allowed: {sorted(allowed_first_args)}"
)
return None


@mcp.tool()
async def run_diagnostic(machine: str, command: str, args: list[str] | None = None) -> str:
"""
Run a read-only diagnostic command on a submit node, choosing both the
command and its arguments yourself from a fixed whitelist of safe binaries.
Valid machines: all nodes (submit00-08, submit30, submit50-59).

Whitelisted commands: df, free, uptime, mount, w, uname, condor_q, condor_status,
condor_history, ceph (status/health/df/osd/mon subcommands only),
rados (df/lspools only), systemctl (status/list-units/show/is-active/is-enabled only),
journalctl.

Each argument is passed as a separate, independently-quoted token — you
cannot chain commands, pipe, redirect, or use shell substitution. Use this
for one-off diagnostic queries not covered by a more specific tool.

Services of interest:
submit00:
- condor.service
- crond.service


Examples: command="df", args=["-h", "/scratch"]
command="systemctl", args=["status", "condor.service"]
command="journalctl", args=["-u", "condor.service", "-n", "50", "--no-pager"]
"""
if err := _validate_machine(machine, _ALL_NODES_SET):
return err
args = args or []
if err := _validate_command(command, args):
return err
full_command = " ".join(shlex.quote(tok) for tok in [command, *args])
return await _ssh(machine, full_command)


@mcp.tool()
async def run_diagnostic_all(command: str, args: list[str] | None = None) -> str:
"""
Run a whitelisted read-only diagnostic command on all submit nodes in parallel.
Same whitelist and argument rules as run_diagnostic — see its description for
the allowed commands and examples. Returns output from every node grouped by hostname.
Useful for cluster-wide checks like uptime, free, or df on a specific mount.
"""
args = args or []
if err := _validate_command(command, args):
return err
full_command = " ".join(shlex.quote(tok) for tok in [command, *args])
return await _ssh_multi(ALL_NODES, full_command)
15 changes: 14 additions & 1 deletion src/archi/pipelines/agents/base_react.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from src.archi.pipelines.agents.utils.run_memory import RunMemory
from src.archi.pipelines.agents.utils.mcp_utils import AsyncLoopThread
from src.archi.pipelines.agents.tools import initialize_mcp_client
from src.utils.config_access import get_mcp_servers_config
from src.utils.logging import get_logger

logger = get_logger(__name__)
Expand Down Expand Up @@ -1133,14 +1134,26 @@ def _build_static_tools(self) -> List[Callable]:
static_names = [name for name in selected if name != "mcp"]
return self._select_tools_from_registry(static_names)

def get_mcp_servers_config(self) -> Dict[str, Any]:
"""
Return MCP server config for this agent: any BUILTIN_MCP_SERVERS defined
by a concrete agent subclass, merged with servers from the deployment config.
"""
builtin_servers = getattr(self, "BUILTIN_MCP_SERVERS", {})
return {**builtin_servers, **get_mcp_servers_config()}

def _build_mcp_tools(self) -> List[Callable]:
"""Retrieve MCP tools from servers defined in the config and keep those server connections alive"""
try:
mcp_servers = self.get_mcp_servers_config()
if not mcp_servers:
logger.info("No MCP servers configured for %s.", self.__class__.__name__)
return None
self._async_runner = AsyncLoopThread.get_instance()

# Initialize MCP client on the background loop
# The client and sessions will live on this loop
client, mcp_tools, skills_text = self._async_runner.run(initialize_mcp_client())
client, mcp_tools, skills_text = self._async_runner.run(initialize_mcp_client(servers=mcp_servers))
if client is None:
logger.info("No MCP servers configured.")
return None
Expand Down
10 changes: 9 additions & 1 deletion src/archi/pipelines/agents/cms_comp_ops_agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import json
import sys
from typing import Any, Callable, Dict, List

from src.utils.logging import get_logger
Expand All @@ -13,7 +14,6 @@
create_metadata_search_tool,
create_metadata_schema_tool,
create_retriever_tool,
initialize_mcp_client,
RemoteCatalogClient,
MONITOpenSearchClient,
create_monit_opensearch_search_tool,
Expand All @@ -27,6 +27,14 @@
class CMSCompOpsAgent(BaseReActAgent):
"""Agent designed for CMS CompOps operations."""

BUILTIN_MCP_SERVERS = {
"submit_status": {
"transport": "stdio",
"command": sys.executable,
"args": ["-m", "src.archi.mcp_servers.submit_status"],
},
}

def __init__(
self,
config: Dict[str, Any],
Expand Down
8 changes: 6 additions & 2 deletions src/archi/pipelines/agents/tools/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@

logger = get_logger(__name__)

async def initialize_mcp_client() -> Tuple[Optional[MultiServerMCPClient], List[BaseTool], str]:
async def initialize_mcp_client(servers: dict | None = None) -> Tuple[Optional[MultiServerMCPClient], List[BaseTool], str]:
"""
Initializes the MCP client and fetches tool definitions.

Args:
servers: If provided, use these server definitions directly instead of
reading from the deployment config.
Returns:
client: The active client instance (must be kept alive by the caller).
tools: The list of LangChain-compatible tools.
Expand All @@ -25,7 +29,7 @@ async def initialize_mcp_client() -> Tuple[Optional[MultiServerMCPClient], List[
the content doesn't multiply by tool count.
"""

mcp_servers = get_mcp_servers_config()
mcp_servers = servers if servers is not None else get_mcp_servers_config()

# Strip archi-only fields that langchain-mcp-adapters doesn't understand.
# These are consumed by the compose template (sidecars), the legacy stdio
Expand Down
1 change: 1 addition & 0 deletions src/cli/templates/base-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ services:
{%- for host_path in (host_file_mounts | default([])) %}
- {{ host_path }}:{{ host_path }}:ro
{%- endfor %}
- ${HOME}/.ssh:/root/.ssh:ro
{#- Build the MCP auto-install command for stdio packages with a path #}
{%- set mcp_installs = [] %}
{%- for srv_name, srv_cfg in (mcp_servers | default({})).items() %}
Expand Down
1 change: 1 addition & 0 deletions src/cli/templates/dockerfiles/Dockerfile-chat
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ RUN apt-get update && apt-get install -y \
gnupg \
git \
ca-certificates \
openssh-client \
--no-install-recommends \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
Expand Down
1 change: 1 addition & 0 deletions src/cli/templates/dockerfiles/Dockerfile-chat-gpu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ RUN apt-get update && \
wget \
git \
ca-certificates \
openssh-client \
libgtk-3-0 \
libdbus-glib-1-2 \
libasound2 \
Expand Down
Loading