1
1
from swarms import Agent
2
- from swarms .tools .mcp_integration import MCPServerSseParams
3
2
from loguru import logger
4
3
import sys
5
-
4
+ from swarms . prompts . agent_prompts import MATH_PROMPT
6
5
# Configure logging
7
6
logger .remove ()
8
- logger .add (sys .stdout ,
9
- level = "INFO" ,
10
- format = "{time} | {level} | {module}:{function}:{line} - {message}" )
7
+ logger .add (sys .stdout , level = "INFO" , format = "{time} | {level} | {message}" )
11
8
12
9
# Math prompt for testing MCP integration
13
- MATH_PROMPT = """
14
- You are a math calculator assistant that uses tools to perform calculations.
15
-
16
- When asked for calculations, determine the operation and numbers, then use one of these tools:
17
- - add: Add two numbers
18
- - multiply: Multiply two numbers
19
- - divide: Divide first number by second
20
-
21
- FORMAT as JSON:
22
- {"tool_name": "add", "a": 5, "b": 10}
23
- """
24
10
25
11
26
12
def main ():
27
- """Main function to test MCP integration with Agent."""
13
+ """Test MCP integration with Agent."""
28
14
print ("=== MINIMAL MCP AGENT INTEGRATION TEST ===" )
29
- print ("Testing only the core MCP integration with Agent" )
30
15
31
16
try :
32
- # Create the server parameters correctly
33
- logger .info ("Creating MCP server parameters..." )
34
- mcp_server = MCPServerSseParams (
35
- url = "http://0.0.0.0:8000" ,
36
- headers = {
17
+ # Create the MCP server parameters as a dictionary
18
+ mcp_server = {
19
+ "url" : "http://0.0.0.0:8000" ,
20
+ "headers" : {
37
21
"Content-Type" : "application/json" ,
38
22
"Accept" : "text/event-stream"
39
23
},
40
- timeout = 10.0 ,
41
- sse_read_timeout = 30.0
42
- )
43
-
44
- # Log the server params to verify they're correct
45
- logger .info (f"MCP Server URL: { mcp_server .url } " )
46
- logger .info ("MCP Headers configured" )
24
+ "timeout" : 10.0 ,
25
+ "sse_read_timeout" : 30.0
26
+ }
47
27
48
28
# Create agent with minimal configuration
49
- logger .info ("Creating Agent with MCP integration..." )
50
29
agent = Agent (
51
30
agent_name = "MCP Test Agent" ,
52
31
system_prompt = MATH_PROMPT ,
53
- mcp_servers = [mcp_server ], # Pass server params object
54
- verbose = True )
32
+ mcp_servers = [mcp_server ], # Pass as a list of dictionaries
33
+ model_name = "gpt-4o-mini" ,
34
+ verbose = False # Reduce verbosity to focus on errors
35
+ )
55
36
56
37
print ("\n Agent created successfully!" )
57
38
print ("Enter a math query or 'exit' to quit" )
@@ -62,17 +43,18 @@ def main():
62
43
if query .lower () == 'exit' :
63
44
break
64
45
65
- # Run the agent, which should use the MCP server
66
- logger . info (f"Processing query : { query } " )
46
+ # Run the agent
47
+ print (f"\n Processing : { query } " )
67
48
result = agent .run (query )
68
49
69
50
# Display result
70
51
print (f"\n Result: { result } " )
71
52
72
53
except Exception as e :
73
- logger .error (f"Error during MCP integration test: { e } " , exc_info = True )
74
- print (f"\n ERROR: { type (e ).__name__ } : { str (e )} " )
54
+ logger .error (f"Error: { str (e )} " )
55
+ import traceback
56
+ traceback .print_exc ()
75
57
76
58
77
59
if __name__ == "__main__" :
78
- main ()
60
+ main ()
0 commit comments