forked from MervinPraison/PraisonAI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp.py
More file actions
673 lines (567 loc) · 26.4 KB
/
mcp.py
File metadata and controls
673 lines (567 loc) · 26.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
import asyncio
import threading
import queue
import time
import inspect
import shlex
import logging
from praisonaiagents._logging import get_logger
import os
import re
import platform
from typing import Any, List, Optional, Callable, Iterable, Union
from functools import wraps, partial
# Optional dependency: the MCP client SDK. Keep this module importable even
# when the package is absent; MCP.__init__ raises a descriptive ImportError
# at construction time if MCP_AVAILABLE is False.
try:
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client
    MCP_AVAILABLE = True
except ImportError:
    MCP_AVAILABLE = False
    # Placeholders so the module-level names always exist for references below.
    ClientSession = None
    StdioServerParameters = None
    stdio_client = None
class MCPToolRunner(threading.Thread):
    """A dedicated daemon thread that owns the MCP stdio session.

    All MCP I/O happens on this thread's asyncio loop; callers submit tool
    invocations through ``self.queue`` and block on ``self.result_queue``.
    Responses are matched to requests purely by ordering, so callers should
    not invoke ``call_tool`` concurrently from multiple threads.
    """

    def __init__(self, server_params, timeout=60):
        """
        Args:
            server_params: StdioServerParameters describing the server process.
            timeout: Seconds to wait for initialization and for each tool call.
        """
        super().__init__(daemon=True)
        self.server_params = server_params
        self.queue = queue.Queue()          # outgoing (tool_name, arguments) requests
        self.result_queue = queue.Queue()   # incoming (success, payload) responses
        self.initialized = threading.Event()
        self.tools = []
        self.timeout = timeout
        self._tool_timings = {}             # last observed execution time per tool
        self._timings_lock = threading.Lock()
        self.start()

    def run(self):
        """Thread entry point: drive the async MCP session to completion."""
        asyncio.run(self._run_async())

    async def _run_async(self):
        """Connect to the MCP server, expose its tools, then serve requests."""
        try:
            # Set up the MCP stdio session.
            async with stdio_client(self.server_params) as (read, write):
                async with ClientSession(read, write) as session:
                    await session.initialize()
                    tools_result = await session.list_tools()
                    self.tools = tools_result.tools
                    # Unblock callers waiting in call_tool().
                    self.initialized.set()
                    # Poll the request queue until shutdown.
                    while True:
                        try:
                            try:
                                item = self.queue.get(block=False)
                                if item is None:  # shutdown sentinel
                                    break
                                tool_name, arguments = item
                                try:
                                    result = await session.call_tool(tool_name, arguments)
                                    self.result_queue.put((True, result))
                                except Exception as e:
                                    self.result_queue.put((False, str(e)))
                            except queue.Empty:
                                pass
                            # Yield to the event loop between polls.
                            await asyncio.sleep(0.01)
                        except asyncio.CancelledError:
                            break
        except Exception as e:
            # Always release waiters, even on a failed startup.
            self.initialized.set()
            self.result_queue.put((False, f"MCP initialization error: {str(e)}"))

    def call_tool(self, tool_name, arguments):
        """Call an MCP tool and wait (up to ``self.timeout``) for its result.

        Returns the tool's text output on success, or an ``"Error: ..."``
        string on failure or timeout. Never raises.
        """
        # Import telemetry lazily to avoid circular imports.
        try:
            from ..telemetry.telemetry import get_telemetry
            telemetry = get_telemetry()
        except (ImportError, AttributeError):
            telemetry = None
        # Wait for session startup before timing the call itself.
        if not self.initialized.is_set():
            self.initialized.wait(timeout=self.timeout)
            if not self.initialized.is_set():
                # Track initialization timeout failure.
                if telemetry:
                    telemetry.track_tool_usage(tool_name, success=False, execution_time=0)
                return f"Error: MCP initialization timed out after {self.timeout} seconds"
        start_time = time.time()
        is_success = False
        try:
            self.queue.put((tool_name, arguments))
            # Bounded wait: an unbounded get() would hang forever if the
            # worker thread died after initialization.
            try:
                success, result = self.result_queue.get(timeout=self.timeout)
            except queue.Empty:
                return f"Error: MCP tool call timed out after {self.timeout} seconds"
            if not success:
                return f"Error: {result}"
            # Prefer the first content item's text when the result carries one.
            if hasattr(result, 'content') and result.content:
                if hasattr(result.content[0], 'text'):
                    processed_result = result.content[0].text
                else:
                    processed_result = str(result.content[0])
            else:
                processed_result = str(result)
            is_success = True
            return processed_result
        except Exception as e:
            return f"Error: {str(e)}"
        finally:
            # Record timing regardless of success/failure.
            execution_time = time.time() - start_time
            logging.debug(f"Tool '{tool_name}' execution time: {execution_time:.3f} seconds")
            with self._timings_lock:
                self._tool_timings[tool_name] = execution_time
            if telemetry:
                telemetry.track_tool_usage(tool_name, success=is_success, execution_time=execution_time)

    def shutdown(self):
        """Ask the worker thread to exit its request loop."""
        self.queue.put(None)
class MCP:
    """
    Model Context Protocol (MCP) integration for PraisonAI Agents.

    This class provides a simple way to connect to MCP servers and use their
    tools within PraisonAI agents.

    Example:
        ```python
        from praisonaiagents import Agent
        from praisonaiagents.mcp import MCP

        # Method 1: Using command and args separately
        agent = Agent(
            instructions="You are a helpful assistant...",
            llm="gpt-4o-mini",
            tools=MCP(
                command="/path/to/python",
                args=["/path/to/app.py"]
            )
        )

        # Method 2: Using a single command string
        agent = Agent(
            instructions="You are a helpful assistant...",
            llm="gpt-4o-mini",
            tools=MCP("/path/to/python /path/to/app.py")
        )

        # Method 3: Using an SSE endpoint
        agent = Agent(
            instructions="You are a helpful assistant...",
            llm="gpt-4o-mini",
            tools=MCP("http://localhost:8080/sse")
        )

        agent.start("What is the stock price of Tesla?")
        ```
    """

    # Loggers whose verbosity follows the ``debug`` flag.
    _MCP_LOGGER_NAMES = (
        "mcp-wrapper", "mcp-sse", "mcp.client", "sse", "mcp-server",
        "mcp-client", "_client", "httpx", "llm",
    )

    # JSON Schema primitive type -> Python annotation used for tool wrappers.
    _JSON_TYPE_MAP = {
        "string": str,
        "integer": int,
        "number": float,
        "boolean": bool,
        "array": list,
        "object": dict,
    }

    def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, **kwargs):
        """
        Initialize the MCP connection and get tools.

        Args:
            command_or_string: Either:
                - The command to run the MCP server (e.g., Python path)
                - A complete command string (e.g., "/path/to/python /path/to/app.py")
                - For NPX: 'npx' command with args for smithery tools
                - An SSE URL (e.g., "http://localhost:8080/sse")
            args: Arguments to pass to the command (when command_or_string is the command)
            command: Alternative parameter name for backward compatibility
            timeout: Timeout in seconds for MCP server initialization and tool calls (default: 60)
            debug: Enable debug logging for MCP operations (default: False)
            **kwargs: Additional parameters for StdioServerParameters

        Raises:
            ImportError: If the optional ``mcp`` package is not installed.
            ValueError: If an empty command string is supplied.
        """
        if not MCP_AVAILABLE:
            raise ImportError(
                "MCP (Model Context Protocol) package is not installed. "
                "Install it with: pip install praisonaiagents[mcp]"
            )

        # Backward compatibility with the named parameter 'command'.
        if command_or_string is None and command is not None:
            command_or_string = command

        # DEBUG when debugging; otherwise WARNING to hide routine INFO chatter.
        level = logging.DEBUG if debug else logging.WARNING
        for logger_name in self._MCP_LOGGER_NAMES:
            get_logger(logger_name).setLevel(level)

        self.timeout = timeout
        self.debug = debug

        # WebSocket transport (ws:// or wss://).
        if isinstance(command_or_string, str) and re.match(r'^wss?://', command_or_string):
            from .mcp_websocket import WebSocketMCPClient
            auth_token = kwargs.pop('auth_token', None)
            self.websocket_client = WebSocketMCPClient(
                command_or_string,
                debug=debug,
                timeout=timeout,
                auth_token=auth_token,
                options=kwargs
            )
            self._tools = list(self.websocket_client.tools)
            self.is_sse = False
            self.is_http_stream = False
            self.is_websocket = True
            self.is_npx = False
            return

        # HTTP transports (http:// or https://).
        if isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string):
            if command_or_string.endswith('/sse') and 'transport_type' not in kwargs:
                # Legacy SSE URL - keep the SSE transport for backward compatibility.
                from .mcp_sse import SSEMCPClient
                self.sse_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout)
                self._tools = list(self.sse_client.tools)
                self.is_sse = True
                self.is_http_stream = False
                # Assigned on every path so the attribute always exists.
                self.is_websocket = False
                self.is_npx = False
                return
            else:
                # HTTP Stream transport for all other HTTP URLs.
                from .mcp_http_stream import HTTPStreamMCPClient
                transport_options = {}
                for option_key in ('responseMode', 'headers', 'cors', 'session', 'resumability'):
                    if option_key in kwargs:
                        transport_options[option_key] = kwargs.pop(option_key)
                self.http_stream_client = HTTPStreamMCPClient(
                    command_or_string,
                    debug=debug,
                    timeout=timeout,
                    options=transport_options
                )
                self._tools = list(self.http_stream_client.tools)
                self.is_sse = False
                self.is_http_stream = True
                self.is_websocket = False
                self.is_npx = False
                return

        # Stdio transport: either a single command string or command + args.
        if isinstance(command_or_string, str) and args is None:
            if platform.system() == 'Windows':
                # posix=False keeps quoted paths with spaces intact; strip the
                # surrounding quotes that shlex leaves behind on Windows.
                parts = [part.strip('"') for part in shlex.split(command_or_string, posix=False)]
            else:
                parts = shlex.split(command_or_string)
            if not parts:
                raise ValueError("Empty command string")
            cmd = parts[0]
            arguments = parts[1:] if len(parts) > 1 else []
        else:
            # Original format with separate command and args.
            cmd = command_or_string
            arguments = args or []

        self.is_sse = False
        self.is_http_stream = False
        self.is_websocket = False

        # Ensure UTF-8 encoding in the child environment (Docker compatibility).
        # Copy so a caller-supplied env dict is never mutated in place.
        env = kwargs.get('env')
        env = dict(env) if env else os.environ.copy()
        env['PYTHONIOENCODING'] = 'utf-8'
        if platform.system() != 'Windows':
            # Locale variables only apply on Unix systems.
            env.update({
                'LC_ALL': 'C.UTF-8',
                'LANG': 'C.UTF-8'
            })
        kwargs['env'] = env

        self.server_params = StdioServerParameters(
            command=cmd,
            args=arguments,
            **kwargs
        )
        self.runner = MCPToolRunner(self.server_params, timeout)

        # Wait for the background session to come up; warn (don't raise) on
        # timeout to preserve best-effort startup behavior.
        if not self.runner.initialized.wait(timeout=self.timeout):
            print(f"Warning: MCP initialization timed out after {self.timeout} seconds")

        # Detect NPX-based servers (npx / npx.cmd / npx.exe), case-insensitively
        # on Windows.
        base_cmd = os.path.basename(cmd) if isinstance(cmd, str) else cmd
        npx_variants = ['npx', 'npx.cmd', 'npx.exe']
        if platform.system() == 'Windows' and isinstance(base_cmd, str):
            self.is_npx = base_cmd.lower() in [v.lower() for v in npx_variants]
        else:
            self.is_npx = base_cmd in npx_variants

        if self.is_npx:
            # NPX-based MCP servers take a slightly different setup path.
            self._function_declarations = []
            self._initialize_npx_mcp_tools(cmd, arguments)
        else:
            # Generate tool functions immediately and store them.
            self._tools = self._generate_tool_functions()

    def _generate_tool_functions(self) -> List[Callable]:
        """
        Generate a callable wrapper for each MCP tool.

        Returns:
            List[Callable]: Functions that can be used as agent tools.
        """
        if self.is_sse:
            return list(self.sse_client.tools)
        if self.is_http_stream:
            return list(self.http_stream_client.tools)
        return [self._create_tool_wrapper(tool) for tool in self.runner.tools]

    def _create_tool_wrapper(self, tool):
        """Build a plain function that proxies calls to one MCP tool.

        The wrapper carries a real ``inspect.Signature`` derived from the
        tool's JSON input schema so frameworks can introspect it.
        """
        param_names = []
        param_annotations = {}
        required_params = []
        if hasattr(tool, 'inputSchema') and tool.inputSchema:
            properties = tool.inputSchema.get("properties", {})
            required = tool.inputSchema.get("required", [])
            for name, prop in properties.items():
                param_names.append(name)
                # Map the JSON schema type to a Python annotation.
                prop_type = prop.get("type", "string")
                param_annotations[name] = self._JSON_TYPE_MAP.get(prop_type, Any)
                if name in required:
                    required_params.append(name)

        # Build the signature with required parameters first — Python forbids
        # a defaulted parameter before a non-defaulted one.
        required_param_objects = []
        optional_param_objects = []
        for name in param_names:
            is_required = name in required_params
            param = inspect.Parameter(
                name=name,
                kind=inspect.Parameter.POSITIONAL_OR_KEYWORD,
                default=inspect.Parameter.empty if is_required else None,
                annotation=param_annotations.get(name, Any)
            )
            if is_required:
                required_param_objects.append(param)
            else:
                optional_param_objects.append(param)
        params = required_param_objects + optional_param_objects

        # Template function carrying the metadata @wraps will copy over.
        def template_function(*args, **kwargs):
            return None

        template_function.__signature__ = inspect.Signature(params)
        template_function.__annotations__ = param_annotations
        template_function.__name__ = tool.name
        template_function.__qualname__ = tool.name
        template_function.__doc__ = tool.description

        @wraps(template_function)
        def wrapper(*args, **kwargs):
            # Map positional args to schema parameter names, then merge kwargs.
            all_args = {}
            for i, arg in enumerate(args):
                if i < len(param_names):
                    all_args[param_names[i]] = arg
            all_args.update(kwargs)
            return self.runner.call_tool(tool.name, all_args)

        # Expose the schema-derived signature on the wrapper for inspection.
        wrapper.__signature__ = inspect.Signature(params)
        return wrapper

    def _initialize_npx_mcp_tools(self, cmd, arguments):
        """Initialize the NPX MCP tools by extracting tool definitions.

        Raises:
            RuntimeError: If tool generation fails.
        """
        try:
            # NPX tools currently reuse the regular MCP wrapper generation.
            if self.debug:
                logging.debug(f"Initializing NPX MCP tools with command: {cmd} {' '.join(arguments)}")
            self._tools = self._generate_tool_functions()
            if self.debug:
                logging.debug(f"Generated {len(self._tools)} NPX MCP tools")
        except Exception as e:
            if self.debug:
                logging.error(f"Failed to initialize NPX MCP tools: {e}")
            raise RuntimeError(f"Failed to initialize NPX MCP tools: {e}")

    def __iter__(self) -> Iterable[Callable]:
        """
        Allow the MCP instance to be used directly as an iterable of tools.

        This makes it possible to pass the MCP instance directly to the
        Agent's tools parameter.
        """
        return iter(self._tools)

    def get_tools(self) -> List[Callable]:
        """
        Get the list of tool functions from this MCP instance.

        This method provides explicit access to the tools list, which is useful
        when you need to inspect or manipulate the tools programmatically.

        Returns:
            List[Callable]: List of tool functions that can be called.

        Example:
            ```python
            mcp = MCP("npx -y @modelcontextprotocol/server-time")
            tools = mcp.get_tools()
            for tool in tools:
                print(f"Tool: {tool.__name__}")
            ```
        """
        return self._tools

    def _fix_array_schemas(self, schema):
        """
        Fix array schemas by adding the 'items' attribute required by OpenAI.

        OpenAI's function-calling format requires array types to specify the
        type of items they contain; defaults missing ones to string items.

        Args:
            schema: The schema dictionary to fix.

        Returns:
            dict: A fixed copy of the schema (the input is not mutated).
        """
        if not isinstance(schema, dict):
            return schema
        # Work on a copy so the caller's schema is untouched.
        fixed_schema = schema.copy()
        if fixed_schema.get("type") == "array" and "items" not in fixed_schema:
            fixed_schema["items"] = {"type": "string"}
        # Recurse into nested property schemas.
        if "properties" in fixed_schema:
            fixed_schema["properties"] = {
                prop_name: self._fix_array_schemas(prop_schema)
                for prop_name, prop_schema in fixed_schema["properties"].items()
            }
        # Recurse into an existing items schema.
        if "items" in fixed_schema:
            fixed_schema["items"] = self._fix_array_schemas(fixed_schema["items"])
        return fixed_schema

    def to_openai_tool(self):
        """Convert the MCP tools to OpenAI-compatible tool definitions.

        This method is specifically invoked by the Agent class when using
        provider/model format (e.g., "openai/gpt-4o-mini").

        Returns:
            list or None: OpenAI-compatible tool definitions for every
            available tool, or None when no tools are available.
        """
        if self.is_sse and hasattr(self, 'sse_client') and self.sse_client.tools:
            return self.sse_client.to_openai_tools()
        if self.is_http_stream and hasattr(self, 'http_stream_client') and self.http_stream_client.tools:
            return self.http_stream_client.to_openai_tools()
        if not hasattr(self, 'runner') or not self.runner.tools:
            logging.warning("No MCP tools available to convert to OpenAI format")
            return None
        openai_tools = []
        for tool in self.runner.tools:
            if hasattr(tool, 'inputSchema') and tool.inputSchema:
                # Ensure array schemas carry the 'items' attribute OpenAI requires.
                parameters = self._fix_array_schemas(tool.inputSchema)
            else:
                # Minimal empty schema when the tool declares none.
                parameters = {
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            openai_tools.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description if hasattr(tool, 'description') else f"Call the {tool.name} tool",
                    "parameters": parameters
                }
            })
        return openai_tools

    def __enter__(self):
        """Context manager entry - return self for use in 'with' statements."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - clean up resources."""
        self.shutdown()
        return False  # Don't suppress exceptions

    def shutdown(self):
        """Explicitly shut down MCP resources.

        Call this method when done using the MCP instance to ensure all
        background threads and connections are properly cleaned up.
        """
        # Stdio runner first, then whichever transport client exists.
        if getattr(self, 'runner', None) is not None:
            try:
                self.runner.shutdown()
            except Exception:
                pass  # best-effort cleanup
        for client_attr in ('sse_client', 'http_stream_client', 'websocket_client'):
            client = getattr(self, client_attr, None)
            if client is not None:
                try:
                    if hasattr(client, 'shutdown'):
                        client.shutdown()
                except Exception:
                    pass  # best-effort cleanup

    def __del__(self):
        """Clean up resources when the object is garbage collected.

        Note: __del__ is called during garbage collection and may not
        always run. For reliable cleanup, use the context manager
        pattern or call shutdown() explicitly.
        """
        try:
            self.shutdown()
        except Exception:
            pass  # best-effort cleanup in __del__