1-
1+ import os
2+ import logging
3+ import asyncio
4+ import time
5+ import sys
6+ import inspect
7+ import json
8+ from typing import List , Dict , Any , Optional , Callable
9+ from contextlib import AsyncExitStack
10+
11+ from mcp import ClientSession
12+ from mcp .client .sse import sse_client
13+
14+ from praisonaiagents import Agent
15+
16+ # Set up logging based on environment variable
17+ log_level = os .environ .get ("LOGLEVEL" , "info" ).upper ()
18+ logging .basicConfig (level = getattr (logging , log_level ))
19+ logger = logging .getLogger ("mcp-client" )
20+
21+ # Create a custom prompt that explicitly mentions the tools
22+ system_prompt = """You are a helpful assistant that can provide greetings and check weather information.
23+
24+ You have access to the following tools:
25+ 1. get_greeting(name: str) - Get a personalized greeting for a given name
26+ 2. get_weather(city: str) - Get weather information for a city (Paris, London, New York, Tokyo, Sydney)
27+
28+ When asked about weather, always use the get_weather tool with the appropriate city.
29+ When asked for a greeting, always use the get_greeting tool with the appropriate name.
30+ """
31+
32+ # Global event loop for async operations
33+ event_loop = None
34+
35+ def get_event_loop ():
36+ """Get or create a global event loop."""
37+ global event_loop
38+ if event_loop is None or event_loop .is_closed ():
39+ event_loop = asyncio .new_event_loop ()
40+ asyncio .set_event_loop (event_loop )
41+ return event_loop
42+
43+ class SSEMCPTool :
44+ """A wrapper for an MCP tool that can be used with praisonaiagents."""
45+
46+ def __init__ (self , name : str , description : str , session : ClientSession , input_schema : Optional [Dict [str , Any ]] = None ):
47+ self .name = name
48+ self .__name__ = name # Required for Agent to recognize it as a tool
49+ self .__qualname__ = name # Required for Agent to recognize it as a tool
50+ self .__doc__ = description # Required for Agent to recognize it as a tool
51+ self .description = description
52+ self .session = session
53+ self .input_schema = input_schema or {}
54+
55+ # Create a signature based on input schema
56+ params = []
57+ if input_schema and 'properties' in input_schema :
58+ for param_name in input_schema ['properties' ]:
59+ params .append (
60+ inspect .Parameter (
61+ name = param_name ,
62+ kind = inspect .Parameter .POSITIONAL_OR_KEYWORD ,
63+ default = inspect .Parameter .empty if param_name in input_schema .get ('required' , []) else None ,
64+ annotation = str # Default to string
65+ )
66+ )
67+
68+ self .__signature__ = inspect .Signature (params )
69+
70+ def __call__ (self , ** kwargs ):
71+ """Synchronous wrapper for the async call."""
72+ logger .debug (f"Tool { self .name } called with args: { kwargs } " )
73+
74+ # Use the global event loop
75+ loop = get_event_loop ()
76+
77+ # Run the async call in the event loop
78+ future = asyncio .run_coroutine_threadsafe (self ._async_call (** kwargs ), loop )
79+ try :
80+ # Wait for the result with a timeout
81+ return future .result (timeout = 30 )
82+ except Exception as e :
83+ logger .error (f"Error calling tool { self .name } : { e } " )
84+ return f"Error: { str (e )} "
85+
86+ async def _async_call (self , ** kwargs ):
87+ """Call the tool with the provided arguments."""
88+ logger .debug (f"Async calling tool { self .name } with args: { kwargs } " )
89+ try :
90+ result = await self .session .call_tool (self .name , kwargs )
91+
92+ # Extract text from result
93+ if hasattr (result , 'content' ) and result .content :
94+ if hasattr (result .content [0 ], 'text' ):
95+ return result .content [0 ].text
96+ return str (result .content [0 ])
97+ return str (result )
98+ except Exception as e :
99+ logger .error (f"Error in _async_call for { self .name } : { e } " )
100+ raise
101+
102+ def to_openai_tool (self ):
103+ """Convert the tool to OpenAI format."""
104+ return {
105+ "type" : "function" ,
106+ "function" : {
107+ "name" : self .name ,
108+ "description" : self .description ,
109+ "parameters" : self .input_schema
110+ }
111+ }
112+
113+
114+ class SSEMCPClient :
115+ """A client for connecting to an MCP server over SSE."""
116+
117+ def __init__ (self , server_url : str ):
118+ self .server_url = server_url
119+ self .session = None
120+ self .tools = []
121+ self ._initialize ()
122+
123+ def _initialize (self ):
124+ """Initialize the connection and tools."""
125+ # Use the global event loop
126+ loop = get_event_loop ()
127+
128+ # Start a background thread to run the event loop
129+ def run_event_loop ():
130+ asyncio .set_event_loop (loop )
131+ loop .run_forever ()
132+
133+ import threading
134+ self .loop_thread = threading .Thread (target = run_event_loop , daemon = True )
135+ self .loop_thread .start ()
136+
137+ # Run the initialization in the event loop
138+ future = asyncio .run_coroutine_threadsafe (self ._async_initialize (), loop )
139+ self .tools = future .result (timeout = 30 )
140+
141+ async def _async_initialize (self ):
142+ """Asynchronously initialize the connection and tools."""
143+ logger .debug (f"Connecting to MCP server at { self .server_url } " )
144+
145+ # Create SSE client
146+ self ._streams_context = sse_client (url = self .server_url )
147+ streams = await self ._streams_context .__aenter__ ()
148+
149+ self ._session_context = ClientSession (* streams )
150+ self .session = await self ._session_context .__aenter__ ()
151+
152+ # Initialize
153+ await self .session .initialize ()
154+
155+ # List available tools
156+ logger .debug ("Listing tools..." )
157+ response = await self .session .list_tools ()
158+ tools_data = response .tools
159+ logger .debug (f"Found { len (tools_data )} tools: { [tool .name for tool in tools_data ]} " )
160+
161+ # Create tool wrappers
162+ tools = []
163+ for tool in tools_data :
164+ input_schema = tool .inputSchema if hasattr (tool , 'inputSchema' ) else None
165+ wrapper = SSEMCPTool (
166+ name = tool .name ,
167+ description = tool .description if hasattr (tool , 'description' ) else f"Call the { tool .name } tool" ,
168+ session = self .session ,
169+ input_schema = input_schema
170+ )
171+ tools .append (wrapper )
172+
173+ return tools
174+
175+ def __iter__ (self ):
176+ """Return an iterator over the tools."""
177+ return iter (self .tools )
178+
179+
180+ def main ():
181+ # Server URL
182+ server_url = "http://0.0.0.0:8080/sse"
183+
184+ try :
185+ # Connect to the MCP server
186+ client = SSEMCPClient (server_url )
187+
188+ if not client .tools :
189+ logger .error ("No tools found on the server" )
190+ return
191+
192+ logger .info (f"Connected to server with { len (client .tools )} tools: { [tool .name for tool in client .tools ]} " )
193+
194+ # Create OpenAI-compatible tool definitions
195+ openai_tools = [tool .to_openai_tool () for tool in client .tools ]
196+ logger .debug (f"OpenAI tools: { json .dumps (openai_tools , indent = 2 )} " )
197+
198+ # Create an agent with the tools
199+ assistant_agent = Agent (
200+ instructions = system_prompt ,
201+ llm = "openai/gpt-4o-mini" ,
202+ tools = client .tools ,
203+ verbose = True
204+ )
205+
206+ # Start the agent with a query
207+ logger .info ("Starting agent with query about weather in Paris" )
208+ result = assistant_agent .chat (
209+ "Hello! Can you tell me what the weather is like in Paris today?" ,
210+ tools = openai_tools
211+ )
212+
213+ logger .info (f"Agent response: { result } " )
214+
215+ except Exception as e :
216+ logger .error (f"Error: { e } " )
217+ import traceback
218+ traceback .print_exc ()
219+
220+
221+ if __name__ == "__main__" :
222+ main ()
0 commit comments