-
Notifications
You must be signed in to change notification settings - Fork 7.4k
/
Copy pathrun_mcp.py
116 lines (95 loc) · 3.54 KB
/
run_mcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
import argparse
import asyncio
import sys
from app.agent.mcp import MCPAgent
from app.config import config
from app.logger import logger
class MCPRunner:
"""Runner class for MCP Agent with proper path handling and configuration."""
def __init__(self):
self.root_path = config.root_path
self.server_reference = config.mcp_config.server_reference
self.agent = MCPAgent()
async def initialize(
self,
connection_type: str,
server_url: str | None = None,
) -> None:
"""Initialize the MCP agent with the appropriate connection."""
logger.info(f"Initializing MCPAgent with {connection_type} connection...")
if connection_type == "stdio":
await self.agent.initialize(
connection_type="stdio",
command=sys.executable,
args=["-m", self.server_reference],
)
else: # sse
await self.agent.initialize(connection_type="sse", server_url=server_url)
logger.info(f"Connected to MCP server via {connection_type}")
async def run_interactive(self) -> None:
"""Run the agent in interactive mode."""
print("\nMCP Agent Interactive Mode (type 'exit' to quit)\n")
while True:
user_input = input("\nEnter your request: ")
if user_input.lower() in ["exit", "quit", "q"]:
break
response = await self.agent.run(user_input)
print(f"\nAgent: {response}")
async def run_single_prompt(self, prompt: str) -> None:
"""Run the agent with a single prompt."""
await self.agent.run(prompt)
async def run_default(self) -> None:
"""Run the agent in default mode."""
prompt = input("Enter your prompt: ")
if not prompt.strip():
logger.warning("Empty prompt provided.")
return
logger.warning("Processing your request...")
await self.agent.run(prompt)
logger.info("Request processing completed.")
async def cleanup(self) -> None:
"""Clean up agent resources."""
await self.agent.cleanup()
logger.info("Session ended")
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description="Run the MCP Agent")
parser.add_argument(
"--connection",
"-c",
choices=["stdio", "sse"],
default="stdio",
help="Connection type: stdio or sse",
)
parser.add_argument(
"--server-url",
default="http://127.0.0.1:8000/sse",
help="URL for SSE connection",
)
parser.add_argument(
"--interactive", "-i", action="store_true", help="Run in interactive mode"
)
parser.add_argument("--prompt", "-p", help="Single prompt to execute and exit")
return parser.parse_args()
async def run_mcp() -> None:
    """Parse the CLI options, run the agent in the selected mode, then clean up.

    ``--prompt`` takes precedence over ``--interactive``; with neither, the
    default mode is used. A ``KeyboardInterrupt`` is logged and swallowed.
    Any other ``Exception`` is logged and the process exits with status 1.
    The runner's ``cleanup`` is awaited on every path.
    """
    cli_options = parse_args()
    runner = MCPRunner()

    try:
        await runner.initialize(cli_options.connection, cli_options.server_url)

        # Select the mode first, then await it in one place.
        if cli_options.prompt:
            session = runner.run_single_prompt(cli_options.prompt)
        elif cli_options.interactive:
            session = runner.run_interactive()
        else:
            session = runner.run_default()
        await session
    except KeyboardInterrupt:
        logger.info("Program interrupted by user")
    except Exception as e:
        logger.error(f"Error running MCPAgent: {str(e)}", exc_info=True)
        sys.exit(1)
    finally:
        # Runs after success, after an error, and while SystemExit unwinds.
        await runner.cleanup()
if __name__ == "__main__":
    # Executed only when this file is run as a script, not when imported.
    asyncio.run(run_mcp())