diff --git a/.gitignore b/.gitignore
index 74906da..26d179d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -160,7 +160,8 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
-
+.gitignore
+.DS_Store
## custom
commands.md
diff --git a/Dockerfile b/Dockerfile
index cfe1c15..bbc5aac 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -8,6 +8,21 @@ RUN apt-get update && apt-get install -y --no-install-recommends curl
RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
RUN apt-get install -y --no-install-recommends nodejs
+# Install Docker CLI
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends \
+ ca-certificates \
+ gnupg && \
+ install -m 0755 -d /etc/apt/keyrings && \
+ curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc && \
+ chmod a+r /etc/apt/keyrings/docker.asc && \
+ echo \
+ "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
+ $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
+ tee /etc/apt/sources.list.d/docker.list > /dev/null && \
+ apt-get update && \
+ apt-get install -y --no-install-recommends docker-ce-cli
+
COPY pyproject.toml .
## FOR GHCR BUILD PIPELINE
@@ -18,7 +33,7 @@ RUN uv sync
COPY mcp_bridge mcp_bridge
-EXPOSE 8000
+EXPOSE 3989
WORKDIR /mcp_bridge
ENTRYPOINT ["uv", "run", "main.py"]
diff --git a/backup_mcp_config.json b/backup_mcp_config.json
new file mode 100644
index 0000000..1e8042b
--- /dev/null
+++ b/backup_mcp_config.json
@@ -0,0 +1,19 @@
+{
+ "mcp_servers": {
+ "fetch": {
+ "command": "uvx",
+ "args": ["mcp-server-fetch"]
+ },
+ "github": {
+ "command": "docker",
+ "args": [
+ "run",
+ "-i",
+ "--rm",
+ "-e",
+ "GITHUB_PERSONAL_ACCESS_TOKEN",
+ "ghcr.io/github/github-mcp-server"
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/compose.yml b/compose.yml
index b48a1d4..f897f5f 100644
--- a/compose.yml
+++ b/compose.yml
@@ -8,13 +8,13 @@ services:
action: rebuild
container_name: mcp-bridge
ports:
- - "8000:8000"
+ - "3989:3989"
environment:
- - MCP_BRIDGE__CONFIG__FILE=config.json # mount the config file for this to work
+ - MCP_BRIDGE__CONFIG__FILE=mcp_config.json # mount the config file for this to work
# - MCP_BRIDGE__CONFIG__HTTP_URL=http://10.88.100.170:8888/config.json
# - MCP_BRIDGE__CONFIG__JSON=
- # volumes:
- # - ./config.json:/mcp_bridge/config.json
+ volumes:
+ - ./mcp_config.json:/mcp_bridge/mcp_config.json
restart: unless-stopped
jaeger:
diff --git a/mcp_bridge/config/docker.py b/mcp_bridge/config/docker.py
new file mode 100644
index 0000000..da24841
--- /dev/null
+++ b/mcp_bridge/config/docker.py
@@ -0,0 +1,101 @@
+from aiodocker import Docker
+from contextlib import asynccontextmanager
+import anyio
+import anyio.lowlevel
+from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
+from loguru import logger
+from pydantic import BaseModel, Field
+
+from mcp import types
+
+class DockerMCPServer(BaseModel):
+ container_name: str | None = Field(default=None, description="Name of the docker container")
+ image: str = Field(description="Image of the docker container")
+ args: list[str] = Field(default_factory=list, description="Command line arguments for the docker container")
+ env: dict[str, str] = Field(default_factory=dict, description="Environment variables for the docker container")
+
+
+@asynccontextmanager
+async def docker_client(server: DockerMCPServer):
+ """
+ Client transport for Docker: this will connect to a server by
+ running a Docker container and communicating with it over its stdin/stdout.
+ """
+ read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]
+ read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]
+
+ write_stream: MemoryObjectSendStream[types.JSONRPCMessage]
+ write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]
+
+ read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
+ write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
+
+ docker = Docker()
+
+ try:
+ # Pull the image if not available
+ await docker.images.pull(server.image)
+
+ # Create the container
+ container = await docker.containers.create({
+ "Image": server.image,
+ "Args": server.args,
+ "OpenStdin": True,
+ "AttachStdout": True,
+ "AttachStderr": True,
+ "Tty": False,
+ "HostConfig": {"AutoRemove": True},
+ })
+
+ await container.start()
+ logger.debug(f"Started Docker container {container.id}")
+
+ # Attach to the container's input/output streams
+ attach_result = container.attach(stdout=True, stdin=True)
+
+ async def read_from_stdout():
+ try:
+ async with read_stream_writer:
+ buffer = ""
+ while True:
+ msg = await attach_result.read_out()
+ if msg is None:
+                            break
+ chunk = msg.data
+ if isinstance(chunk, bytes):
+ chunk = chunk.decode("utf-8")
+ lines = (buffer + chunk).split("\n")
+ buffer = lines.pop()
+
+ for line in lines:
+ try:
+ json_message = types.JSONRPCMessage.model_validate_json(line)
+ await read_stream_writer.send(json_message)
+ except Exception as exc:
+ await read_stream_writer.send(exc)
+ except anyio.ClosedResourceError:
+ await anyio.lowlevel.checkpoint()
+
+ async def write_to_stdin():
+ try:
+ async with write_stream_reader:
+ async for message in write_stream_reader:
+ json = message.model_dump_json(by_alias=True, exclude_none=True)
+ await attach_result.write_in(json.encode("utf-8") + b"\n")
+ except anyio.ClosedResourceError:
+ await anyio.lowlevel.checkpoint()
+
+ try:
+ async with anyio.create_task_group() as tg:
+ tg.start_soon(read_from_stdout)
+ tg.start_soon(write_to_stdin)
+ yield read_stream, write_stream
+ finally:
+ await container.stop()
+            # HostConfig AutoRemove is set: the daemon deletes the container after stop();
+
+ except Exception as e:
+ logger.error(f"Error in docker client: {e}")
+ finally:
+ await docker.close()
+ logger.debug("Docker client closed.")
\ No newline at end of file
diff --git a/mcp_bridge/config/docker_config.py b/mcp_bridge/config/docker_config.py
new file mode 100644
index 0000000..aa26cf6
--- /dev/null
+++ b/mcp_bridge/config/docker_config.py
@@ -0,0 +1,7 @@
+from pydantic import BaseModel, Field
+
+class DockerMCPServer(BaseModel):
+ container_name: str | None = Field(default=None, description="Name of the docker container")
+ image: str = Field(description="Image of the docker container")
+ args: list[str] = Field(default_factory=list, description="Command line arguments for the docker container")
+ env: dict[str, str] = Field(default_factory=dict, description="Environment variables for the docker container")
\ No newline at end of file
diff --git a/mcp_bridge/config/final.py b/mcp_bridge/config/final.py
index f0bdc2f..e15d86e 100644
--- a/mcp_bridge/config/final.py
+++ b/mcp_bridge/config/final.py
@@ -3,7 +3,7 @@
from pydantic import BaseModel, Field
from mcp.client.stdio import StdioServerParameters
-from mcpx.client.transports.docker import DockerMCPServer
+from .docker import DockerMCPServer
class InferenceServer(BaseModel):
diff --git a/mcp_bridge/mcp_clients/DockerClient.py b/mcp_bridge/mcp_clients/DockerClient.py
index bf40af8..75d46a5 100644
--- a/mcp_bridge/mcp_clients/DockerClient.py
+++ b/mcp_bridge/mcp_clients/DockerClient.py
@@ -2,7 +2,7 @@
from mcp_bridge.mcp_clients.session import McpClientSession
from mcp_bridge.config import config
-from mcpx.client.transports.docker import docker_client, DockerMCPServer
+from mcp_bridge.config.docker import docker_client, DockerMCPServer
from .AbstractClient import GenericMcpClient
from loguru import logger
diff --git a/mcp_bridge/mcp_clients/McpClientManager.py b/mcp_bridge/mcp_clients/McpClientManager.py
index 7bf6c89..708b62b 100644
--- a/mcp_bridge/mcp_clients/McpClientManager.py
+++ b/mcp_bridge/mcp_clients/McpClientManager.py
@@ -1,8 +1,9 @@
from typing import Union
+import asyncio
from loguru import logger
from mcp import McpError, StdioServerParameters
-from mcpx.client.transports.docker import DockerMCPServer
+from mcp_bridge.config.docker import DockerMCPServer
from mcp_bridge.config import config
from mcp_bridge.config.final import SSEMCPServer
@@ -18,35 +19,71 @@ class MCPClientManager:
clients: dict[str, client_types] = {}
async def initialize(self):
- """Initialize the MCP Client Manager and start all clients"""
+ """Initialize the MCP Client Manager and start all clients concurrently."""
+ logger.info("Initializing MCP Client Manager")
- logger.log("DEBUG", "Initializing MCP Client Manager")
+ construction_coroutines = []
+ server_names_ordered = [] # To map results back to names
for server_name, server_config in config.mcp_servers.items():
- self.clients[server_name] = await self.construct_client(
- server_name, server_config
- )
-
- async def construct_client(self, name, server_config) -> client_types:
- logger.log("DEBUG", f"Constructing client for {server_config}")
-
- if isinstance(server_config, StdioServerParameters):
- client = StdioClient(name, server_config)
- await client.start()
- return client
-
- if isinstance(server_config, SSEMCPServer):
- # TODO: implement sse client
- client = SseClient(name, server_config) # type: ignore
- await client.start()
- return client
-
- if isinstance(server_config, DockerMCPServer):
- client = DockerClient(name, server_config)
- await client.start()
- return client
-
- raise NotImplementedError("Client Type not supported")
+ logger.info(f"Preparing to initialize MCP client: {server_name} with config: {server_config}")
+ construction_coroutines.append(self.construct_and_start_client(server_name, server_config))
+ server_names_ordered.append(server_name)
+
+ if not construction_coroutines:
+ logger.info("No MCP clients configured to initialize.")
+ return
+
+ logger.info(f"Attempting to concurrently initialize {len(construction_coroutines)} MCP clients...")
+ # The construct_and_start_client will return (name, client_instance) or (name, None) on failure
+ results = await asyncio.gather(*construction_coroutines, return_exceptions=True)
+
+ for i, result in enumerate(results):
+ server_name = server_names_ordered[i]
+ if isinstance(result, Exception):
+                logger.opt(exception=result).error(f"Exception during initialization of MCP client: {server_name}. Error: {result}")
+ elif result is None: # Should not happen if construct_and_start_client returns (name, None)
+ logger.error(f"Failed to initialize or start MCP client: {server_name}. No client instance returned.")
+ elif isinstance(result, tuple) and len(result) == 2:
+ client_instance = result[1]
+ if client_instance:
+ self.clients[server_name] = client_instance
+ logger.info(f"Successfully initialized and started MCP client: {server_name}")
+ else: # Explicitly handled None for client_instance from construct_and_start_client
+ logger.error(f"Failed to initialize or start MCP client: {server_name}. Construction returned None.")
+ else:
+ logger.error(f"Unexpected result type during initialization of MCP client: {server_name}. Result: {result}")
+
+ async def construct_and_start_client(self, name, server_config) -> tuple[str, client_types | None]:
+ """Constructs and starts a single client, returns (name, client_instance | None)."""
+ logger.info(f"Constructing client '{name}' for type: {type(server_config)}")
+ try:
+ client_instance: client_types | None = None
+ if isinstance(server_config, StdioServerParameters):
+ command_to_log = server_config.command if hasattr(server_config, 'command') else "Not specified"
+ args_to_log = server_config.args if hasattr(server_config, 'args') else []
+ logger.info(f"Creating StdioClient for '{name}' with command: '{command_to_log}' and args: '{args_to_log}'")
+ client_instance = StdioClient(name, server_config)
+ await client_instance.start()
+ logger.info(f"StdioClient '{name}' started.")
+ elif isinstance(server_config, SSEMCPServer):
+ logger.info(f"Creating SseClient for '{name}' with URL: '{server_config.url}'")
+ client_instance = SseClient(name, server_config) # type: ignore
+ await client_instance.start()
+ logger.info(f"SseClient '{name}' started.")
+ elif isinstance(server_config, DockerMCPServer):
+                logger.info(f"Creating DockerClient for '{name}' with image: '{server_config.image}'")
+ client_instance = DockerClient(name, server_config)
+ await client_instance.start()
+ logger.info(f"DockerClient '{name}' started.")
+ else:
+ logger.error(f"Unsupported MCP server config type for client '{name}': {type(server_config)}")
+ # raise NotImplementedError("Client Type not supported") # Don't raise, return None
+ return name, None
+ return name, client_instance
+ except Exception as e:
+            logger.opt(exception=True).error(f"Error constructing or starting client {name}: {e}")
+ return name, None # Return name and None on failure
def get_client(self, server_name: str):
return self.clients[server_name]
@@ -55,34 +92,57 @@ def get_clients(self):
return list(self.clients.items())
async def get_client_from_tool(self, tool: str):
+ logger.info(f"Attempting to find client for tool: '{tool}'")
for name, client in self.get_clients():
-
+ logger.debug(f"Checking client '{name}' for tool '{tool}'")
# client cannot have tools if it is not connected
if not client.session:
+ logger.warning(f"Client '{name}' session not active, skipping for tool '{tool}'")
continue
try:
- list_tools = await client.session.list_tools()
- for client_tool in list_tools.tools:
+ logger.debug(f"Calling list_tools() on client '{name}' for tool '{tool}'")
+ list_tools_response = await client.session.list_tools()
+ logger.debug(f"Client '{name}' returned tools: {list_tools_response.tools}")
+ for client_tool in list_tools_response.tools:
if client_tool.name == tool:
+ logger.info(f"Found tool '{tool}' in client '{name}'")
return client
- except McpError:
+ except McpError as e:
+                logger.opt(exception=True).error(f"McpError while listing tools for client '{name}': {e}")
continue
+ except Exception as e:
+                logger.opt(exception=True).error(f"Unexpected error while listing tools for client '{name}': {e}")
+ continue
+ logger.warning(f"Tool '{tool}' not found in any active client.")
+ return None # Explicitly return None if not found
async def get_client_from_prompt(self, prompt: str):
+ logger.info(f"Attempting to find client for prompt: '{prompt}'")
for name, client in self.get_clients():
+ logger.debug(f"Checking client '{name}' for prompt '{prompt}'")
# client cannot have prompts if it is not connected
if not client.session:
+ logger.warning(f"Client '{name}' session not active, skipping for prompt '{prompt}'")
continue
try:
- list_prompts = await client.session.list_prompts()
- for client_prompt in list_prompts.prompts:
+ logger.debug(f"Calling list_prompts() on client '{name}' for prompt '{prompt}'")
+ list_prompts_response = await client.session.list_prompts()
+ logger.debug(f"Client '{name}' returned prompts: {list_prompts_response.prompts}")
+ for client_prompt in list_prompts_response.prompts:
if client_prompt.name == prompt:
+ logger.info(f"Found prompt '{prompt}' in client '{name}'")
return client
- except McpError:
+ except McpError as e:
+                logger.opt(exception=True).error(f"McpError while listing prompts for client '{name}': {e}")
+ continue
+ except Exception as e:
+                logger.opt(exception=True).error(f"Unexpected error while listing prompts for client '{name}': {e}")
continue
+ logger.warning(f"Prompt '{prompt}' not found in any active client.")
+ return None # Explicitly return None if not found
ClientManager = MCPClientManager()
diff --git a/mcp_bridge/mcp_clients/session.py b/mcp_bridge/mcp_clients/session.py
index 56d1f94..ed94588 100644
--- a/mcp_bridge/mcp_clients/session.py
+++ b/mcp_bridge/mcp_clients/session.py
@@ -1,6 +1,7 @@
from datetime import timedelta
from typing import Awaitable, Callable
+import anyio
from loguru import logger
import mcp.types as types
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
@@ -10,12 +11,13 @@
from mcp_bridge import __version__ as version
from mcp_bridge.sampling.sampler import handle_sampling_message
+# Fix the import - import the functions before using them
+from mcp_bridge.utils.message_adapter import wrap_message
sampling_function_signature = Callable[
[types.CreateMessageRequestParams], Awaitable[types.CreateMessageResult]
]
-
class McpClientSession(
BaseSession[
types.ClientRequest,
@@ -26,6 +28,8 @@ class McpClientSession(
]
):
+
+
def __init__(
self,
read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception],
@@ -39,29 +43,61 @@ def __init__(
types.ServerNotification,
read_timeout_seconds=read_timeout_seconds,
)
+ self._read_stream = read_stream
+ self.incoming_messages = read_stream
+ self._incoming_messages = read_stream
+ # self.incoming_messages = read_stream
+ # print(f"self.incoming_messages: {self.incoming_messages}")
+ # print(f"self._incoming_messages: {self._incoming_messages}")
async def __aenter__(self):
session = await super().__aenter__()
+ if not hasattr(self, 'incoming_messages') or self.incoming_messages is None:
+ self.incoming_messages = self._read_stream
+ self._incoming_messages = self._read_stream
+ logger.debug(f"SETTING self.incoming_messages: {self.incoming_messages}")
self._task_group.start_soon(self._consume_messages)
return session
async def _consume_messages(self):
+ """
+ Process messages from the incoming message stream, with robust error handling.
+ """
+ logger.info(f"McpClientSession has incoming_messages: {hasattr(self, 'incoming_messages')}")
+ if hasattr(self, 'incoming_messages') and self.incoming_messages is not None:
+ logger.info(f"incoming_messages type: {type(self.incoming_messages)}")
+ else:
+ logger.warning("incoming_messages not found or is None")
+ return
+
try:
- async for message in self.incoming_messages:
+ async for original_message in self.incoming_messages:
try:
+ # Wrap the message
+ message = wrap_message(original_message)
+
if isinstance(message, Exception):
logger.error(f"Received exception in message stream: {message}")
- elif isinstance(message, RequestResponder):
- logger.debug(f"Received request: {message.request}")
- elif isinstance(message, types.ServerNotification):
- if isinstance(message.root, types.LoggingMessageNotification):
+ elif hasattr(message, 'request'): # For RequestResponder
+ logger.debug(f"Received request: {message.request}")
+ try:
+ await self._received_request(message)
+ except Exception as req_err:
+ logger.exception(f"Error handling request: {req_err}")
+ elif hasattr(message, 'root'):
+ if hasattr(message.root, 'params') and hasattr(types, 'LoggingMessageNotification') and hasattr(types, 'ServerNotification') and isinstance(message, types.ServerNotification) and isinstance(message.root, types.LoggingMessageNotification):
logger.debug(f"Received notification from server: {message.root.params}")
else:
logger.debug(f"Received notification from server: {message}")
else:
- logger.debug(f"Received notification: {message}")
+ logger.debug(f"Received message: {message}")
+ except anyio.ClosedResourceError:
+ logger.debug("Message stream closed")
+ break
except Exception as e:
logger.exception(f"Error processing message: {e}")
+ except anyio.ClosedResourceError:
+ logger.debug("Message stream closed")
except Exception as e:
logger.exception(f"Message consumer task failed: {e}")
diff --git a/mcp_bridge/mcp_server/server.py b/mcp_bridge/mcp_server/server.py
index 030bf59..a358c6a 100644
--- a/mcp_bridge/mcp_server/server.py
+++ b/mcp_bridge/mcp_server/server.py
@@ -4,6 +4,7 @@
from pydantic import AnyUrl
from mcp_bridge.mcp_clients.McpClientManager import ClientManager
from loguru import logger
+import asyncio
__all__ = ["server", "options"]
@@ -15,26 +16,64 @@
@server.list_prompts()
async def list_prompts() -> list[types.Prompt]:
prompts = []
- for name, client in ClientManager.get_clients():
- # if client is None, then we cannot list the prompts
- if client is None:
- logger.error(f"Client '{name}' not found")
- continue
-
- client_prompts = await client.list_prompts()
- prompts.extend(client_prompts.prompts)
+ logger.info("Aggregating prompts from all managed MCP clients concurrently.")
+
+ active_clients = [(name, client) for name, client in ClientManager.get_clients() if client and client.session]
+ if not active_clients:
+ logger.info("No active clients to fetch prompts from.")
+ return []
+
+ coroutines = []
+ for name, client in active_clients:
+ logger.debug(f"Preparing to list prompts from client session: {name}")
+ coroutines.append(client.session.list_prompts())
+
+ logger.info(f"Gathering prompts from {len(coroutines)} active client sessions.")
+ results = await asyncio.gather(*coroutines, return_exceptions=True)
+
+ for i, result in enumerate(results):
+ name, client = active_clients[i] # Get corresponding client name
+ if isinstance(result, Exception):
+            logger.opt(exception=result).error(f"Error listing prompts for client '{name}': {result}")
+ elif result and result.prompts:
+ logger.debug(f"Client '{name}' provided prompts: {[p.name for p in result.prompts]}")
+ prompts.extend(result.prompts)
+ else:
+ logger.debug(f"Client '{name}' provided no prompts or an empty response.")
+
+ logger.info(f"Finished prompt aggregation. Total prompts aggregated: {len(prompts)}. Names: {[p.name for p in prompts]}")
return prompts
@server.list_resources()
async def list_resources() -> list[types.Resource]:
resources = []
- for name, client in ClientManager.get_clients():
- try:
- client_resources = await client.list_resources()
- resources.extend(client_resources.resources)
- except Exception as e:
- logger.error(f"Error listing resources for {name}: {e}")
+ logger.info("Aggregating resources from all managed MCP clients concurrently.")
+
+ active_clients = [(name, client) for name, client in ClientManager.get_clients() if client and client.session]
+ if not active_clients:
+ logger.info("No active clients to fetch resources from.")
+ return []
+
+ coroutines = []
+ for name, client in active_clients:
+ logger.debug(f"Preparing to list resources from client session: {name}")
+ coroutines.append(client.session.list_resources())
+
+ logger.info(f"Gathering resources from {len(coroutines)} active client sessions.")
+ results = await asyncio.gather(*coroutines, return_exceptions=True)
+
+ for i, result in enumerate(results):
+ name, client = active_clients[i] # Get corresponding client name
+ if isinstance(result, Exception):
+            logger.opt(exception=result).error(f"Error listing resources for client '{name}': {result}")
+ elif result and result.resources:
+ logger.debug(f"Client '{name}' provided {len(result.resources)} resources.")
+ resources.extend(result.resources)
+ else:
+ logger.debug(f"Client '{name}' provided no resources or an empty response.")
+
+ logger.info(f"Finished resource aggregation. Total resources aggregated: {len(resources)}.")
return resources
@@ -46,14 +85,32 @@ async def list_resource_templates() -> list[types.ResourceTemplate]:
@server.list_tools()
async def list_tools() -> list[types.Tool]:
tools = []
- for name, client in ClientManager.get_clients():
- # if client is None, then we cannot list the tools
- if client is None:
- logger.error(f"Client '{name}' not found")
- continue
-
- client_tools = await client.list_tools()
- tools.extend(client_tools.tools)
+ logger.info("Aggregating tools from all managed MCP clients concurrently.")
+
+ active_clients = [(name, client) for name, client in ClientManager.get_clients() if client and client.session]
+ if not active_clients:
+ logger.info("No active clients to fetch tools from.")
+ return []
+
+ coroutines = []
+ for name, client in active_clients:
+ logger.debug(f"Preparing to list tools from client session: {name}")
+ coroutines.append(client.session.list_tools())
+
+ logger.info(f"Gathering tools from {len(coroutines)} active client sessions.")
+ results = await asyncio.gather(*coroutines, return_exceptions=True)
+
+ for i, result in enumerate(results):
+ name, client = active_clients[i] # Get corresponding client name
+ if isinstance(result, Exception):
+            logger.opt(exception=result).error(f"Error listing tools for client '{name}': {result}")
+ elif result and result.tools:
+ logger.debug(f"Client '{name}' provided tools: {[t.name for t in result.tools]}")
+ tools.extend(result.tools)
+ else:
+ logger.debug(f"Client '{name}' provided no tools or an empty response.")
+
+ logger.info(f"Finished tool aggregation. Total tools aggregated: {len(tools)}. Names: {[t.name for t in tools]}")
return tools
@@ -71,11 +128,13 @@ async def get_prompt(name: str, args: dict[str, str] | None) -> types.GetPromptR
# if args is None, then we should use an empty dict
if args is None:
args = {}
-
+
+ logger.info(f"Getting prompt '{name}' with args '{args}' from client: {client.name if client else 'Unknown'}")
result = await client.get_prompt(name, args)
if result is None:
+ logger.error(f"Prompt '{name}' not found by client {client.name if client else 'Unknown'}")
raise Exception(f"Prompt '{name}' not found")
-
+ logger.info(f"Successfully retrieved prompt '{name}'.")
return result
@@ -112,17 +171,24 @@ async def handle_read_resource(uri: AnyUrl) -> str | bytes:
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
+ logger.info(f"Attempting to call tool '{name}' with arguments: {arguments}")
client = await ClientManager.get_client_from_tool(name)
- # if client is None, then we cannot call the tool
if client is None:
+ logger.error(f"Tool '{name}' could not be mapped to any managed client.")
raise Exception(f"Tool '{name}' not found")
- # if arguments is None, then we should use an empty dict
if arguments is None:
arguments = {}
-
- return (await client.call_tool(name, arguments)).content
+
+ logger.info(f"Calling tool '{name}' on resolved client '{client.name}' with arguments: {arguments}")
+ try:
+ tool_result = await client.call_tool(name, arguments)
+ logger.info(f"Tool '{name}' executed successfully by client '{client.name}'. Result content items: {len(tool_result.content) if tool_result else 'None'}")
+ return tool_result.content
+ except Exception as e:
+        logger.opt(exception=True).error(f"Error calling tool '{name}' on client '{client.name}': {e}")
+ raise
# options
diff --git a/mcp_bridge/mcp_server/sse.py b/mcp_bridge/mcp_server/sse.py
index 4d5bb3b..49b5001 100644
--- a/mcp_bridge/mcp_server/sse.py
+++ b/mcp_bridge/mcp_server/sse.py
@@ -15,23 +15,45 @@
@router.get("/", response_class=StreamingResponse)
async def handle_sse(request: Request):
- logger.info("new incoming SSE connection established")
+ logger.info("New incoming SSE connection established from client.")
async with sse.connect_sse(request) as streams:
try:
+ logger.info(f"SSE streams acquired. Handing off to MCP server run method. Streams: {streams}")
await server.run(streams[0], streams[1], options)
+ logger.info("MCP server run method completed for SSE connection.")
except BrokenResourceError:
+ logger.warning("SSE connection BrokenResourceError.")
pass
except asyncio.CancelledError:
+ logger.warning("SSE connection CancelledError.")
pass
- except ValidationError:
+ except ValidationError as ve:
+        logger.opt(exception=True).error(f"SSE handler ValidationError: {ve}")
pass
- except Exception:
+ except Exception as e:
+        logger.opt(exception=True).error(f"Unexpected error in SSE handler: {e}")
raise
await request.close()
+ logger.info("SSE connection closed and request finalized.")
@router.post("/messages")
async def handle_messages(request: Request):
- logger.info("incoming SSE message received")
+ client_host = request.client.host if request.client else "Unknown_Host"
+ client_port = request.client.port if request.client else "Unknown_Port"
+ logger.info(f"Incoming POST /messages from {client_host}:{client_port}.")
+
+ # Removed explicit body reading here. Let sse.handle_post_message consume the body.
+ # try:
+ # body_bytes = await request.body()
+ # try:
+ # body_str = body_bytes.decode('utf-8')
+ # logger.debug(f"POST /messages request body (decoded): {body_str}")
+ # except UnicodeDecodeError:
+ # logger.warning(f"POST /messages request body (raw bytes, could not decode as UTF-8): {body_bytes}")
+ # except Exception as e:
+ # logger.error(f"Error reading request body for POST /messages: {e}", exc_info=True)
+
+ logger.info(f"Forwarding POST /messages from {client_host}:{client_port} to sse.handle_post_message.")
await sse.handle_post_message(request.scope, request.receive, request._send)
- await request.close()
+ logger.info(f"sse.handle_post_message completed for POST /messages from {client_host}:{client_port}.")
diff --git a/mcp_bridge/utils/__init__.py b/mcp_bridge/utils/__init__.py
new file mode 100644
index 0000000..6e4ab42
--- /dev/null
+++ b/mcp_bridge/utils/__init__.py
@@ -0,0 +1,7 @@
+# mcp_bridge/utils/__init__.py
+
+# Import and apply patches
+from .library_patcher import apply_patches
+
+# Apply patches at module import time
+original_functions = apply_patches()
\ No newline at end of file
diff --git a/mcp_bridge/utils/library_patcher.py b/mcp_bridge/utils/library_patcher.py
new file mode 100644
index 0000000..3acc22f
--- /dev/null
+++ b/mcp_bridge/utils/library_patcher.py
@@ -0,0 +1,83 @@
+# mcp_bridge/utils/library_patcher.py
+
+import json
+from loguru import logger
+import mcp.types
+from mcp.types import JSONRPCMessage
+from mcp.shared.message import SessionMessage
+
+def patch_jsonrpc_message():
+ """
+ Patch the JSONRPCMessage class to add the 'message' property.
+ """
+ # Check if the class is already patched
+ if hasattr(JSONRPCMessage, '_original_getattr'):
+ logger.info("JSONRPCMessage already patched")
+ return
+
+ # Store the original __getattr__ method
+ original_getattr = JSONRPCMessage.__getattr__
+
+ # Define the patched method
+ def patched_getattr(self, name):
+ # If trying to access 'message', return self
+ if name == 'message':
+ return self
+
+ # Otherwise, use the original method
+ return original_getattr(self, name)
+
+ # Apply the patch
+ JSONRPCMessage._original_getattr = original_getattr
+ JSONRPCMessage.__getattr__ = patched_getattr
+
+ logger.info("Successfully patched JSONRPCMessage.__getattr__")
+
+ return original_getattr
+
+def patch_session_message():
+ """
+ Patch the SessionMessage class to add missing serialization methods.
+ """
+ # Skip if already patched
+ if hasattr(SessionMessage, 'model_dump_json'):
+ logger.info("SessionMessage already patched")
+ return
+
+ # Add model_dump_json method to SessionMessage
+ def model_dump_json(self, **kwargs):
+ """Convert the message to JSON string, compatible with Pydantic v2."""
+ # If message has its own model_dump_json method, use it
+ if hasattr(self.message, 'model_dump_json'):
+ return self.message.model_dump_json(**kwargs)
+
+ # If message has dict method (Pydantic v1), use it
+ if hasattr(self.message, 'dict'):
+ # Convert to dict then to JSON
+ message_dict = self.message.dict(**kwargs)
+ return json.dumps(message_dict)
+
+ # Fallback: try to convert to dict directly
+ try:
+ return json.dumps(self.message.__dict__)
+ except:
+ # Last resort, just stringify
+ return json.dumps(str(self.message))
+
+ # Add the method to the class
+ SessionMessage.model_dump_json = model_dump_json
+
+ logger.info("Successfully patched SessionMessage with model_dump_json")
+
+ return True
+
+def apply_patches():
+ """Apply all necessary patches to the MCP library."""
+ original_getattr = patch_jsonrpc_message()
+ session_message_patched = patch_session_message()
+
+ logger.info("All MCP library patches applied successfully")
+ return {
+ "original_getattr": original_getattr,
+ "session_message_patched": session_message_patched
+ }
\ No newline at end of file
diff --git a/mcp_bridge/utils/message_adapter.py b/mcp_bridge/utils/message_adapter.py
new file mode 100644
index 0000000..83fd2ec
--- /dev/null
+++ b/mcp_bridge/utils/message_adapter.py
@@ -0,0 +1,52 @@
+# mcp_bridge/utils/message_adapter.py
+
+from typing import Any
+
+class MessageWrapper:
+ """
+ A wrapper class for JSONRPCMessage objects that provides the expected structure.
+ Only used in client-side code, not for patching the library.
+ """
+ def __init__(self, original_message):
+ self._original = original_message
+
+ @property
+ def message(self):
+ """Return self to provide the message.message.root structure"""
+ return self
+
+ @property
+ def root(self):
+ """Forward root access to the original message"""
+ if hasattr(self._original, 'root'):
+ return self._original.root
+ return None
+
+ def __getattr__(self, name):
+ """Forward all other attribute access to the original message"""
+ return getattr(self._original, name)
+
+def wrap_message(message: Any) -> Any:
+ """
+ Wrap a message object to provide the expected message.message.root structure.
+ """
+ if message is None or isinstance(message, Exception):
+ return message
+
+ # No need to wrap if it's already a MessageWrapper
+ if isinstance(message, MessageWrapper):
+ return message
+
+ # Wrap the message
+ return MessageWrapper(message)
+
+# Add alias for backward compatibility
+adapt_jsonrpc_message = wrap_message
+
+# Dummy function for backward compatibility
+def patch_base_session():
+ """
+ Dummy function for backward compatibility.
+ The actual patching is done in library_patcher.py
+ """
+ return None
\ No newline at end of file
diff --git a/mcp_config.json b/mcp_config.json
new file mode 100644
index 0000000..4092a92
--- /dev/null
+++ b/mcp_config.json
@@ -0,0 +1,19 @@
+{
+ "mcp_servers": {
+ "fetch": {
+ "command": "uvx",
+ "args": ["mcp-server-fetch"]
+ },
+ "github": {
+ "command": "docker",
+ "args": [
+ "run",
+ "-i",
+ "--rm",
+ "-e",
+ "GITHUB_PERSONAL_ACCESS_TOKEN",
+ "ghcr.io/github/github-mcp-server"
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 6d3a3bd..73c14c9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,18 +11,12 @@ dependencies = [
"httpx-sse>=0.4.0",
"lmos-openai-types",
"loguru>=0.7.3",
- "mcp>=1.2.0",
- "mcpx[docker]>=0.1.1",
- "opentelemetry-api>=1.33.1",
- "opentelemetry-exporter-otlp>=1.33.1",
- "opentelemetry-instrumentation-fastapi>=0.54b1",
- "opentelemetry-instrumentation-httpx>=0.54b1",
- "opentelemetry-sdk>=1.33.1",
"pydantic>=2.10.4",
"pydantic-settings>=2.7.0",
"sse-starlette>=2.2.0",
"tortoise-orm[asyncmy,asyncpg]>=0.23.0",
"uvicorn>=0.34.0",
+    "aiodocker>=0.2.4", "mcp>=1.2.0"
]
[tool.uv.sources]