Skip to content

feat: Concurrent Operations, Enhanced Logging & Robustness #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

.gitignore
.DS_Store

## custom
commands.md
Expand Down
17 changes: 16 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ RUN apt-get update && apt-get install -y --no-install-recommends curl
RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
RUN apt-get install -y --no-install-recommends nodejs

# Install Docker CLI
RUN apt-get update && \
apt-get install -y --no-install-recommends \
ca-certificates \
gnupg && \
install -m 0755 -d /etc/apt/keyrings && \
curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc && \
chmod a+r /etc/apt/keyrings/docker.asc && \
echo \
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
tee /etc/apt/sources.list.d/docker.list > /dev/null && \
apt-get update && \
apt-get install -y --no-install-recommends docker-ce-cli

COPY pyproject.toml .

## FOR GHCR BUILD PIPELINE
Expand All @@ -18,7 +33,7 @@ RUN uv sync

COPY mcp_bridge mcp_bridge

EXPOSE 8000
EXPOSE 3989

WORKDIR /mcp_bridge
ENTRYPOINT ["uv", "run", "main.py"]
19 changes: 19 additions & 0 deletions backup_mcp_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"mcp_servers": {
"fetch": {
"command": "uvx",
"args": ["mcp-server-fetch"]
},
"github": {
"command": "docker",
"args": [
"run",
"-i",
"--rm",
"-e",
"GITHUB_PERSONAL_ACCESS_TOKEN",
"ghcr.io/github/github-mcp-server"
]
}
}
}
8 changes: 4 additions & 4 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ services:
action: rebuild
container_name: mcp-bridge
ports:
- "8000:8000"
- "3989:3989"
environment:
- MCP_BRIDGE__CONFIG__FILE=config.json # mount the config file for this to work
- MCP_BRIDGE__CONFIG__FILE=mcp_config.json # mount the config file for this to work
# - MCP_BRIDGE__CONFIG__HTTP_URL=http://10.88.100.170:8888/config.json
# - MCP_BRIDGE__CONFIG__JSON=
# volumes:
# - ./config.json:/mcp_bridge/config.json
volumes:
- ./mcp_config.json:/mcp_bridge/mcp_config.json
restart: unless-stopped
101 changes: 101 additions & 0 deletions mcp_bridge/config/docker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from aiodocker import Docker
from contextlib import asynccontextmanager
import anyio
import anyio.lowlevel
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from loguru import logger
from pydantic import BaseModel, Field

from mcp import types

class DockerMCPServer(BaseModel):
container_name: str | None = Field(default=None, description="Name of the docker container")
image: str = Field(description="Image of the docker container")
args: list[str] = Field(default_factory=list, description="Command line arguments for the docker container")
env: dict[str, str] = Field(default_factory=dict, description="Environment variables for the docker container")


@asynccontextmanager
async def docker_client(server: DockerMCPServer):
"""
Client transport for Docker: this will connect to a server by
running a Docker container and communicating with it over its stdin/stdout.
"""
read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]
read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]

write_stream: MemoryObjectSendStream[types.JSONRPCMessage]
write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]

read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

docker = Docker()

try:
# Pull the image if not available
await docker.images.pull(server.image)

# Create the container
container = await docker.containers.create({
"Image": server.image,
"Args": server.args,
"OpenStdin": True,
"AttachStdout": True,
"AttachStderr": True,
"Tty": False,
"HostConfig": {"AutoRemove": True},
})

await container.start()
logger.debug(f"Started Docker container {container.id}")

# Attach to the container's input/output streams
attach_result = container.attach(stdout=True, stdin=True)

async def read_from_stdout():
try:
async with read_stream_writer:
buffer = ""
while True:
msg = await attach_result.read_out()
if msg is None:
continue
chunk = msg.data
if isinstance(chunk, bytes):
chunk = chunk.decode("utf-8")
lines = (buffer + chunk).split("\n")
buffer = lines.pop()

for line in lines:
try:
json_message = types.JSONRPCMessage.model_validate_json(line)
await read_stream_writer.send(json_message)
except Exception as exc:
await read_stream_writer.send(exc)
except anyio.ClosedResourceError:
await anyio.lowlevel.checkpoint()

async def write_to_stdin():
try:
async with write_stream_reader:
async for message in write_stream_reader:
json = message.model_dump_json(by_alias=True, exclude_none=True)
await attach_result.write_in(json.encode("utf-8") + b"\n")
except anyio.ClosedResourceError:
await anyio.lowlevel.checkpoint()

try:
async with anyio.create_task_group() as tg:
tg.start_soon(read_from_stdout)
tg.start_soon(write_to_stdin)
yield read_stream, write_stream
finally:
await container.stop()
await container.delete()

except Exception as e:
logger.error(f"Error in docker client: {e}")
finally:
await docker.close()
logger.debug("Docker client closed.")
7 changes: 7 additions & 0 deletions mcp_bridge/config/docker_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from pydantic import BaseModel, Field

class DockerMCPServer(BaseModel):
container_name: str | None = Field(default=None, description="Name of the docker container")
image: str = Field(description="Image of the docker container")
args: list[str] = Field(default_factory=list, description="Command line arguments for the docker container")
env: dict[str, str] = Field(default_factory=dict, description="Environment variables for the docker container")
2 changes: 1 addition & 1 deletion mcp_bridge/config/final.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pydantic import BaseModel, Field

from mcp.client.stdio import StdioServerParameters
from mcpx.client.transports.docker import DockerMCPServer
from .docker import DockerMCPServer


class InferenceServer(BaseModel):
Expand Down
2 changes: 1 addition & 1 deletion mcp_bridge/mcp_clients/DockerClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from mcp_bridge.mcp_clients.session import McpClientSession
from mcp_bridge.config import config
from mcpx.client.transports.docker import docker_client, DockerMCPServer
from mcp_bridge.config.docker import docker_client, DockerMCPServer
from .AbstractClient import GenericMcpClient
from loguru import logger

Expand Down
128 changes: 94 additions & 34 deletions mcp_bridge/mcp_clients/McpClientManager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Union
import asyncio

from loguru import logger
from mcp import McpError, StdioServerParameters
from mcpx.client.transports.docker import DockerMCPServer
from mcp_bridge.config.docker import DockerMCPServer

from mcp_bridge.config import config
from mcp_bridge.config.final import SSEMCPServer
Expand All @@ -18,35 +19,71 @@ class MCPClientManager:
clients: dict[str, client_types] = {}

async def initialize(self):
"""Initialize the MCP Client Manager and start all clients"""
"""Initialize the MCP Client Manager and start all clients concurrently."""
logger.info("Initializing MCP Client Manager")

logger.log("DEBUG", "Initializing MCP Client Manager")
construction_coroutines = []
server_names_ordered = [] # To map results back to names

for server_name, server_config in config.mcp_servers.items():
self.clients[server_name] = await self.construct_client(
server_name, server_config
)

async def construct_client(self, name, server_config) -> client_types:
logger.log("DEBUG", f"Constructing client for {server_config}")

if isinstance(server_config, StdioServerParameters):
client = StdioClient(name, server_config)
await client.start()
return client

if isinstance(server_config, SSEMCPServer):
# TODO: implement sse client
client = SseClient(name, server_config) # type: ignore
await client.start()
return client

if isinstance(server_config, DockerMCPServer):
client = DockerClient(name, server_config)
await client.start()
return client

raise NotImplementedError("Client Type not supported")
logger.info(f"Preparing to initialize MCP client: {server_name} with config: {server_config}")
construction_coroutines.append(self.construct_and_start_client(server_name, server_config))
server_names_ordered.append(server_name)

if not construction_coroutines:
logger.info("No MCP clients configured to initialize.")
return

logger.info(f"Attempting to concurrently initialize {len(construction_coroutines)} MCP clients...")
# The construct_and_start_client will return (name, client_instance) or (name, None) on failure
results = await asyncio.gather(*construction_coroutines, return_exceptions=True)

for i, result in enumerate(results):
server_name = server_names_ordered[i]
if isinstance(result, Exception):
logger.error(f"Exception during initialization of MCP client: {server_name}. Error: {result}", exc_info=result)
elif result is None: # Should not happen if construct_and_start_client returns (name, None)
logger.error(f"Failed to initialize or start MCP client: {server_name}. No client instance returned.")
elif isinstance(result, tuple) and len(result) == 2:
client_instance = result[1]
if client_instance:
self.clients[server_name] = client_instance
logger.info(f"Successfully initialized and started MCP client: {server_name}")
else: # Explicitly handled None for client_instance from construct_and_start_client
logger.error(f"Failed to initialize or start MCP client: {server_name}. Construction returned None.")
else:
logger.error(f"Unexpected result type during initialization of MCP client: {server_name}. Result: {result}")

async def construct_and_start_client(self, name, server_config) -> tuple[str, client_types | None]:
"""Constructs and starts a single client, returns (name, client_instance | None)."""
logger.info(f"Constructing client '{name}' for type: {type(server_config)}")
try:
client_instance: client_types | None = None
if isinstance(server_config, StdioServerParameters):
command_to_log = server_config.command if hasattr(server_config, 'command') else "Not specified"
args_to_log = server_config.args if hasattr(server_config, 'args') else []
logger.info(f"Creating StdioClient for '{name}' with command: '{command_to_log}' and args: '{args_to_log}'")
client_instance = StdioClient(name, server_config)
await client_instance.start()
logger.info(f"StdioClient '{name}' started.")
elif isinstance(server_config, SSEMCPServer):
logger.info(f"Creating SseClient for '{name}' with URL: '{server_config.url}'")
client_instance = SseClient(name, server_config) # type: ignore
await client_instance.start()
logger.info(f"SseClient '{name}' started.")
elif isinstance(server_config, DockerMCPServer):
logger.info(f"Creating DockerClient for '{name}' with image: '{server_config.image_name}'")
client_instance = DockerClient(name, server_config)
await client_instance.start()
logger.info(f"DockerClient '{name}' started.")
else:
logger.error(f"Unsupported MCP server config type for client '{name}': {type(server_config)}")
# raise NotImplementedError("Client Type not supported") # Don't raise, return None
return name, None
return name, client_instance
except Exception as e:
logger.error(f"Error constructing or starting client {name}: {e}", exc_info=True)
return name, None # Return name and None on failure

def get_client(self, server_name: str):
return self.clients[server_name]
Expand All @@ -55,34 +92,57 @@ def get_clients(self):
return list(self.clients.items())

async def get_client_from_tool(self, tool: str):
logger.info(f"Attempting to find client for tool: '{tool}'")
for name, client in self.get_clients():

logger.debug(f"Checking client '{name}' for tool '{tool}'")
# client cannot have tools if it is not connected
if not client.session:
logger.warning(f"Client '{name}' session not active, skipping for tool '{tool}'")
continue

try:
list_tools = await client.session.list_tools()
for client_tool in list_tools.tools:
logger.debug(f"Calling list_tools() on client '{name}' for tool '{tool}'")
list_tools_response = await client.session.list_tools()
logger.debug(f"Client '{name}' returned tools: {list_tools_response.tools}")
for client_tool in list_tools_response.tools:
if client_tool.name == tool:
logger.info(f"Found tool '{tool}' in client '{name}'")
return client
except McpError:
except McpError as e:
logger.error(f"McpError while listing tools for client '{name}': {e}", exc_info=True)
continue
except Exception as e:
logger.error(f"Unexpected error while listing tools for client '{name}': {e}", exc_info=True)
continue
logger.warning(f"Tool '{tool}' not found in any active client.")
return None # Explicitly return None if not found

async def get_client_from_prompt(self, prompt: str):
logger.info(f"Attempting to find client for prompt: '{prompt}'")
for name, client in self.get_clients():
logger.debug(f"Checking client '{name}' for prompt '{prompt}'")

# client cannot have prompts if it is not connected
if not client.session:
logger.warning(f"Client '{name}' session not active, skipping for prompt '{prompt}'")
continue

try:
list_prompts = await client.session.list_prompts()
for client_prompt in list_prompts.prompts:
logger.debug(f"Calling list_prompts() on client '{name}' for prompt '{prompt}'")
list_prompts_response = await client.session.list_prompts()
logger.debug(f"Client '{name}' returned prompts: {list_prompts_response.prompts}")
for client_prompt in list_prompts_response.prompts:
if client_prompt.name == prompt:
logger.info(f"Found prompt '{prompt}' in client '{name}'")
return client
except McpError:
except McpError as e:
logger.error(f"McpError while listing prompts for client '{name}': {e}", exc_info=True)
continue
except Exception as e:
logger.error(f"Unexpected error while listing prompts for client '{name}': {e}", exc_info=True)
continue
logger.warning(f"Prompt '{prompt}' not found in any active client.")
return None # Explicitly return None if not found


ClientManager = MCPClientManager()
Loading