From 47afccc26a27e55451754bf08b2803b937959c7a Mon Sep 17 00:00:00 2001 From: georgiedekker Date: Sat, 10 May 2025 14:38:41 +0200 Subject: [PATCH 1/5] initial commit --- Dockerfile | 17 +++- backup_mcp_config.json | 19 ++++ compose.yml | 8 +- mcp_bridge/config/docker.py | 101 +++++++++++++++++++++ mcp_bridge/config/docker_config.py | 7 ++ mcp_bridge/config/final.py | 2 +- mcp_bridge/mcp_clients/DockerClient.py | 2 +- mcp_bridge/mcp_clients/McpClientManager.py | 2 +- mcp_bridge/mcp_clients/session.py | 16 ++++ mcp_config.json | 19 ++++ pyproject.toml | 4 +- 11 files changed, 187 insertions(+), 10 deletions(-) create mode 100644 backup_mcp_config.json create mode 100644 mcp_bridge/config/docker.py create mode 100644 mcp_bridge/config/docker_config.py create mode 100644 mcp_config.json diff --git a/Dockerfile b/Dockerfile index cfe1c15..bbc5aac 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,6 +8,21 @@ RUN apt-get update && apt-get install -y --no-install-recommends curl RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - RUN apt-get install -y --no-install-recommends nodejs +# Install Docker CLI +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + gnupg && \ + install -m 0755 -d /etc/apt/keyrings && \ + curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc && \ + chmod a+r /etc/apt/keyrings/docker.asc && \ + echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \ + tee /etc/apt/sources.list.d/docker.list > /dev/null && \ + apt-get update && \ + apt-get install -y --no-install-recommends docker-ce-cli + COPY pyproject.toml . ## FOR GHCR BUILD PIPELINE @@ -18,7 +33,7 @@ RUN uv sync COPY mcp_bridge mcp_bridge -EXPOSE 8000 +EXPOSE 3989 WORKDIR /mcp_bridge ENTRYPOINT ["uv", "run", "main.py"] diff --git a/backup_mcp_config.json b/backup_mcp_config.json new file mode 100644 index 0000000..1e8042b --- /dev/null +++ b/backup_mcp_config.json @@ -0,0 +1,19 @@ +{ + "mcp_servers": { + "fetch": { + "command": "uvx", + "args": ["mcp-server-fetch"] + }, + "github": { + "command": "docker", + "args": [ + "run", + "-i", + "--rm", + "-e", + "GITHUB_PERSONAL_ACCESS_TOKEN", + "ghcr.io/github/github-mcp-server" + ] + } + } +} \ No newline at end of file diff --git a/compose.yml b/compose.yml index b0744fb..652b246 100644 --- a/compose.yml +++ b/compose.yml @@ -8,11 +8,11 @@ services: action: rebuild container_name: mcp-bridge ports: - - "8000:8000" + - "3989:3989" environment: - - MCP_BRIDGE__CONFIG__FILE=config.json # mount the config file for this to work + - MCP_BRIDGE__CONFIG__FILE=mcp_config.json # mount the config file for this to work # - MCP_BRIDGE__CONFIG__HTTP_URL=http://10.88.100.170:8888/config.json # - MCP_BRIDGE__CONFIG__JSON= - # volumes: - # - ./config.json:/mcp_bridge/config.json + volumes: + - ./mcp_config.json:/mcp_bridge/mcp_config.json restart: unless-stopped diff --git a/mcp_bridge/config/docker.py b/mcp_bridge/config/docker.py new file mode 100644 index 0000000..da24841 --- /dev/null +++ b/mcp_bridge/config/docker.py @@ -0,0 +1,101 @@ +from aiodocker import Docker +from contextlib import asynccontextmanager +import anyio +import anyio.lowlevel +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from loguru import logger +from pydantic import BaseModel, Field + +from mcp import types + +class DockerMCPServer(BaseModel): + container_name: str | None = Field(default=None, description="Name of the docker container") + image: str = Field(description="Image of the docker container") + args: list[str] = Field(default_factory=list, description="Command line arguments for the docker container") + env: dict[str, str] = Field(default_factory=dict, description="Environment variables for the docker container") + + +@asynccontextmanager +async def docker_client(server: DockerMCPServer): + """ + Client transport for Docker: this will connect to a server by + running a Docker container and communicating with it over its stdin/stdout. + """ + read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception] + read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception] + + write_stream: MemoryObjectSendStream[types.JSONRPCMessage] + write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage] + + read_stream_writer, read_stream = anyio.create_memory_object_stream(0) + write_stream, write_stream_reader = anyio.create_memory_object_stream(0) + + docker = Docker() + + try: + # Pull the image if not available + await docker.images.pull(server.image) + + # Create the container + container = await docker.containers.create({ + "Image": server.image, + "Args": server.args, + "OpenStdin": True, + "AttachStdout": True, + "AttachStderr": True, + "Tty": False, + "HostConfig": {"AutoRemove": True}, + }) + + await container.start() + logger.debug(f"Started Docker container {container.id}") + + # Attach to the container's input/output streams + attach_result = container.attach(stdout=True, stdin=True) + + async def read_from_stdout(): + try: + async with read_stream_writer: + buffer = "" + while True: + msg = await attach_result.read_out() + if msg is None: + continue + chunk = msg.data + if isinstance(chunk, bytes): + chunk = chunk.decode("utf-8") + lines = (buffer + chunk).split("\n") + buffer = lines.pop() + + for line in lines: + try: + json_message = types.JSONRPCMessage.model_validate_json(line) + await read_stream_writer.send(json_message) + except Exception as exc: + await read_stream_writer.send(exc) + except anyio.ClosedResourceError: + await anyio.lowlevel.checkpoint() + + async def write_to_stdin(): + try: + async with write_stream_reader: + async for message in write_stream_reader: + json = message.model_dump_json(by_alias=True, exclude_none=True) + await attach_result.write_in(json.encode("utf-8") + b"\n") + except anyio.ClosedResourceError: + await anyio.lowlevel.checkpoint() + + try: + async with anyio.create_task_group() as tg: + tg.start_soon(read_from_stdout) + tg.start_soon(write_to_stdin) + yield read_stream, write_stream + finally: + await container.stop() + await container.delete() + + except Exception as e: + logger.error(f"Error in docker client: {e}") + finally: + await docker.close() + logger.debug("Docker client closed.") \ No newline at end of file diff --git a/mcp_bridge/config/docker_config.py b/mcp_bridge/config/docker_config.py new file mode 100644 index 0000000..aa26cf6 --- /dev/null +++ b/mcp_bridge/config/docker_config.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel, Field + +class DockerMCPServer(BaseModel): + container_name: str | None = Field(default=None, description="Name of the docker container") + image: str = Field(description="Image of the docker container") + args: list[str] = Field(default_factory=list, description="Command line arguments for the docker container") + env: dict[str, str] = Field(default_factory=dict, description="Environment variables for the docker container") \ No newline at end of file diff --git a/mcp_bridge/config/final.py b/mcp_bridge/config/final.py index eaa79b9..c88a35c 100644 --- a/mcp_bridge/config/final.py +++ b/mcp_bridge/config/final.py @@ -3,7 +3,7 @@ from pydantic import BaseModel, Field from mcp.client.stdio import StdioServerParameters -from mcpx.client.transports.docker import DockerMCPServer +from .docker import DockerMCPServer class InferenceServer(BaseModel): diff --git a/mcp_bridge/mcp_clients/DockerClient.py b/mcp_bridge/mcp_clients/DockerClient.py index bf40af8..75d46a5 100644 --- a/mcp_bridge/mcp_clients/DockerClient.py +++ b/mcp_bridge/mcp_clients/DockerClient.py @@ -2,7 +2,7 @@ from mcp_bridge.mcp_clients.session import McpClientSession from mcp_bridge.config import config -from mcpx.client.transports.docker import docker_client, DockerMCPServer +from mcp_bridge.config.docker import docker_client, DockerMCPServer from .AbstractClient import GenericMcpClient from loguru import logger diff --git a/mcp_bridge/mcp_clients/McpClientManager.py b/mcp_bridge/mcp_clients/McpClientManager.py index 7bf6c89..e4bfa85 100644 --- a/mcp_bridge/mcp_clients/McpClientManager.py +++ b/mcp_bridge/mcp_clients/McpClientManager.py @@ -2,7 +2,7 @@ from loguru import logger from mcp import McpError, StdioServerParameters -from mcpx.client.transports.docker import DockerMCPServer +from mcp_bridge.config.docker import DockerMCPServer from mcp_bridge.config import config from mcp_bridge.config.final import SSEMCPServer diff --git a/mcp_bridge/mcp_clients/session.py b/mcp_bridge/mcp_clients/session.py index 56d1f94..13f0280 100644 --- a/mcp_bridge/mcp_clients/session.py +++ b/mcp_bridge/mcp_clients/session.py @@ -26,6 +26,8 @@ class McpClientSession( ] ): + + def __init__( self, read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception], @@ -39,13 +41,27 @@ def __init__( types.ServerNotification, read_timeout_seconds=read_timeout_seconds, ) + self._read_stream = read_stream + # self.incoming_messages = read_stream + # print(f"self.incoming_messages: {self.incoming_messages}") + # print(f"self._incoming_messages: {self._incoming_messages}") async def __aenter__(self): session = await super().__aenter__() + if not hasattr(self, 'incoming_messages') or self.incoming_messages is None: + self.incoming_messages = self._read_stream + self._incoming_messages = self._read_stream + print(f"SETTING self.incoming_messages: {self.incoming_messages}") self._task_group.start_soon(self._consume_messages) return session async def _consume_messages(self): + # logger.info(f"Starting message consumer task for session {self.session_id}") + logger.info(f"McpClientSession has incoming_messages: {hasattr(self, 'incoming_messages')}") + if hasattr(self, 'incoming_messages') and self.incoming_messages is not None: + logger.info(f"incoming_messages type: {type(self.incoming_messages)}") + else: + logger.warning("incoming_messages not found or is None") try: async for message in self.incoming_messages: try: diff --git a/mcp_config.json b/mcp_config.json new file mode 100644 index 0000000..4092a92 --- /dev/null +++ b/mcp_config.json @@ -0,0 +1,19 @@ +{ + "mcp_servers": { + "fetch": { + "command": "uvx", + "args": ["mcp-server-fetch"] + }, + "github": { + "command": "docker", + "args": [ + "run", + "-i", + "--rm", + "-e", + "GITHUB_PERSONAL_ACCESS_TOKEN", + "ghcr.io/github/github-mcp-server" + ] + } + } +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index ef76626..bc11329 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,13 +11,13 @@ dependencies = [ "httpx-sse>=0.4.0", "lmos-openai-types", "loguru>=0.7.3", - "mcp>=1.2.0", - "mcpx[docker]>=0.1.1", + "mcp>=1.8.0", "pydantic>=2.10.4", "pydantic-settings>=2.7.0", "sse-starlette>=2.2.0", "tortoise-orm[asyncmy,asyncpg]>=0.23.0", "uvicorn>=0.34.0", + "aiodocker>=0.2.4" ] [tool.uv.sources] From 971d29eee65f979cdc36a19d19b6220727972043 Mon Sep 17 00:00:00 2001 From: georgiedekker Date: Sat, 10 May 2025 18:04:09 +0200 Subject: [PATCH 2/5] updates --- mcp_bridge/mcp_clients/session.py | 36 ++++++++++--- mcp_bridge/utils/__init__.py | 7 +++ mcp_bridge/utils/library_patcher.py | 83 +++++++++++++++++++++++++++++ mcp_bridge/utils/message_adapter.py | 52 ++++++++++++++++++ 4 files changed, 170 insertions(+), 8 deletions(-) create mode 100644 mcp_bridge/utils/__init__.py create mode 100644 mcp_bridge/utils/library_patcher.py create mode 100644 mcp_bridge/utils/message_adapter.py diff --git a/mcp_bridge/mcp_clients/session.py b/mcp_bridge/mcp_clients/session.py index 13f0280..ab95034 100644 --- a/mcp_bridge/mcp_clients/session.py +++ b/mcp_bridge/mcp_clients/session.py @@ -1,6 +1,7 @@ from datetime import timedelta from typing import Awaitable, Callable +import anyio from loguru import logger import mcp.types as types from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream @@ -10,12 +11,13 @@ from mcp_bridge import __version__ as version from mcp_bridge.sampling.sampler import handle_sampling_message +# Fix the import - import the functions before using them +from mcp_bridge.utils.message_adapter import wrap_message sampling_function_signature = Callable[ [types.CreateMessageRequestParams], Awaitable[types.CreateMessageResult] ] - class McpClientSession( BaseSession[ types.ClientRequest, @@ -42,6 +44,8 @@ def __init__( read_timeout_seconds=read_timeout_seconds, ) self._read_stream = read_stream + self.incoming_messages = read_stream + self._incoming_messages = read_stream # self.incoming_messages = read_stream # print(f"self.incoming_messages: {self.incoming_messages}") # print(f"self._incoming_messages: {self._incoming_messages}") @@ -56,28 +60,44 @@ async def __aenter__(self): return session async def _consume_messages(self): - # logger.info(f"Starting message consumer task for session {self.session_id}") + """ + Process messages from the incoming message stream, with robust error handling. + """ logger.info(f"McpClientSession has incoming_messages: {hasattr(self, 'incoming_messages')}") if hasattr(self, 'incoming_messages') and self.incoming_messages is not None: logger.info(f"incoming_messages type: {type(self.incoming_messages)}") else: logger.warning("incoming_messages not found or is None") + return + try: - async for message in self.incoming_messages: + async for original_message in self.incoming_messages: try: + # Wrap the message + message = wrap_message(original_message) + if isinstance(message, Exception): logger.error(f"Received exception in message stream: {message}") - elif isinstance(message, RequestResponder): - logger.debug(f"Received request: {message.request}") - elif isinstance(message, types.ServerNotification): - if isinstance(message.root, types.LoggingMessageNotification): + elif hasattr(message, 'request'): # For RequestResponder + logger.debug(f"Received request: {message.request}") + try: + await self._received_request(message) + except Exception as req_err: + logger.exception(f"Error handling request: {req_err}") + elif hasattr(message, 'root'): + if hasattr(message.root, 'params') and hasattr(types, 'LoggingMessageNotification') and hasattr(types, 'ServerNotification') and isinstance(message, types.ServerNotification) and isinstance(message.root, types.LoggingMessageNotification): logger.debug(f"Received notification from server: {message.root.params}") else: logger.debug(f"Received notification from server: {message}") else: - logger.debug(f"Received notification: {message}") + logger.debug(f"Received message: {message}") + except anyio.ClosedResourceError: + logger.debug("Message stream closed") + break except Exception as e: logger.exception(f"Error processing message: {e}") + except anyio.ClosedResourceError: + logger.debug("Message stream closed") except Exception as e: logger.exception(f"Message consumer task failed: {e}") diff --git a/mcp_bridge/utils/__init__.py b/mcp_bridge/utils/__init__.py new file mode 100644 index 0000000..6e4ab42 --- /dev/null +++ b/mcp_bridge/utils/__init__.py @@ -0,0 +1,7 @@ +# mcp_bridge/utils/__init__.py + +# Import and apply patches +from .library_patcher import apply_patches + +# Apply patches at module import time +original_functions = apply_patches() \ No newline at end of file diff --git a/mcp_bridge/utils/library_patcher.py b/mcp_bridge/utils/library_patcher.py new file mode 100644 index 0000000..3acc22f --- /dev/null +++ b/mcp_bridge/utils/library_patcher.py @@ -0,0 +1,83 @@ +# mcp_bridge/utils/library_patcher.py + +import json +from loguru import logger +import mcp.types +from mcp.types import JSONRPCMessage +from mcp.shared.message import SessionMessage + +def patch_jsonrpc_message(): + """ + Patch the JSONRPCMessage class to add the 'message' property. + """ + # Check if the class is already patched + if hasattr(JSONRPCMessage, '_original_getattr'): + logger.info("JSONRPCMessage already patched") + return + + # Store the original __getattr__ method + original_getattr = JSONRPCMessage.__getattr__ + + # Define the patched method + def patched_getattr(self, name): + # If trying to access 'message', return self + if name == 'message': + return self + + # Otherwise, use the original method + return original_getattr(self, name) + + # Apply the patch + JSONRPCMessage._original_getattr = original_getattr + JSONRPCMessage.__getattr__ = patched_getattr + + logger.info("Successfully patched JSONRPCMessage.__getattr__") + + return original_getattr + +def patch_session_message(): + """ + Patch the SessionMessage class to add missing serialization methods. + """ + # Skip if already patched + if hasattr(SessionMessage, 'model_dump_json'): + logger.info("SessionMessage already patched") + return + + # Add model_dump_json method to SessionMessage + def model_dump_json(self, **kwargs): + """Convert the message to JSON string, compatible with Pydantic v2.""" + # If message has its own model_dump_json method, use it + if hasattr(self.message, 'model_dump_json'): + return self.message.model_dump_json(**kwargs) + + # If message has dict method (Pydantic v1), use it + if hasattr(self.message, 'dict'): + # Convert to dict then to JSON + message_dict = self.message.dict(**kwargs) + return json.dumps(message_dict) + + # Fallback: try to convert to dict directly + try: + return json.dumps(self.message.__dict__) + except: + # Last resort, just stringify + return json.dumps(str(self.message)) + + # Add the method to the class + SessionMessage.model_dump_json = model_dump_json + + logger.info("Successfully patched SessionMessage with model_dump_json") + + return True + +def apply_patches(): + """Apply all necessary patches to the MCP library.""" + original_getattr = patch_jsonrpc_message() + session_message_patched = patch_session_message() + + logger.info("All MCP library patches applied successfully") + return { + "original_getattr": original_getattr, + "session_message_patched": session_message_patched + } \ No newline at end of file diff --git a/mcp_bridge/utils/message_adapter.py b/mcp_bridge/utils/message_adapter.py new file mode 100644 index 0000000..83fd2ec --- /dev/null +++ b/mcp_bridge/utils/message_adapter.py @@ -0,0 +1,52 @@ +# mcp_bridge/utils/message_adapter.py + +from typing import Any + +class MessageWrapper: + """ + A wrapper class for JSONRPCMessage objects that provides the expected structure. + Only used in client-side code, not for patching the library. + """ + def __init__(self, original_message): + self._original = original_message + + @property + def message(self): + """Return self to provide the message.message.root structure""" + return self + + @property + def root(self): + """Forward root access to the original message""" + if hasattr(self._original, 'root'): + return self._original.root + return None + + def __getattr__(self, name): + """Forward all other attribute access to the original message""" + return getattr(self._original, name) + +def wrap_message(message: Any) -> Any: + """ + Wrap a message object to provide the expected message.message.root structure. + """ + if message is None or isinstance(message, Exception): + return message + + # No need to wrap if it's already a MessageWrapper + if isinstance(message, MessageWrapper): + return message + + # Wrap the message + return MessageWrapper(message) + +# Add alias for backward compatibility +adapt_jsonrpc_message = wrap_message + +# Dummy function for backward compatibility +def patch_base_session(): + """ + Dummy function for backward compatibility. + The actual patching is done in library_patcher.py + """ + return None \ No newline at end of file From e8ce8d7e14544548603b51449708d0e5e67c12da Mon Sep 17 00:00:00 2001 From: georgiedekker Date: Sun, 11 May 2025 07:26:28 +0200 Subject: [PATCH 3/5] small bridge updates, working now --- mcp_bridge/mcp_clients/McpClientManager.py | 126 +++++++++++++++------ mcp_bridge/mcp_server/server.py | 122 +++++++++++++++----- mcp_bridge/mcp_server/sse.py | 32 +++++- 3 files changed, 214 insertions(+), 66 deletions(-) diff --git a/mcp_bridge/mcp_clients/McpClientManager.py b/mcp_bridge/mcp_clients/McpClientManager.py index e4bfa85..708b62b 100644 --- a/mcp_bridge/mcp_clients/McpClientManager.py +++ b/mcp_bridge/mcp_clients/McpClientManager.py @@ -1,4 +1,5 @@ from typing import Union +import asyncio from loguru import logger from mcp import McpError, StdioServerParameters @@ -18,35 +19,71 @@ class MCPClientManager: clients: dict[str, client_types] = {} async def initialize(self): - """Initialize the MCP Client Manager and start all clients""" + """Initialize the MCP Client Manager and start all clients concurrently.""" + logger.info("Initializing MCP Client Manager") - logger.log("DEBUG", "Initializing MCP Client Manager") + construction_coroutines = [] + server_names_ordered = [] # To map results back to names for server_name, server_config in config.mcp_servers.items(): - self.clients[server_name] = await self.construct_client( - server_name, server_config - ) - - async def construct_client(self, name, server_config) -> client_types: - logger.log("DEBUG", f"Constructing client for {server_config}") - - if isinstance(server_config, StdioServerParameters): - client = StdioClient(name, server_config) - await client.start() - return client - - if isinstance(server_config, SSEMCPServer): - # TODO: implement sse client - client = SseClient(name, server_config) # type: ignore - await client.start() - return client - - if isinstance(server_config, DockerMCPServer): - client = DockerClient(name, server_config) - await client.start() - return client - - raise NotImplementedError("Client Type not supported") + logger.info(f"Preparing to initialize MCP client: {server_name} with config: {server_config}") + construction_coroutines.append(self.construct_and_start_client(server_name, server_config)) + server_names_ordered.append(server_name) + + if not construction_coroutines: + logger.info("No MCP clients configured to initialize.") + return + + logger.info(f"Attempting to concurrently initialize {len(construction_coroutines)} MCP clients...") + # The construct_and_start_client will return (name, client_instance) or (name, None) on failure + results = await asyncio.gather(*construction_coroutines, return_exceptions=True) + + for i, result in enumerate(results): + server_name = server_names_ordered[i] + if isinstance(result, Exception): + logger.error(f"Exception during initialization of MCP client: {server_name}. Error: {result}", exc_info=result) + elif result is None: # Should not happen if construct_and_start_client returns (name, None) + logger.error(f"Failed to initialize or start MCP client: {server_name}. No client instance returned.") + elif isinstance(result, tuple) and len(result) == 2: + client_instance = result[1] + if client_instance: + self.clients[server_name] = client_instance + logger.info(f"Successfully initialized and started MCP client: {server_name}") + else: # Explicitly handled None for client_instance from construct_and_start_client + logger.error(f"Failed to initialize or start MCP client: {server_name}. Construction returned None.") + else: + logger.error(f"Unexpected result type during initialization of MCP client: {server_name}. Result: {result}") + + async def construct_and_start_client(self, name, server_config) -> tuple[str, client_types | None]: + """Constructs and starts a single client, returns (name, client_instance | None).""" + logger.info(f"Constructing client '{name}' for type: {type(server_config)}") + try: + client_instance: client_types | None = None + if isinstance(server_config, StdioServerParameters): + command_to_log = server_config.command if hasattr(server_config, 'command') else "Not specified" + args_to_log = server_config.args if hasattr(server_config, 'args') else [] + logger.info(f"Creating StdioClient for '{name}' with command: '{command_to_log}' and args: '{args_to_log}'") + client_instance = StdioClient(name, server_config) + await client_instance.start() + logger.info(f"StdioClient '{name}' started.") + elif isinstance(server_config, SSEMCPServer): + logger.info(f"Creating SseClient for '{name}' with URL: '{server_config.url}'") + client_instance = SseClient(name, server_config) # type: ignore + await client_instance.start() + logger.info(f"SseClient '{name}' started.") + elif isinstance(server_config, DockerMCPServer): + logger.info(f"Creating DockerClient for '{name}' with image: '{server_config.image_name}'") + client_instance = DockerClient(name, server_config) + await client_instance.start() + logger.info(f"DockerClient '{name}' started.") + else: + logger.error(f"Unsupported MCP server config type for client '{name}': {type(server_config)}") + # raise NotImplementedError("Client Type not supported") # Don't raise, return None + return name, None + return name, client_instance + except Exception as e: + logger.error(f"Error constructing or starting client {name}: {e}", exc_info=True) + return name, None # Return name and None on failure def get_client(self, server_name: str): return self.clients[server_name] @@ -55,34 +92,57 @@ def get_clients(self): return list(self.clients.items()) async def get_client_from_tool(self, tool: str): + logger.info(f"Attempting to find client for tool: '{tool}'") for name, client in self.get_clients(): - + logger.debug(f"Checking client '{name}' for tool '{tool}'") # client cannot have tools if it is not connected if not client.session: + logger.warning(f"Client '{name}' session not active, skipping for tool '{tool}'") continue try: - list_tools = await client.session.list_tools() - for client_tool in list_tools.tools: + logger.debug(f"Calling list_tools() on client '{name}' for tool '{tool}'") + list_tools_response = await client.session.list_tools() + logger.debug(f"Client '{name}' returned tools: {list_tools_response.tools}") + for client_tool in list_tools_response.tools: if client_tool.name == tool: + logger.info(f"Found tool '{tool}' in client '{name}'") return client - except McpError: + except McpError as e: + logger.error(f"McpError while listing tools for client '{name}': {e}", exc_info=True) continue + except Exception as e: + logger.error(f"Unexpected error while listing tools for client '{name}': {e}", exc_info=True) + continue + logger.warning(f"Tool '{tool}' not found in any active client.") + return None # Explicitly return None if not found async def get_client_from_prompt(self, prompt: str): + logger.info(f"Attempting to find client for prompt: '{prompt}'") for name, client in self.get_clients(): + logger.debug(f"Checking client '{name}' for prompt '{prompt}'") # client cannot have prompts if it is not connected if not client.session: + logger.warning(f"Client '{name}' session not active, skipping for prompt '{prompt}'") continue try: - list_prompts = await client.session.list_prompts() - for client_prompt in list_prompts.prompts: + logger.debug(f"Calling list_prompts() on client '{name}' for prompt '{prompt}'") + list_prompts_response = await client.session.list_prompts() + logger.debug(f"Client '{name}' returned prompts: {list_prompts_response.prompts}") + for client_prompt in list_prompts_response.prompts: if client_prompt.name == prompt: + logger.info(f"Found prompt '{prompt}' in client '{name}'") return client - except McpError: + except McpError as e: + logger.error(f"McpError while listing prompts for client '{name}': {e}", exc_info=True) + continue + except Exception as e: + logger.error(f"Unexpected error while listing prompts for client '{name}': {e}", exc_info=True) continue + logger.warning(f"Prompt '{prompt}' not found in any active client.") + return None # Explicitly return None if not found ClientManager = MCPClientManager() diff --git a/mcp_bridge/mcp_server/server.py b/mcp_bridge/mcp_server/server.py index 030bf59..a358c6a 100644 --- a/mcp_bridge/mcp_server/server.py +++ b/mcp_bridge/mcp_server/server.py @@ -4,6 +4,7 @@ from pydantic import AnyUrl from mcp_bridge.mcp_clients.McpClientManager import ClientManager from loguru import logger +import asyncio __all__ = ["server", "options"] @@ -15,26 +16,64 @@ @server.list_prompts() async def list_prompts() -> list[types.Prompt]: prompts = [] - for name, client in ClientManager.get_clients(): - # if client is None, then we cannot list the prompts - if client is None: - logger.error(f"Client '{name}' not found") - continue - - client_prompts = await client.list_prompts() - prompts.extend(client_prompts.prompts) + logger.info("Aggregating prompts from all managed MCP clients concurrently.") + + active_clients = [(name, client) for name, client in ClientManager.get_clients() if client and client.session] + if not active_clients: + logger.info("No active clients to fetch prompts from.") + return [] + + coroutines = [] + for name, client in active_clients: + logger.debug(f"Preparing to list prompts from client session: {name}") + coroutines.append(client.session.list_prompts()) + + logger.info(f"Gathering prompts from {len(coroutines)} active client sessions.") + results = await asyncio.gather(*coroutines, return_exceptions=True) + + for i, result in enumerate(results): + name, client = active_clients[i] # Get corresponding client name + if isinstance(result, Exception): + logger.error(f"Error listing prompts for client '{name}': {result}", exc_info=result) + elif result and result.prompts: + logger.debug(f"Client '{name}' provided prompts: {[p.name for p in result.prompts]}") + prompts.extend(result.prompts) + else: + logger.debug(f"Client '{name}' provided no prompts or an empty response.") + + logger.info(f"Finished prompt aggregation. Total prompts aggregated: {len(prompts)}. Names: {[p.name for p in prompts]}") return prompts @server.list_resources() async def list_resources() -> list[types.Resource]: resources = [] - for name, client in ClientManager.get_clients(): - try: - client_resources = await client.list_resources() - resources.extend(client_resources.resources) - except Exception as e: - logger.error(f"Error listing resources for {name}: {e}") + logger.info("Aggregating resources from all managed MCP clients concurrently.") + + active_clients = [(name, client) for name, client in ClientManager.get_clients() if client and client.session] + if not active_clients: + logger.info("No active clients to fetch resources from.") + return [] + + coroutines = [] + for name, client in active_clients: + logger.debug(f"Preparing to list resources from client session: {name}") + coroutines.append(client.session.list_resources()) + + logger.info(f"Gathering resources from {len(coroutines)} active client sessions.") + results = await asyncio.gather(*coroutines, return_exceptions=True) + + for i, result in enumerate(results): + name, client = active_clients[i] # Get corresponding client name + if isinstance(result, Exception): + logger.error(f"Error listing resources for client '{name}': {result}", exc_info=result) + elif result and result.resources: + logger.debug(f"Client '{name}' provided {len(result.resources)} resources.") + resources.extend(result.resources) + else: + logger.debug(f"Client '{name}' provided no resources or an empty response.") + + logger.info(f"Finished resource aggregation. Total resources aggregated: {len(resources)}.") return resources @@ -46,14 +85,32 @@ async def list_resource_templates() -> list[types.ResourceTemplate]: @server.list_tools() async def list_tools() -> list[types.Tool]: tools = [] - for name, client in ClientManager.get_clients(): - # if client is None, then we cannot list the tools - if client is None: - logger.error(f"Client '{name}' not found") - continue - - client_tools = await client.list_tools() - tools.extend(client_tools.tools) + logger.info("Aggregating tools from all managed MCP clients concurrently.") + + active_clients = [(name, client) for name, client in ClientManager.get_clients() if client and client.session] + if not active_clients: + logger.info("No active clients to fetch tools from.") + return [] + + coroutines = [] + for name, client in active_clients: + logger.debug(f"Preparing to list tools from client session: {name}") + coroutines.append(client.session.list_tools()) + + logger.info(f"Gathering tools from {len(coroutines)} active client sessions.") + results = await asyncio.gather(*coroutines, return_exceptions=True) + + for i, result in enumerate(results): + name, client = active_clients[i] # Get corresponding client name + if isinstance(result, Exception): + logger.error(f"Error listing tools for client '{name}': {result}", exc_info=result) + elif result and result.tools: + logger.debug(f"Client '{name}' provided tools: {[t.name for t in result.tools]}") + tools.extend(result.tools) + else: + logger.debug(f"Client '{name}' provided no tools or an empty response.") + + logger.info(f"Finished tool aggregation. Total tools aggregated: {len(tools)}. Names: {[t.name for t in tools]}") return tools @@ -71,11 +128,13 @@ async def get_prompt(name: str, args: dict[str, str] | None) -> types.GetPromptR # if args is None, then we should use an empty dict if args is None: args = {} - + + logger.info(f"Getting prompt '{name}' with args '{args}' from client: {client.name if client else 'Unknown'}") result = await client.get_prompt(name, args) if result is None: + logger.error(f"Prompt '{name}' not found by client {client.name if client else 'Unknown'}") raise Exception(f"Prompt '{name}' not found") - + logger.info(f"Successfully retrieved prompt '{name}'.") return result @@ -112,17 +171,24 @@ async def handle_read_resource(uri: AnyUrl) -> str | bytes: async def handle_call_tool( name: str, arguments: dict | None ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: + logger.info(f"Attempting to call tool '{name}' with arguments: {arguments}") client = await ClientManager.get_client_from_tool(name) - # if client is None, then we cannot call the tool if client is None: + logger.error(f"Tool '{name}' could not be mapped to any managed client.") raise Exception(f"Tool '{name}' not found") - # if arguments is None, then we should use an empty dict if arguments is None: arguments = {} - - return (await client.call_tool(name, arguments)).content + + logger.info(f"Calling tool '{name}' on resolved client '{client.name}' with arguments: {arguments}") + try: + tool_result = await client.call_tool(name, arguments) + logger.info(f"Tool '{name}' executed successfully by client '{client.name}'. Result content items: {len(tool_result.content) if tool_result else 'None'}") + return tool_result.content + except Exception as e: + logger.error(f"Error calling tool '{name}' on client '{client.name}': {e}", exc_info=True) + raise # options diff --git a/mcp_bridge/mcp_server/sse.py b/mcp_bridge/mcp_server/sse.py index 4d5bb3b..49b5001 100644 --- a/mcp_bridge/mcp_server/sse.py +++ b/mcp_bridge/mcp_server/sse.py @@ -15,23 +15,45 @@ @router.get("/", response_class=StreamingResponse) async def handle_sse(request: Request): - logger.info("new incoming SSE connection established") + logger.info("New incoming SSE connection established from client.") async with sse.connect_sse(request) as streams: try: + logger.info(f"SSE streams acquired. Handing off to MCP server run method. Streams: {streams}") await server.run(streams[0], streams[1], options) + logger.info("MCP server run method completed for SSE connection.") except BrokenResourceError: + logger.warning("SSE connection BrokenResourceError.") pass except asyncio.CancelledError: + logger.warning("SSE connection CancelledError.") pass - except ValidationError: + except ValidationError as ve: + logger.error(f"SSE handler ValidationError: {ve}", exc_info=True) pass - except Exception: + except Exception as e: + logger.error(f"Unexpected error in SSE handler: {e}", exc_info=True) raise await request.close() + logger.info("SSE connection closed and request finalized.") @router.post("/messages") async def handle_messages(request: Request): - logger.info("incoming SSE message received") + client_host = request.client.host if request.client else "Unknown_Host" + client_port = request.client.port if request.client else "Unknown_Port" + logger.info(f"Incoming POST /messages from {client_host}:{client_port}.") + + # Removed explicit body reading here. Let sse.handle_post_message consume the body. + # try: + # body_bytes = await request.body() + # try: + # body_str = body_bytes.decode('utf-8') + # logger.debug(f"POST /messages request body (decoded): {body_str}") + # except UnicodeDecodeError: + # logger.warning(f"POST /messages request body (raw bytes, could not decode as UTF-8): {body_bytes}") + # except Exception as e: + # logger.error(f"Error reading request body for POST /messages: {e}", exc_info=True) + + logger.info(f"Forwarding POST /messages from {client_host}:{client_port} to sse.handle_post_message.") await sse.handle_post_message(request.scope, request.receive, request._send) - await request.close() + logger.info(f"sse.handle_post_message completed for POST /messages from {client_host}:{client_port}.") From a2ae3dea9ca5b49a22a7476c7270a84eb80d72ff Mon Sep 17 00:00:00 2001 From: georgiedekker Date: Mon, 12 May 2025 14:51:03 +0200 Subject: [PATCH 4/5] updated gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 74906da..26d179d 100644 --- a/.gitignore +++ b/.gitignore @@ -160,7 +160,8 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ - +.gitignore +.DS_Store ## custom commands.md From deeb6a335d0d163ae2bb5d87f1725d0a4ba6b1da Mon Sep 17 00:00:00 2001 From: georgiedekker Date: Tue, 3 Jun 2025 15:53:36 +0200 Subject: [PATCH 5/5] Update mcp_bridge/mcp_clients/session.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- mcp_bridge/mcp_clients/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcp_bridge/mcp_clients/session.py b/mcp_bridge/mcp_clients/session.py index ab95034..ed94588 100644 --- a/mcp_bridge/mcp_clients/session.py +++ b/mcp_bridge/mcp_clients/session.py @@ -55,7 +55,7 @@ async def __aenter__(self): if not hasattr(self, 'incoming_messages') or self.incoming_messages is None: self.incoming_messages = self._read_stream self._incoming_messages = self._read_stream - print(f"SETTING self.incoming_messages: {self.incoming_messages}") + logger.debug(f"SETTING self.incoming_messages: {self.incoming_messages}") self._task_group.start_soon(self._consume_messages) return session