(برای مشاهده ویدئوی این درس روی تصویر بالا کلیک کنید)
این درس بر روی بهترین شیوههای پیشرفته برای توسعه، تست و استقرار سرورهای MCP و ویژگیها در محیطهای تولید تمرکز دارد. با افزایش پیچیدگی و اهمیت اکوسیستمهای MCP، پیروی از الگوهای استاندارد، قابلیت اطمینان، نگهداری و تعاملپذیری را تضمین میکند. این درس دانش عملی بهدستآمده از پیادهسازیهای واقعی MCP را جمعآوری کرده تا شما را در ایجاد سرورهای قوی و کارآمد با منابع، درخواستها و ابزارهای مؤثر راهنمایی کند.
در پایان این درس، شما قادر خواهید بود:
- بهترین شیوههای طراحی سرور و ویژگیهای MCP را به کار ببرید
- استراتژیهای تست جامع برای سرورهای MCP ایجاد کنید
- الگوهای کاری کارآمد و قابل استفاده مجدد برای برنامههای پیچیده MCP طراحی کنید
- مدیریت صحیح خطا، ثبت وقایع و مشاهدهپذیری را در سرورهای MCP پیادهسازی کنید
- پیادهسازیهای MCP را برای عملکرد، امنیت و نگهداری بهینه کنید
قبل از ورود به شیوههای خاص پیادهسازی، درک اصول اصلی که توسعه مؤثر MCP را هدایت میکنند، مهم است:
-
ارتباط استانداردسازیشده: MCP از JSON-RPC 2.0 بهعنوان پایه خود استفاده میکند و یک قالب یکسان برای درخواستها، پاسخها و مدیریت خطا در تمام پیادهسازیها ارائه میدهد.
-
طراحی کاربرمحور: همیشه رضایت، کنترل و شفافیت کاربر را در پیادهسازیهای MCP خود در اولویت قرار دهید.
-
امنیت در اولویت: اقدامات امنیتی قوی از جمله احراز هویت، مجوزدهی، اعتبارسنجی و محدودیت نرخ را پیادهسازی کنید.
-
معماری ماژولار: سرورهای MCP خود را با رویکرد ماژولار طراحی کنید، بهطوری که هر ابزار و منبع هدف مشخص و متمرکزی داشته باشد.
-
اتصالات حالتدار: از توانایی MCP برای حفظ حالت در چندین درخواست استفاده کنید تا تعاملات منسجمتر و آگاهانهتر از زمینه ایجاد شود.
بهترین شیوههای زیر از مستندات رسمی پروتکل مدل زمینه (MCP) استخراج شدهاند:
-
رضایت و کنترل کاربر: همیشه رضایت صریح کاربر را قبل از دسترسی به دادهها یا انجام عملیاتها بخواهید. کنترل واضحی بر روی دادههای به اشتراک گذاشتهشده و اقدامات مجاز ارائه دهید.
-
حریم خصوصی دادهها: فقط با رضایت صریح کاربر دادهها را افشا کنید و با کنترلهای دسترسی مناسب از آنها محافظت کنید. از انتقال غیرمجاز دادهها جلوگیری کنید.
-
ایمنی ابزارها: قبل از فراخوانی هر ابزاری، رضایت صریح کاربر را بخواهید. اطمینان حاصل کنید که کاربران عملکرد هر ابزار را درک میکنند و مرزهای امنیتی قوی را اعمال کنید.
-
کنترل مجوز ابزارها: مشخص کنید که کدام ابزارها در طول یک جلسه برای مدل مجاز هستند و اطمینان حاصل کنید که فقط ابزارهای صریحاً مجاز در دسترس باشند.
-
احراز هویت: قبل از اعطای دسترسی به ابزارها، منابع یا عملیات حساس، احراز هویت مناسب را با استفاده از کلیدهای API، توکنهای OAuth یا روشهای امن دیگر الزامی کنید.
-
اعتبارسنجی پارامترها: اعتبارسنجی را برای تمام فراخوانیهای ابزار اعمال کنید تا از رسیدن ورودیهای نادرست یا مخرب به پیادهسازی ابزار جلوگیری شود.
-
محدودیت نرخ: محدودیت نرخ را برای جلوگیری از سوءاستفاده و اطمینان از استفاده منصفانه از منابع سرور پیادهسازی کنید.
-
مذاکره قابلیتها: در طول راهاندازی اتصال، اطلاعاتی درباره ویژگیهای پشتیبانیشده، نسخههای پروتکل، ابزارها و منابع موجود تبادل کنید.
-
طراحی ابزارها: ابزارهایی ایجاد کنید که بر یک کار خاص تمرکز داشته باشند، بهجای ابزارهای بزرگ که چندین موضوع را مدیریت میکنند.
-
مدیریت خطا: پیامها و کدهای خطای استانداردی پیادهسازی کنید تا به تشخیص مشکلات کمک کرده، شکستها را بهخوبی مدیریت کنید و بازخورد عملی ارائه دهید.
-
ثبت وقایع: ثبت وقایع ساختاریافته را برای حسابرسی، اشکالزدایی و نظارت بر تعاملات پروتکل پیکربندی کنید.
-
ردیابی پیشرفت: برای عملیات طولانیمدت، بهروزرسانیهای پیشرفت را گزارش دهید تا رابطهای کاربری پاسخگو ایجاد کنید.
-
لغو درخواستها: به مشتریان اجازه دهید درخواستهای در حال اجرا را که دیگر موردنیاز نیستند یا زمان زیادی میبرند، لغو کنند.
برای اطلاعات بهروزتر درباره بهترین شیوههای MCP، به منابع زیر مراجعه کنید:
هر ابزار MCP باید هدف مشخص و متمرکزی داشته باشد. بهجای ایجاد ابزارهای بزرگ که چندین موضوع را مدیریت میکنند، ابزارهای تخصصی ایجاد کنید که در وظایف خاص خود برتری داشته باشند.
// A focused tool that does one thing well
public class WeatherForecastTool : ITool
{
private readonly IWeatherService _weatherService;
public WeatherForecastTool(IWeatherService weatherService)
{
_weatherService = weatherService;
}
public string Name => "weatherForecast";
public string Description => "Gets weather forecast for a specific location";
public ToolDefinition GetDefinition()
{
return new ToolDefinition
{
Name = Name,
Description = Description,
Parameters = new Dictionary<string, ParameterDefinition>
{
["location"] = new ParameterDefinition
{
Type = ParameterType.String,
Description = "City or location name"
},
["days"] = new ParameterDefinition
{
Type = ParameterType.Integer,
Description = "Number of forecast days",
Default = 3
}
},
Required = new[] { "location" }
};
}
public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
{
var location = parameters["location"].ToString();
var days = parameters.ContainsKey("days")
? Convert.ToInt32(parameters["days"])
: 3;
var forecast = await _weatherService.GetForecastAsync(location, days);
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(JsonSerializer.Serialize(forecast))
}
};
}
}مدیریت خطای قوی با پیامهای خطای اطلاعاتی و مکانیزمهای بازیابی مناسب پیادهسازی کنید.
# Python example with comprehensive error handling
class DataQueryTool:
def get_name(self):
return "dataQuery"
def get_description(self):
return "Queries data from specified database tables"
async def execute(self, parameters):
try:
# Parameter validation
if "query" not in parameters:
raise ToolParameterError("Missing required parameter: query")
query = parameters["query"]
# Security validation
if self._contains_unsafe_sql(query):
raise ToolSecurityError("Query contains potentially unsafe SQL")
try:
# Database operation with timeout
async with timeout(10): # 10 second timeout
result = await self._database.execute_query(query)
return ToolResponse(
content=[TextContent(json.dumps(result))]
)
except asyncio.TimeoutError:
raise ToolExecutionError("Database query timed out after 10 seconds")
except DatabaseConnectionError as e:
# Connection errors might be transient
self._log_error("Database connection error", e)
raise ToolExecutionError(f"Database connection error: {str(e)}")
except DatabaseQueryError as e:
# Query errors are likely client errors
self._log_error("Database query error", e)
raise ToolExecutionError(f"Invalid query: {str(e)}")
except ToolError:
# Let tool-specific errors pass through
raise
except Exception as e:
# Catch-all for unexpected errors
self._log_error("Unexpected error in DataQueryTool", e)
raise ToolExecutionError(f"An unexpected error occurred: {str(e)}")
def _contains_unsafe_sql(self, query):
# Implementation of SQL injection detection
pass
def _log_error(self, message, error):
# Implementation of error logging
passهمیشه پارامترها را بهدقت اعتبارسنجی کنید تا از ورودیهای نادرست یا مخرب جلوگیری شود.
// JavaScript/TypeScript example with detailed parameter validation
class FileOperationTool {
getName() {
return "fileOperation";
}
getDescription() {
return "Performs file operations like read, write, and delete";
}
getDefinition() {
return {
name: this.getName(),
description: this.getDescription(),
parameters: {
operation: {
type: "string",
description: "Operation to perform",
enum: ["read", "write", "delete"]
},
path: {
type: "string",
description: "File path (must be within allowed directories)"
},
content: {
type: "string",
description: "Content to write (only for write operation)",
optional: true
}
},
required: ["operation", "path"]
};
}
async execute(parameters) {
// 1. Validate parameter presence
if (!parameters.operation) {
throw new ToolError("Missing required parameter: operation");
}
if (!parameters.path) {
throw new ToolError("Missing required parameter: path");
}
// 2. Validate parameter types
if (typeof parameters.operation !== "string") {
throw new ToolError("Parameter 'operation' must be a string");
}
if (typeof parameters.path !== "string") {
throw new ToolError("Parameter 'path' must be a string");
}
// 3. Validate parameter values
const validOperations = ["read", "write", "delete"];
if (!validOperations.includes(parameters.operation)) {
throw new ToolError(`Invalid operation. Must be one of: ${validOperations.join(", ")}`);
}
// 4. Validate content presence for write operation
if (parameters.operation === "write" && !parameters.content) {
throw new ToolError("Content parameter is required for write operation");
}
// 5. Path safety validation
if (!this.isPathWithinAllowedDirectories(parameters.path)) {
throw new ToolError("Access denied: path is outside of allowed directories");
}
// Implementation based on validated parameters
// ...
}
isPathWithinAllowedDirectories(path) {
// Implementation of path safety check
// ...
}
}// Java example with authentication and authorization
public class SecureDataAccessTool implements Tool {
private final AuthenticationService authService;
private final AuthorizationService authzService;
private final DataService dataService;
// Dependency injection
public SecureDataAccessTool(
AuthenticationService authService,
AuthorizationService authzService,
DataService dataService) {
this.authService = authService;
this.authzService = authzService;
this.dataService = dataService;
}
@Override
public String getName() {
return "secureDataAccess";
}
@Override
public ToolResponse execute(ToolRequest request) {
// 1. Extract authentication context
String authToken = request.getContext().getAuthToken();
// 2. Authenticate user
UserIdentity user;
try {
user = authService.validateToken(authToken);
} catch (AuthenticationException e) {
return ToolResponse.error("Authentication failed: " + e.getMessage());
}
// 3. Check authorization for the specific operation
String dataId = request.getParameters().get("dataId").getAsString();
String operation = request.getParameters().get("operation").getAsString();
boolean isAuthorized = authzService.isAuthorized(user, "data:" + dataId, operation);
if (!isAuthorized) {
return ToolResponse.error("Access denied: Insufficient permissions for this operation");
}
// 4. Proceed with authorized operation
try {
switch (operation) {
case "read":
Object data = dataService.getData(dataId, user.getId());
return ToolResponse.success(data);
case "update":
JsonNode newData = request.getParameters().get("newData");
dataService.updateData(dataId, newData, user.getId());
return ToolResponse.success("Data updated successfully");
default:
return ToolResponse.error("Unsupported operation: " + operation);
}
} catch (Exception e) {
return ToolResponse.error("Operation failed: " + e.getMessage());
}
}
}// C# rate limiting implementation
public class RateLimitingMiddleware
{
private readonly RequestDelegate _next;
private readonly IMemoryCache _cache;
private readonly ILogger<RateLimitingMiddleware> _logger;
// Configuration options
private readonly int _maxRequestsPerMinute;
public RateLimitingMiddleware(
RequestDelegate next,
IMemoryCache cache,
ILogger<RateLimitingMiddleware> logger,
IConfiguration config)
{
_next = next;
_cache = cache;
_logger = logger;
_maxRequestsPerMinute = config.GetValue<int>("RateLimit:MaxRequestsPerMinute", 60);
}
public async Task InvokeAsync(HttpContext context)
{
// 1. Get client identifier (API key or user ID)
string clientId = GetClientIdentifier(context);
// 2. Get rate limiting key for this minute
string cacheKey = $"rate_limit:{clientId}:{DateTime.UtcNow:yyyyMMddHHmm}";
// 3. Check current request count
if (!_cache.TryGetValue(cacheKey, out int requestCount))
{
requestCount = 0;
}
// 4. Enforce rate limit
if (requestCount >= _maxRequestsPerMinute)
{
_logger.LogWarning("Rate limit exceeded for client {ClientId}", clientId);
context.Response.StatusCode = StatusCodes.Status429TooManyRequests;
context.Response.Headers.Add("Retry-After", "60");
await context.Response.WriteAsJsonAsync(new
{
error = "Rate limit exceeded",
message = "Too many requests. Please try again later.",
retryAfterSeconds = 60
});
return;
}
// 5. Increment request count
_cache.Set(cacheKey, requestCount + 1, TimeSpan.FromMinutes(2));
// 6. Add rate limit headers
context.Response.Headers.Add("X-RateLimit-Limit", _maxRequestsPerMinute.ToString());
context.Response.Headers.Add("X-RateLimit-Remaining", (_maxRequestsPerMinute - requestCount - 1).ToString());
// 7. Continue with the request
await _next(context);
}
private string GetClientIdentifier(HttpContext context)
{
// Implementation to extract API key or user ID
// ...
}
}همیشه ابزارهای خود را بهصورت جداگانه تست کنید و وابستگیهای خارجی را شبیهسازی کنید:
// TypeScript example of a tool unit test
describe('WeatherForecastTool', () => {
let tool: WeatherForecastTool;
let mockWeatherService: jest.Mocked<IWeatherService>;
beforeEach(() => {
// Create a mock weather service
mockWeatherService = {
getForecasts: jest.fn()
} as any;
// Create the tool with the mock dependency
tool = new WeatherForecastTool(mockWeatherService);
});
it('should return weather forecast for a location', async () => {
// Arrange
const mockForecast = {
location: 'Seattle',
forecasts: [
{ date: '2025-07-16', temperature: 72, conditions: 'Sunny' },
{ date: '2025-07-17', temperature: 68, conditions: 'Partly Cloudy' },
{ date: '2025-07-18', temperature: 65, conditions: 'Rain' }
]
};
mockWeatherService.getForecasts.mockResolvedValue(mockForecast);
// Act
const response = await tool.execute({
location: 'Seattle',
days: 3
});
// Assert
expect(mockWeatherService.getForecasts).toHaveBeenCalledWith('Seattle', 3);
expect(response.content[0].text).toContain('Seattle');
expect(response.content[0].text).toContain('Sunny');
});
it('should handle errors from the weather service', async () => {
// Arrange
mockWeatherService.getForecasts.mockRejectedValue(new Error('Service unavailable'));
// Act & Assert
await expect(tool.execute({
location: 'Seattle',
days: 3
})).rejects.toThrow('Weather service error: Service unavailable');
});
});جریان کامل از درخواستهای مشتری تا پاسخهای سرور را تست کنید:
# Python integration test example
@pytest.mark.asyncio
async def test_mcp_server_integration():
# Start a test server
server = McpServer()
server.register_tool(WeatherForecastTool(MockWeatherService()))
await server.start(port=5000)
try:
# Create a client
client = McpClient("http://localhost:5000")
# Test tool discovery
tools = await client.discover_tools()
assert "weatherForecast" in [t.name for t in tools]
# Test tool execution
response = await client.execute_tool("weatherForecast", {
"location": "Seattle",
"days": 3
})
# Verify response
assert response.status_code == 200
assert "Seattle" in response.content[0].text
assert len(json.loads(response.content[0].text)["forecasts"]) == 3
finally:
# Clean up
await server.stop()کش مناسب را برای کاهش تأخیر و استفاده از منابع پیادهسازی کنید:
// C# example with caching
public class CachedWeatherTool : ITool
{
private readonly IWeatherService _weatherService;
private readonly IDistributedCache _cache;
private readonly ILogger<CachedWeatherTool> _logger;
public CachedWeatherTool(
IWeatherService weatherService,
IDistributedCache cache,
ILogger<CachedWeatherTool> logger)
{
_weatherService = weatherService;
_cache = cache;
_logger = logger;
}
public string Name => "weatherForecast";
public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
{
var location = parameters["location"].ToString();
var days = Convert.ToInt32(parameters.GetValueOrDefault("days", 3));
// Create cache key
string cacheKey = $"weather:{location}:{days}";
// Try to get from cache
string cachedForecast = await _cache.GetStringAsync(cacheKey);
if (!string.IsNullOrEmpty(cachedForecast))
{
_logger.LogInformation("Cache hit for weather forecast: {Location}", location);
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(cachedForecast)
}
};
}
// Cache miss - get from service
_logger.LogInformation("Cache miss for weather forecast: {Location}", location);
var forecast = await _weatherService.GetForecastAsync(location, days);
string forecastJson = JsonSerializer.Serialize(forecast);
// Store in cache (weather forecasts valid for 1 hour)
await _cache.SetStringAsync(
cacheKey,
forecastJson,
new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
});
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(forecastJson)
}
};
}
}ابزارها را طوری طراحی کنید که وابستگیهای خود را از طریق تزریق سازنده دریافت کنند، تا قابل تست و پیکربندی باشند:
// Java example with dependency injection
public class CurrencyConversionTool implements Tool {
private final ExchangeRateService exchangeService;
private final CacheService cacheService;
private final Logger logger;
// Dependencies injected through constructor
public CurrencyConversionTool(
ExchangeRateService exchangeService,
CacheService cacheService,
Logger logger) {
this.exchangeService = exchangeService;
this.cacheService = cacheService;
this.logger = logger;
}
// Tool implementation
// ...
}ابزارهایی طراحی کنید که بتوانند با هم ترکیب شوند تا جریانهای کاری پیچیدهتری ایجاد کنند:
# Python example showing composable tools
class DataFetchTool(Tool):
def get_name(self):
return "dataFetch"
# Implementation...
class DataAnalysisTool(Tool):
def get_name(self):
return "dataAnalysis"
# This tool can use results from the dataFetch tool
async def execute_async(self, request):
# Implementation...
pass
class DataVisualizationTool(Tool):
def get_name(self):
return "dataVisualize"
# This tool can use results from the dataAnalysis tool
async def execute_async(self, request):
# Implementation...
pass
# These tools can be used independently or as part of a workflowطرحواره قرارداد بین مدل و ابزار شماست. طرحوارههای خوب طراحیشده منجر به استفاده بهتر از ابزار میشوند.
همیشه اطلاعات توصیفی برای هر پارامتر ارائه دهید:
public object GetSchema()
{
return new {
type = "object",
properties = new {
query = new {
type = "string",
description = "Search query text. Use precise keywords for better results."
},
filters = new {
type = "object",
description = "Optional filters to narrow down search results",
properties = new {
dateRange = new {
type = "string",
description = "Date range in format YYYY-MM-DD:YYYY-MM-DD"
},
category = new {
type = "string",
description = "Category name to filter by"
}
}
},
limit = new {
type = "integer",
description = "Maximum number of results to return (1-50)",
default = 10
}
},
required = new[] { "query" }
};
}محدودیتهای اعتبارسنجی را برای جلوگیری از ورودیهای نامعتبر اضافه کنید:
Map<String, Object> getSchema() {
Map<String, Object> schema = new HashMap<>();
schema.put("type", "object");
Map<String, Object> properties = new HashMap<>();
// Email property with format validation
Map<String, Object> email = new HashMap<>();
email.put("type", "string");
email.put("format", "email");
email.put("description", "User email address");
// Age property with numeric constraints
Map<String, Object> age = new HashMap<>();
age.put("type", "integer");
age.put("minimum", 13);
age.put("maximum", 120);
age.put("description", "User age in years");
// Enumerated property
Map<String, Object> subscription = new HashMap<>();
subscription.put("type", "string");
subscription.put("enum", Arrays.asList("free", "basic", "premium"));
subscription.put("default", "free");
subscription.put("description", "Subscription tier");
properties.put("email", email);
properties.put("age", age);
properties.put("subscription", subscription);
schema.put("properties", properties);
schema.put("required", Arrays.asList("email"));
return schema;
}ثبات در ساختارهای پاسخ را حفظ کنید تا تفسیر نتایج برای مدلها آسانتر شود:
async def execute_async(self, request):
try:
# Process request
results = await self._search_database(request.parameters["query"])
# Always return a consistent structure
return ToolResponse(
result={
"matches": [self._format_item(item) for item in results],
"totalCount": len(results),
"queryTime": calculation_time_ms,
"status": "success"
}
)
except Exception as e:
return ToolResponse(
result={
"matches": [],
"totalCount": 0,
"queryTime": 0,
"status": "error",
"error": str(e)
}
)
def _format_item(self, item):
"""Ensures each item has a consistent structure"""
return {
"id": item.id,
"title": item.title,
"summary": item.summary[:100] + "..." if len(item.summary) > 100 else item.summary,
"url": item.url,
"relevance": item.score
}مدیریت خطای قوی برای ابزارهای MCP جهت حفظ قابلیت اطمینان ضروری است.
خطاها را در سطوح مناسب مدیریت کنید و پیامهای اطلاعاتی ارائه دهید:
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
try
{
string fileId = request.Parameters.GetProperty("fileId").GetString();
try
{
var fileData = await _fileService.GetFileAsync(fileId);
return new ToolResponse {
Result = JsonSerializer.SerializeToElement(fileData)
};
}
catch (FileNotFoundException)
{
throw new ToolExecutionException($"File not found: {fileId}");
}
catch (UnauthorizedAccessException)
{
throw new ToolExecutionException("You don't have permission to access this file");
}
catch (Exception ex) when (ex is IOException || ex is TimeoutException)
{
_logger.LogError(ex, "Error accessing file {FileId}", fileId);
throw new ToolExecutionException("Error accessing file: The service is temporarily unavailable");
}
}
catch (JsonException)
{
throw new ToolExecutionException("Invalid file ID format");
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error in FileAccessTool");
throw new ToolExecutionException("An unexpected error occurred");
}
}در صورت امکان اطلاعات خطای ساختاریافته بازگردانید:
@Override
public ToolResponse execute(ToolRequest request) {
try {
// Implementation
} catch (Exception ex) {
Map<String, Object> errorResult = new HashMap<>();
errorResult.put("success", false);
if (ex instanceof ValidationException) {
ValidationException validationEx = (ValidationException) ex;
errorResult.put("errorType", "validation");
errorResult.put("errorMessage", validationEx.getMessage());
errorResult.put("validationErrors", validationEx.getErrors());
return new ToolResponse.Builder()
.setResult(errorResult)
.build();
}
// Re-throw other exceptions as ToolExecutionException
throw new ToolExecutionException("Tool execution failed: " + ex.getMessage(), ex);
}
}منطق تلاش مجدد مناسب برای شکستهای موقتی پیادهسازی کنید:
async def execute_async(self, request):
max_retries = 3
retry_count = 0
base_delay = 1 # seconds
while retry_count < max_retries:
try:
# Call external API
return await self._call_api(request.parameters)
except TransientError as e:
retry_count += 1
if retry_count >= max_retries:
raise ToolExecutionException(f"Operation failed after {max_retries} attempts: {str(e)}")
# Exponential backoff
delay = base_delay * (2 ** (retry_count - 1))
logging.warning(f"Transient error, retrying in {delay}s: {str(e)}")
await asyncio.sleep(delay)
except Exception as e:
# Non-transient error, don't retry
raise ToolExecutionException(f"Operation failed: {str(e)}")برای عملیاتهای پرهزینه کش پیادهسازی کنید:
public class CachedDataTool : IMcpTool
{
private readonly IDatabase _database;
private readonly IMemoryCache _cache;
public CachedDataTool(IDatabase database, IMemoryCache cache)
{
_database = database;
_cache = cache;
}
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
var query = request.Parameters.GetProperty("query").GetString();
// Create cache key based on parameters
var cacheKey = $"data_query_{ComputeHash(query)}";
// Try to get from cache first
if (_cache.TryGetValue(cacheKey, out var cachedResult))
{
return new ToolResponse { Result = cachedResult };
}
// Cache miss - perform actual query
var result = await _database.QueryAsync(query);
// Store in cache with expiration
var cacheOptions = new MemoryCacheEntryOptions()
.SetAbsoluteExpiration(TimeSpan.FromMinutes(15));
_cache.Set(cacheKey, JsonSerializer.SerializeToElement(result), cacheOptions);
return new ToolResponse { Result = JsonSerializer.SerializeToElement(result) };
}
private string ComputeHash(string input)
{
// Implementation to generate stable hash for cache key
}
}الگوهای برنامهنویسی غیرهمزمان را برای عملیاتهای وابسته به I/O استفاده کنید:
public class AsyncDocumentProcessingTool implements Tool {
private final DocumentService documentService;
private final ExecutorService executorService;
@Override
public ToolResponse execute(ToolRequest request) {
String documentId = request.getParameters().get("documentId").asText();
// For long-running operations, return a processing ID immediately
String processId = UUID.randomUUID().toString();
// Start async processing
CompletableFuture.runAsync(() -> {
try {
// Perform long-running operation
documentService.processDocument(documentId);
// Update status (would typically be stored in a database)
processStatusRepository.updateStatus(processId, "completed");
} catch (Exception ex) {
processStatusRepository.updateStatus(processId, "failed", ex.getMessage());
}
}, executorService);
// Return immediate response with process ID
Map<String, Object> result = new HashMap<>();
result.put("processId", processId);
result.put("status", "processing");
result.put("estimatedCompletionTime", ZonedDateTime.now().plusMinutes(5));
return new ToolResponse.Builder().setResult(result).build();
}
// Companion status check tool
public class ProcessStatusTool implements Tool {
@Override
public ToolResponse execute(ToolRequest request) {
String processId = request.getParameters().get("processId").asText();
ProcessStatus status = processStatusRepository.getStatus(processId);
return new ToolResponse.Builder().setResult(status).build();
}
}
}محدودسازی منابع را برای جلوگیری از بارگذاری بیش از حد پیادهسازی کنید:
class ThrottledApiTool(Tool):
def __init__(self):
self.rate_limiter = TokenBucketRateLimiter(
tokens_per_second=5, # Allow 5 requests per second
bucket_size=10 # Allow bursts up to 10 requests
)
async def execute_async(self, request):
# Check if we can proceed or need to wait
delay = self.rate_limiter.get_delay_time()
if delay > 0:
if delay > 2.0: # If wait is too long
raise ToolExecutionException(
f"Rate limit exceeded. Please try again in {delay:.1f} seconds."
)
else:
# Wait for the appropriate delay time
await asyncio.sleep(delay)
# Consume a token and proceed with the request
self.rate_limiter.consume()
# Call API
result = await self._call_api(request.parameters)
return ToolResponse(result=result)
class TokenBucketRateLimiter:
def __init__(self, tokens_per_second, bucket_size):
self.tokens_per_second = tokens_per_second
self.bucket_size = bucket_size
self.tokens = bucket_size
self.last_refill = time.time()
self.lock = asyncio.Lock()
async def get_delay_time(self):
async with self.lock:
self._refill()
if self.tokens >= 1:
return 0
# Calculate time until next token available
return (1 - self.tokens) / self.tokens_per_second
async def consume(self):
async with self.lock:
self._refill()
self.tokens -= 1
def _refill(self):
now = time.time()
elapsed = now - self.last_refill
# Add new tokens based on elapsed time
new_tokens = elapsed * self.tokens_per_second
self.tokens = min(self.bucket_size, self.tokens + new_tokens)
self.last_refill = nowهمیشه پارامترهای ورودی را بهدقت اعتبارسنجی کنید:
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
// Validate parameters exist
if (!request.Parameters.TryGetProperty("query", out var queryProp))
{
throw new ToolExecutionException("Missing required parameter: query");
}
// Validate correct type
if (queryProp.ValueKind != JsonValueKind.String)
{
throw new ToolExecutionException("Query parameter must be a string");
}
var query = queryProp.GetString();
// Validate string content
if (string.IsNullOrWhiteSpace(query))
{
throw new ToolExecutionException("Query parameter cannot be empty");
}
if (query.Length > 500)
{
throw new ToolExecutionException("Query parameter exceeds maximum length of 500 characters");
}
// Check for SQL injection attacks if applicable
if (ContainsSqlInjection(query))
{
throw new ToolExecutionException("Invalid query: contains potentially unsafe SQL");
}
// Proceed with execution
// ...
}بررسیهای مجوز مناسب را پیادهسازی کنید:
@Override
public ToolResponse execute(ToolRequest request) {
// Get user context from request
UserContext user = request.getContext().getUserContext();
// Check if user has required permissions
if (!authorizationService.hasPermission(user, "documents:read")) {
throw new ToolExecutionException("User does not have permission to access documents");
}
// For specific resources, check access to that resource
String documentId = request.getParameters().get("documentId").asText();
if (!documentService.canUserAccess(user.getId(), documentId)) {
throw new ToolExecutionException("Access denied to the requested document");
}
// Proceed with tool execution
// ...
}دادههای حساس را با دقت مدیریت کنید:
class SecureDataTool(Tool):
def get_schema(self):
return {
"type": "object",
"properties": {
"userId": {"type": "string"},
"includeSensitiveData": {"type": "boolean", "default": False}
},
"required": ["userId"]
}
async def execute_async(self, request):
user_id = request.parameters["userId"]
include_sensitive = request.parameters.get("includeSensitiveData", False)
# Get user data
user_data = await self.user_service.get_user_data(user_id)
# Filter sensitive fields unless explicitly requested AND authorized
if not include_sensitive or not self._is_authorized_for_sensitive_data(request):
user_data = self._redact_sensitive_fields(user_data)
return ToolResponse(result=user_data)
def _is_authorized_for_sensitive_data(self, request):
# Check authorization level in request context
auth_level = request.context.get("authorizationLevel")
return auth_level == "admin"
def _redact_sensitive_fields(self, user_data):
# Create a copy to avoid modifying the original
redacted = user_data.copy()
# Redact specific sensitive fields
sensitive_fields = ["ssn", "creditCardNumber", "password"]
for field in sensitive_fields:
if field in redacted:
redacted[field] = "REDACTED"
# Redact nested sensitive data
if "financialInfo" in redacted:
redacted["financialInfo"] = {"available": True, "accessRestricted": True}
return redactedتست جامع تضمین میکند که ابزارهای MCP بهدرستی کار میکنند، موارد خاص را مدیریت میکنند و بهدرستی با سایر بخشهای سیستم ادغام میشوند.
تستهای متمرکزی برای عملکرد هر ابزار ایجاد کنید:
[Fact]
public async Task WeatherTool_ValidLocation_ReturnsCorrectForecast()
{
// Arrange
var mockWeatherService = new Mock<IWeatherService>();
mockWeatherService
.Setup(s => s.GetForecastAsync("Seattle", 3))
.ReturnsAsync(new WeatherForecast(/* test data */));
var tool = new WeatherForecastTool(mockWeatherService.Object);
var request = new ToolRequest(
toolName: "weatherForecast",
parameters: JsonSerializer.SerializeToElement(new {
location = "Seattle",
days = 3
})
);
// Act
var response = await tool.ExecuteAsync(request);
// Assert
Assert.NotNull(response);
var result = JsonSerializer.Deserialize<WeatherForecast>(response.Result);
Assert.Equal("Seattle", result.Location);
Assert.Equal(3, result.DailyForecasts.Count);
}
[Fact]
public async Task WeatherTool_InvalidLocation_ThrowsToolExecutionException()
{
// Arrange
var mockWeatherService = new Mock<IWeatherService>();
mockWeatherService
.Setup(s => s.GetForecastAsync("InvalidLocation", It.IsAny<int>()))
.ThrowsAsync(new LocationNotFoundException("Location not found"));
var tool = new WeatherForecastTool(mockWeatherService.Object);
var request = new ToolRequest(
toolName: "weatherForecast",
parameters: JsonSerializer.SerializeToElement(new {
location = "InvalidLocation",
days = 3
})
);
// Act & Assert
var exception = await Assert.ThrowsAsync<ToolExecutionException>(
() => tool.ExecuteAsync(request)
);
Assert.Contains("Location not found", exception.Message);
}تست کنید که طرحوارهها معتبر هستند و محدودیتها را بهدرستی اعمال میکنند:
@Test
public void testSchemaValidation() {
// Create tool instance
SearchTool searchTool = new SearchTool();
// Get schema
Object schema = searchTool.getSchema();
// Convert schema to JSON for validation
String schemaJson = objectMapper.writeValueAsString(schema);
// Validate schema is valid JSONSchema
JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
JsonSchema jsonSchema = factory.getJsonSchema(schemaJson);
// Test valid parameters
JsonNode validParams = objectMapper.createObjectNode()
.put("query", "test query")
.put("limit", 5);
ProcessingReport validReport = jsonSchema.validate(validParams);
assertTrue(validReport.isSuccess());
// Test missing required parameter
JsonNode missingRequired = objectMapper.createObjectNode()
.put("limit", 5);
ProcessingReport missingReport = jsonSchema.validate(missingRequired);
assertFalse(missingReport.isSuccess());
// Test invalid parameter type
JsonNode invalidType = objectMapper.createObjectNode()
.put("query", "test")
.put("limit", "not-a-number");
ProcessingReport invalidReport = jsonSchema.validate(invalidType);
assertFalse(invalidReport.isSuccess());
}تستهای خاصی برای شرایط خطا ایجاد کنید:
@pytest.mark.asyncio
async def test_api_tool_handles_timeout():
# Arrange
tool = ApiTool(timeout=0.1) # Very short timeout
# Mock a request that will time out
with aioresponses() as mocked:
mocked.get(
"https://api.example.com/data",
callback=lambda *args, **kwargs: asyncio.sleep(0.5) # Longer than timeout
)
request = ToolRequest(
tool_name="apiTool",
parameters={"url": "https://api.example.com/data"}
)
# Act & Assert
with pytest.raises(ToolExecutionException) as exc_info:
await tool.execute_async(request)
# Verify exception message
assert "timed out" in str(exc_info.value).lower()
@pytest.mark.asyncio
async def test_api_tool_handles_rate_limiting():
# Arrange
tool = ApiTool()
# Mock a rate-limited response
with aioresponses() as mocked:
mocked.get(
"https://api.example.com/data",
status=429,
headers={"Retry-After": "2"},
body=json.dumps({"error": "Rate limit exceeded"})
)
request = ToolRequest(
tool_name="apiTool",
parameters={"url": "https://api.example.com/data"}
)
# Act & Assert
with pytest.raises(ToolExecutionException) as exc_info:
await tool.execute_async(request)
# Verify exception contains rate limit information
error_msg = str(exc_info.value).lower()
assert "rate limit" in error_msg
assert "try again" in error_msgابزارهایی که در ترکیبهای مورد انتظار با هم کار میکنند را تست کنید:
[Fact]
public async Task DataProcessingWorkflow_CompletesSuccessfully()
{
// Arrange
var dataFetchTool = new DataFetchTool(mockDataService.Object);
var analysisTools = new DataAnalysisTool(mockAnalysisService.Object);
var visualizationTool = new DataVisualizationTool(mockVisualizationService.Object);
var toolRegistry = new ToolRegistry();
toolRegistry.RegisterTool(dataFetchTool);
toolRegistry.RegisterTool(analysisTools);
toolRegistry.RegisterTool(visualizationTool);
var workflowExecutor = new WorkflowExecutor(toolRegistry);
// Act
var result = await workflowExecutor.ExecuteWorkflowAsync(new[] {
new ToolCall("dataFetch", new { source = "sales2023" }),
new ToolCall("dataAnalysis", ctx => new {
data = ctx.GetResult("dataFetch"),
analysis = "trend"
}),
new ToolCall("dataVisualize", ctx => new {
analysisResult = ctx.GetResult("dataAnalysis"),
type = "line-chart"
})
});
// Assert
Assert.NotNull(result);
Assert.True(result.Success);
Assert.NotNull(result.GetResult("dataVisualize"));
Assert.Contains("chartUrl", result.GetResult("dataVisualize").ToString());
}سرور MCP را با ثبت و اجرای کامل ابزارها تست کنید:
@SpringBootTest
@AutoConfigureMockMvc
public class McpServerIntegrationTest {
@Autowired
private MockMvc mockMvc;
@Autowired
private ObjectMapper objectMapper;
@Test
public void testToolDiscovery() throws Exception {
// Test the discovery endpoint
mockMvc.perform(get("/mcp/tools"))
.andExpect(status().isOk())
.andExpect(jsonPath("$.tools").isArray())
.andExpect(jsonPath("$.tools[*].name").value(hasItems(
"weatherForecast", "calculator", "documentSearch"
)));
}
@Test
public void testToolExecution() throws Exception {
// Create tool request
Map<String, Object> request = new HashMap<>();
request.put("toolName", "calculator");
Map<String, Object> parameters = new HashMap<>();
parameters.put("operation", "add");
parameters.put("a", 5);
parameters.put("b", 7);
request.put("parameters", parameters);
// Send request and verify response
mockMvc.perform(post("/mcp/execute")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(request)))
.andExpect(status().isOk())
.andExpect(jsonPath("$.result.value").value(12));
}
@Test
public void testToolValidation() throws Exception {
// Create invalid tool request
Map<String, Object> request = new HashMap<>();
request.put("toolName", "calculator");
Map<String, Object> parameters = new HashMap<>();
parameters.put("operation", "divide");
parameters.put("a", 10);
// Missing parameter "b"
request.put("parameters", parameters);
// Send request and verify error response
mockMvc.perform(post("/mcp/execute")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(request)))
.andExpect(status().isBadRequest())
.andExpect(jsonPath("$.error").exists());
}
}جریانهای کاری کامل از درخواست مدل تا اجرای ابزار را تست کنید:
@pytest.mark.asyncio
async def test_model_interaction_with_tool():
# Arrange - Set up MCP client and mock model
mcp_client = McpClient(server_url="http://localhost:5000")
# Mock model responses
mock_model = MockLanguageModel([
MockResponse(
"What's the weather in Seattle?",
tool_calls=[{
"tool_name": "weatherForecast",
"parameters": {"location": "Seattle", "days": 3}
}]
),
MockResponse(
"Here's the weather forecast for Seattle:\n- Today: 65°F, Partly Cloudy\n- Tomorrow: 68°F, Sunny\n- Day after: 62°F, Rain",
tool_calls=[]
)
])
# Mock weather tool response
with aioresponses() as mocked:
mocked.post(
"http://localhost:5000/mcp/execute",
payload={
"result": {
"location": "Seattle",
"forecast": [
{"date": "2023-06-01", "temperature": 65, "conditions": "Partly Cloudy"},
{"date": "2023-06-02", "temperature": 68, "conditions": "Sunny"},
{"date": "2023-06-03", "temperature": 62, "conditions": "Rain"}
]
}
}
)
# Act
response = await mcp_client.send_prompt(
"What's the weather in Seattle?",
model=mock_model,
allowed_tools=["weatherForecast"]
)
# Assert
assert "Seattle" in response.generated_text
assert "65" in response.generated_text
assert "Sunny" in response.generated_text
assert "Rain" in response.generated_text
assert len(response.tool_calls) == 1
assert response.tool_calls[0].tool_name == "weatherForecast"تعداد درخواستهای همزمانی که سرور MCP شما میتواند مدیریت کند را تست کنید:
[Fact]
public async Task McpServer_HandlesHighConcurrency()
{
// Arrange
var server = new McpServer(
name: "TestServer",
version: "1.0",
maxConcurrentRequests: 100
);
server.RegisterTool(new FastExecutingTool());
await server.StartAsync();
var client = new McpClient("http://localhost:5000");
// Act
var tasks = new List<Task<McpResponse>>();
for (int i = 0; i < 1000; i++)
{
tasks.Add(client.ExecuteToolAsync("fastTool", new { iteration = i }));
}
var results = await Task.WhenAll(tasks);
// Assert
Assert.Equal(1000, results.Length);
Assert.All(results, r => Assert.NotNull(r));
}سیستم را تحت بار شدید تست کنید:
@Test
public void testServerUnderStress() {
int maxUsers = 1000;
int rampUpTimeSeconds = 60;
int testDurationSeconds = 300;
// Set up JMeter for stress testing
StandardJMeterEngine jmeter = new StandardJMeterEngine();
// Configure JMeter test plan
HashTree testPlanTree = new HashTree();
// Create test plan, thread group, samplers, etc.
TestPlan testPlan = new TestPlan("MCP Server Stress Test");
testPlanTree.add(testPlan);
ThreadGroup threadGroup = new ThreadGroup();
threadGroup.setNumThreads(maxUsers);
threadGroup.setRampUp(rampUpTimeSeconds);
threadGroup.setScheduler(true);
threadGroup.setDuration(testDurationSeconds);
testPlanTree.add(threadGroup);
// Add HTTP sampler for tool execution
HTTPSampler toolExecutionSampler = new HTTPSampler();
toolExecutionSampler.setDomain("localhost");
toolExecutionSampler.setPort(5000);
toolExecutionSampler.setPath("/mcp/execute");
toolExecutionSampler.setMethod("POST");
toolExecutionSampler.addArgument("toolName", "calculator");
toolExecutionSampler.addArgument("parameters", "{\"operation\":\"add\",\"a\":5,\"b\":7}");
threadGroup.add(toolExecutionSampler);
// Add listeners
SummaryReport summaryReport = new SummaryReport();
threadGroup.add(summaryReport);
// Run test
jmeter.configure(testPlanTree);
jmeter.run();
// Validate results
assertEquals(0, summaryReport.getErrorCount());
assertTrue(summaryReport.getAverage() < 200); // Average response time < 200ms
assertTrue(summaryReport.getPercentile(90.0) < 500); // 90th percentile < 500ms
}نظارت بلندمدت برای تحلیل عملکرد تنظیم کنید:
# Configure monitoring for an MCP server
def configure_monitoring(server):
# Set up Prometheus metrics
prometheus_metrics = {
"request_count": Counter("mcp_requests_total", "Total MCP requests"),
"request_latency": Histogram(
"mcp_request_duration_seconds",
"Request duration in seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
),
"tool_execution_count": Counter(
"mcp_tool_executions_total",
"Tool execution count",
labelnames=["tool_name"]
),
"tool_execution_latency": Histogram(
"mcp_tool_duration_seconds",
"Tool execution duration in seconds",
labelnames=["tool_name"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
),
"tool_errors": Counter(
"mcp_tool_errors_total",
"Tool execution errors",
labelnames=["tool_name", "error_type"]
)
}
# Add middleware for timing and recording metrics
server.add_middleware(PrometheusMiddleware(prometheus_metrics))
# Expose metrics endpoint
@server.router.get("/metrics")
async def metrics():
return generate_latest()
return serverجریانهای کاری خوب طراحیشده MCP کارایی، قابلیت اطمینان و نگهداری را بهبود میبخشند. در اینجا الگوهای کلیدی آورده شده است:
چندین ابزار را بهصورت متوالی متصل کنید، بهطوری که خروجی هر ابزار ورودی ابزار بعدی شود:
# Python Chain of Tools implementation
class ChainWorkflow:
def __init__(self, tools_chain):
self.tools_chain = tools_chain # List of tool names to execute in sequence
async def execute(self, mcp_client, initial_input):
current_result = initial_input
all_results = {"input": initial_input}
for tool_name in self.tools_chain:
# Execute each tool in the chain, passing previous result
response = await mcp_client.execute_tool(tool_name, current_result)
# Store result and use as input for next tool
all_results[tool_name] = response.result
current_result = response.result
return {
"final_result": current_result,
"all_results": all_results
}
# Example usage
data_processing_chain = ChainWorkflow([
"dataFetch",
"dataCleaner",
"dataAnalyzer",
"dataVisualizer"
])
result = await data_processing_chain.execute(
mcp_client,
{"source": "sales_database", "table": "transactions"}
)از یک ابزار مرکزی استفاده کنید که بر اساس ورودی به ابزارهای تخصصی هدایت میکند:
public class ContentDispatcherTool : IMcpTool
{
private readonly IMcpClient _mcpClient;
public ContentDispatcherTool(IMcpClient mcpClient)
{
_mcpClient = mcpClient;
}
public string Name => "contentProcessor";
public string Description => "Processes content of various types";
public object GetSchema()
{
return new {
type = "object",
properties = new {
content = new { type = "string" },
contentType = new {
type = "string",
enum = new[] { "text", "html", "markdown", "csv", "code" }
},
operation = new {
type = "string",
enum = new[] { "summarize", "analyze", "extract", "convert" }
}
},
required = new[] { "content", "contentType", "operation" }
};
}
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
var content = request.Parameters.GetProperty("content").GetString();
var contentType = request.Parameters.GetProperty("contentType").GetString();
var operation = request.Parameters.GetProperty("operation").GetString();
// Determine which specialized tool to use
string targetTool = DetermineTargetTool(contentType, operation);
// Forward to the specialized tool
var specializedResponse = await _mcpClient.ExecuteToolAsync(
targetTool,
new { content, options = GetOptionsForTool(targetTool, operation) }
);
return new ToolResponse { Result = specializedResponse.Result };
}
private string DetermineTargetTool(string contentType, string operation)
{
return (contentType, operation) switch
{
("text", "summarize") => "textSummarizer",
("text", "analyze") => "textAnalyzer",
("html", _) => "htmlProcessor",
("markdown", _) => "markdownProcessor",
("csv", _) => "csvProcessor",
("code", _) => "codeAnalyzer",
_ => throw new ToolExecutionException($"No tool available for {contentType}/{operation}")
};
}
private object GetOptionsForTool(string toolName, string operation)
{
// Return appropriate options for each specialized tool
return toolName switch
{
"textSummarizer" => new { length = "medium" },
"htmlProcessor" => new { cleanUp = true, operation },
// Options for other tools...
_ => new { }
};
}
}چندین ابزار را بهطور همزمان برای کارایی اجرا کنید:
public class ParallelDataProcessingWorkflow {
private final McpClient mcpClient;
public ParallelDataProcessingWorkflow(McpClient mcpClient) {
this.mcpClient = mcpClient;
}
public WorkflowResult execute(String datasetId) {
// Step 1: Fetch dataset metadata (synchronous)
ToolResponse metadataResponse = mcpClient.executeTool("datasetMetadata",
Map.of("datasetId", datasetId));
// Step 2: Launch multiple analyses in parallel
CompletableFuture<ToolResponse> statisticalAnalysis = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("statisticalAnalysis", Map.of(
"datasetId", datasetId,
"type", "comprehensive"
))
);
CompletableFuture<ToolResponse> correlationAnalysis = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("correlationAnalysis", Map.of(
"datasetId", datasetId,
"method", "pearson"
))
);
CompletableFuture<ToolResponse> outlierDetection = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("outlierDetection", Map.of(
"datasetId", datasetId,
"sensitivity", "medium"
))
);
// Wait for all parallel tasks to complete
CompletableFuture<Void> allAnalyses = CompletableFuture.allOf(
statisticalAnalysis, correlationAnalysis, outlierDetection
);
allAnalyses.join(); // Wait for completion
// Step 3: Combine results
Map<String, Object> combinedResults = new HashMap<>();
combinedResults.put("metadata", metadataResponse.getResult());
combinedResults.put("statistics", statisticalAnalysis.join().getResult());
combinedResults.put("correlations", correlationAnalysis.join().getResult());
combinedResults.put("outliers", outlierDetection.join().getResult());
// Step 4: Generate summary report
ToolResponse summaryResponse = mcpClient.executeTool("reportGenerator",
Map.of("analysisResults", combinedResults));
// Return complete workflow result
WorkflowResult result = new WorkflowResult();
result.setDatasetId(datasetId);
result.setAnalysisResults(combinedResults);
result.setSummaryReport(summaryResponse.getResult());
return result;
}
}برای شکست ابزارها، بازگشتهای مناسب پیادهسازی کنید:
class ResilientWorkflow:
def __init__(self, mcp_client):
self.client = mcp_client
async def execute_with_fallback(self, primary_tool, fallback_tool, parameters):
try:
# Try primary tool first
response = await self.client.execute_tool(primary_tool, parameters)
return {
"result": response.result,
"source": "primary",
"tool": primary_tool
}
except ToolExecutionException as e:
# Log the failure
logging.warning(f"Primary tool '{primary_tool}' failed: {str(e)}")
# Fall back to secondary tool
try:
# Might need to transform parameters for fallback tool
fallback_params = self._adapt_parameters(parameters, primary_tool, fallback_tool)
response = await self.client.execute_tool(fallback_tool, fallback_params)
return {
"result": response.result,
"source": "fallback",
"tool": fallback_tool,
"primaryError": str(e)
}
except ToolExecutionException as fallback_error:
# Both tools failed
logging.error(f"Both primary and fallback tools failed. Fallback error: {str(fallback_error)}")
raise WorkflowExecutionException(
f"Workflow failed: primary error: {str(e)}; fallback error: {str(fallback_error)}"
)
def _adapt_parameters(self, params, from_tool, to_tool):
"""Adapt parameters between different tools if needed"""
# This implementation would depend on the specific tools
# For this example, we'll just return the original parameters
return params
# Example usage
async def get_weather(workflow, location):
return await workflow.execute_with_fallback(
"premiumWeatherService", # Primary (paid) weather API
"basicWeatherService", # Fallback (free) weather API
{"location": location}
)جریانهای کاری پیچیده را با ترکیب جریانهای سادهتر بسازید:
public class CompositeWorkflow : IWorkflow
{
private readonly List<IWorkflow> _workflows;
public CompositeWorkflow(IEnumerable<IWorkflow> workflows)
{
_workflows = new List<IWorkflow>(workflows);
}
public async Task<WorkflowResult> ExecuteAsync(WorkflowContext context)
{
var results = new Dictionary<string, object>();
foreach (var workflow in _workflows)
{
var workflowResult = await workflow.ExecuteAsync(context);
// Store each workflow's result
results[workflow.Name] = workflowResult;
// Update context with the result for the next workflow
context = context.WithResult(workflow.Name, workflowResult);
}
return new WorkflowResult(results);
}
public string Name => "CompositeWorkflow";
public string Description => "Executes multiple workflows in sequence";
}
// Example usage
var documentWorkflow = new CompositeWorkflow(new IWorkflow[] {
new DocumentFetchWorkflow(),
new DocumentProcessingWorkflow(),
new InsightGenerationWorkflow(),
new ReportGenerationWorkflow()
});
var result = await documentWorkflow.ExecuteAsync(new WorkflowContext {
Parameters = new { documentId = "12345" }
});تست یکی از جنبههای حیاتی توسعه سرورهای MCP قابلاعتماد و باکیفیت است. این راهنما بهترین شیوهها و نکات جامع برای تست سرورهای MCP شما در طول چرخه توسعه، از تستهای واحد تا تستهای یکپارچگی و اعتبارسنجی انتها به انتها را ارائه میدهد.
سرورهای MCP بهعنوان میانافزار حیاتی بین مدلهای هوش مصنوعی و برنامههای مشتری عمل میکنند. تست کامل تضمین میکند:
- قابلیت اطمینان در محیطهای تولید
- مدیریت دقیق درخواستها و پاسخها
- پیادهسازی صحیح مشخصات MCP
- مقاومت در برابر شکستها و موارد خاص
- عملکرد ثابت تحت بارهای مختلف
تستهای واحد اجزای جداگانه سرور MCP شما را بهصورت مستقل تأیید میکنند.
- مدیریتکنندههای منابع: منطق هر مدیریتکننده منبع را بهصورت مستقل تست کنید
- پیادهسازی ابزارها: رفتار ابزار را با ورودیهای مختلف تأیید کنید
- قالبهای درخواست: اطمینان حاصل کنید که قالبهای درخواست بهدرستی رندر میشوند
- اعتبارسنجی طرحواره: منطق اعتبارسنجی پارامترها را تست کنید
- مدیریت خطا: پاسخهای خطا را برای ورودیهای نامعتبر تأیید کنید
// Example unit test for a calculator tool in C#
[Fact]
public async Task CalculatorTool_Add_ReturnsCorrectSum()
{
// Arrange
var calculator = new CalculatorTool();
var parameters = new Dictionary<string, object>
{
["operation"] = "add",
["a"] = 5,
["b"] = 7
};
// Act
var response = await calculator.ExecuteAsync(parameters);
var result = JsonSerializer.Deserialize<CalculationResult>(response.Content[0].ToString());
// Assert
Assert.Equal(12, result.Value);
}# Example unit test for a calculator tool in Python
def test_calculator_tool_add():
# Arrange
calculator = CalculatorTool()
parameters = {
"operation": "add",
"a": 5,
"b": 7
}
# Act
response = calculator.execute(parameters)
result = json.loads(response.content[0].text)
# Assert
assert result["value"] == 12تستهای یکپارچگی تعاملات بین اجزای سرور MCP شما را تأیید میکنند.
- راهاندازی سرور: راهاندازی سرور را با پیکربندیهای مختلف تست کنید
- ثبت مسیرها: اطمینان حاصل کنید که تمام نقاط انتهایی بهدرستی ثبت شدهاند
- پردازش درخواستها: چرخه کامل درخواست-پاسخ را تست کنید
- انتقال خطا: اطمینان حاصل کنید که خطاها بهدرستی در اجزا مدیریت میشوند
- احراز هویت و مجوزدهی: مکانیزمهای امنیتی را تست کنید
// Example integration test for MCP server in C#
[Fact]
public async Task Server_ProcessToolRequest_ReturnsValidResponse()
{
// Arrange
var server = new McpServer();
server.RegisterTool(new CalculatorTool());
await server.StartAsync();
var request = new McpRequest
{
Tool = "calculator",
Parameters = new Dictionary<string, object>
{
["operation"] = "multiply",
["a"] = 6,
["b"] = 7
}
};
// Act
var response = await server.ProcessRequestAsync(request);
// Assert
Assert.NotNull(response);
Assert.Equal(McpStatusCodes.Success, response.StatusCode);
// Additional assertions for response content
// Cleanup
await server.StopAsync();
}تستهای انتها به انتها رفتار کامل سیستم از مشتری تا سرور را تأیید میکنند.
- ارتباط مشتری-سرور: چرخههای کامل درخواست-پاسخ را تست کنید
- SDKهای واقعی مشتری: با پیادهسازیهای واقعی مشتری تست کنید
- عملکرد تحت بار: رفتار را با چندین درخواست همزمان تأیید کنید
- بازیابی خطا: بازیابی سیستم از شکستها را تست کنید
- عملیات طولانیمدت: مدیریت جریان و عملیات طولانی را تأیید کنید
// Example E2E test with a client in TypeScript
describe('MCP Server E2E Tests', () => {
let client: McpClient;
beforeAll(async () => {
// Start server in test environment
await startTestServer();
client = new McpClient('http://localhost:5000');
});
afterAll(async () => {
await stopTestServer();
});
test('Client can invoke calculator tool and get correct result', async () => {
// Act
const response = await client.invokeToolAsync('calculator', {
operation: 'divide',
a: 20,
b: 4
});
// Assert
expect(response.statusCode).toBe(200);
expect(response.content[0].text).toContain('5');
});
});شبیهسازی برای جداسازی اجزا در طول تست ضروری است.
- مدلهای هوش مصنوعی خارجی: پاسخهای مدل را برای تست قابل پیشبینی شبیهسازی کنید
- سرویسهای خارجی: وابستگیهای API (پایگاه دادهها، سرویسهای شخص ثالث) را شبیهسازی کنید
- سرویسهای احراز هویت: ارائهدهندگان هویت را شبیهسازی کنید
- ارائهدهندگان منابع: مدیریتکنندههای منابع پرهزینه را شبیهسازی کنید
// C# example with Moq
var mockModel = new Mock<ILanguageModel>();
mockModel
.Setup(m => m.GenerateResponseAsync(
It.IsAny<string>(),
It.IsAny<McpRequestContext>()))
.ReturnsAsync(new ModelResponse {
Text = "Mocked model response",
FinishReason = FinishReason.Completed
});
var server = new McpServer(modelClient: mockModel.Object);# Python example with unittest.mock
@patch('mcp_server.models.OpenAIModel')
def test_with_mock_model(mock_model):
# Configure mock
mock_model.return_value.generate_response.return_value = {
"text": "Mocked model response",
"finish_reason": "completed"
}
# Use mock in test
server = McpServer(model_client=mock_model)
# Continue with testتست عملکرد برای سرورهای MCP تولیدی حیاتی است.
- تأخیر: زمان پاسخ برای درخواستها
- توان عملیاتی: تعداد درخواستهای پردازششده در هر ثانیه
- استفاده از منابع: استفاده از CPU، حافظه، شبکه
- مدیریت همزمانی: رفتار تحت درخواستهای موازی
- ویژگیهای مقیاسپذیری: عملکرد با افزایش بار
- k6: ابزار تست بار متنباز
- JMeter: ابزار جامع تست عملکرد
- Locust: ابزار تست بار مبتنی بر پایتون
- Azure Load Testing: تست عملکرد مبتنی بر ابر
// k6 script for load testing MCP server
import http from 'k6/http';
import { check, sleep } from 'k6';
export const options = {
vus: 10, // 10 virtual users
duration: '30s',
};
export default function () {
const payload = JSON.stringify({
tool: 'calculator',
parameters: {
operation: 'add',
a: Math.floor(Math.random() * 100),
b: Math.floor(Math.random() * 100)
}
});
const params = {
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer test-token'
},
};
const res = http.post('http://localhost:5000/api/tools/invoke', payload, params);
check(res, {
'status is 200': (r) => r.status === 200,
'response time < 500ms': (r) => r.timings.duration < 500,
});
sleep(1);
}اتوماسیون تست تضمین میکند که کیفیت بهطور مداوم حفظ شده و بازخورد سریعتر ارائه میشود.
- اجرای تستهای واحد در درخواستهای Pull: اطمینان حاصل کنید که تغییرات کد عملکرد موجود را مختل نمیکنند
- **ت
- مبنای عملکرد: حفظ معیارهای عملکرد برای شناسایی مشکلات احتمالی
- اسکنهای امنیتی: خودکارسازی تستهای امنیتی به عنوان بخشی از خط لوله
name: MCP Server Tests
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Runtime
uses: actions/setup-dotnet@v1
with:
dotnet-version: '8.0.x'
- name: Restore dependencies
run: dotnet restore
- name: Build
run: dotnet build --no-restore
- name: Unit Tests
run: dotnet test --no-build --filter Category=Unit
- name: Integration Tests
run: dotnet test --no-build --filter Category=Integration
- name: Performance Tests
run: dotnet run --project tests/PerformanceTests/PerformanceTests.csprojاطمینان حاصل کنید که سرور شما مشخصات MCP را به درستی پیادهسازی کرده است.
- نقاط پای API: تست نقاط پای مورد نیاز (/resources, /tools و غیره)
- فرمت درخواست/پاسخ: اعتبارسنجی انطباق با طرح
- کدهای خطا: بررسی کدهای وضعیت صحیح برای سناریوهای مختلف
- نوع محتوا: تست مدیریت انواع مختلف محتوا
- جریان احراز هویت: بررسی مکانیزمهای احراز هویت مطابق با مشخصات
[Fact]
public async Task Server_ResourceEndpoint_ReturnsCorrectSchema()
{
// Arrange
var client = new HttpClient();
client.DefaultRequestHeaders.Add("Authorization", "Bearer test-token");
// Act
var response = await client.GetAsync("http://localhost:5000/api/resources");
var content = await response.Content.ReadAsStringAsync();
var resources = JsonSerializer.Deserialize<ResourceList>(content);
// Assert
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
Assert.NotNull(resources);
Assert.All(resources.Resources, resource =>
{
Assert.NotNull(resource.Id);
Assert.NotNull(resource.Type);
// Additional schema validation
});
}- تعاریف ابزار را جداگانه تست کنید: طرحهای ابزار را مستقل از منطق ابزار بررسی کنید
- از تستهای پارامتری استفاده کنید: ابزارها را با ورودیهای متنوع، از جمله موارد مرزی، آزمایش کنید
- پاسخهای خطا را بررسی کنید: مدیریت صحیح خطاها را برای همه شرایط ممکن بررسی کنید
- منطق احراز هویت را تست کنید: کنترل دسترسی مناسب برای نقشهای مختلف کاربران را تضمین کنید
- پوشش تست را نظارت کنید: به دنبال پوشش بالا برای کدهای مسیر بحرانی باشید
- پاسخهای جریاندار را تست کنید: مدیریت صحیح محتوای جریاندار را بررسی کنید
- مشکلات شبکه را شبیهسازی کنید: رفتار تحت شرایط شبکه ضعیف را آزمایش کنید
- محدودیتهای منابع را تست کنید: رفتار هنگام رسیدن به سهمیهها یا محدودیتهای نرخ را بررسی کنید
- تستهای بازگشتی را خودکار کنید: مجموعهای بسازید که با هر تغییر کد اجرا شود
- موارد تست را مستند کنید: مستندات واضحی از سناریوهای تست نگه دارید
- اتکای بیش از حد به تست مسیر خوشبینانه: مطمئن شوید که موارد خطا را به طور کامل آزمایش میکنید
- نادیده گرفتن تست عملکرد: گلوگاهها را قبل از تأثیرگذاری بر تولید شناسایی کنید
- تست فقط در انزوا: تستهای واحد، یکپارچه و انتها به انتها را ترکیب کنید
- پوشش ناقص API: اطمینان حاصل کنید که همه نقاط پای و ویژگیها تست شدهاند
- محیطهای تست ناسازگار: از کانتینرها برای تضمین محیطهای تست سازگار استفاده کنید
یک استراتژی جامع تست برای توسعه سرورهای MCP قابل اعتماد و با کیفیت بالا ضروری است. با اجرای بهترین شیوهها و نکات مطرح شده در این راهنما، میتوانید اطمینان حاصل کنید که پیادهسازیهای MCP شما بالاترین استانداردهای کیفیت، قابلیت اطمینان و عملکرد را برآورده میکنند.
- طراحی ابزار: اصل مسئولیت واحد را دنبال کنید، از تزریق وابستگی استفاده کنید و برای ترکیبپذیری طراحی کنید
- طراحی طرح: طرحهای واضح و مستند با محدودیتهای اعتبارسنجی مناسب ایجاد کنید
- مدیریت خطا: مدیریت خطای مناسب، پاسخهای خطای ساختاریافته و منطق تلاش مجدد را پیادهسازی کنید
- عملکرد: از کش کردن، پردازش غیرهمزمان و محدود کردن منابع استفاده کنید
- امنیت: اعتبارسنجی ورودی دقیق، بررسیهای احراز هویت و مدیریت دادههای حساس را اعمال کنید
- تست: تستهای واحد، یکپارچه و انتها به انتها جامع ایجاد کنید
- الگوهای گردش کار: از الگوهای تثبیتشده مانند زنجیرهها، توزیعکنندهها و پردازش موازی استفاده کنید
یک ابزار MCP و گردش کار برای یک سیستم پردازش اسناد طراحی کنید که:
- اسناد را در فرمتهای مختلف (PDF، DOCX، TXT) بپذیرد
- متن و اطلاعات کلیدی را از اسناد استخراج کند
- اسناد را بر اساس نوع و محتوا طبقهبندی کند
- خلاصهای از هر سند تولید کند
طرحهای ابزار، مدیریت خطا و یک الگوی گردش کار که بهترین تناسب را با این سناریو دارد پیادهسازی کنید. در نظر بگیرید که چگونه این پیادهسازی را تست خواهید کرد.
- به جامعه MCP در جامعه دیسکورد Azure AI Foundry بپیوندید تا از آخرین پیشرفتها مطلع شوید
- در پروژههای متنباز MCP مشارکت کنید
- اصول MCP را در ابتکارات هوش مصنوعی سازمان خود اعمال کنید
- پیادهسازیهای تخصصی MCP را برای صنعت خود بررسی کنید
- دورههای پیشرفته در موضوعات خاص MCP، مانند یکپارچهسازی چندوجهی یا یکپارچهسازی برنامههای سازمانی را در نظر بگیرید
- با استفاده از اصول آموخته شده از آزمایشگاه عملی، ابزارها و گردش کارهای MCP خود را آزمایش کنید
بعدی: بهترین شیوهها مطالعات موردی
سلب مسئولیت:
این سند با استفاده از سرویس ترجمه هوش مصنوعی Co-op Translator ترجمه شده است. در حالی که ما تلاش میکنیم دقت را حفظ کنیم، لطفاً توجه داشته باشید که ترجمههای خودکار ممکن است شامل خطاها یا نادرستیها باشند. سند اصلی به زبان اصلی آن باید به عنوان منبع معتبر در نظر گرفته شود. برای اطلاعات حساس، توصیه میشود از ترجمه حرفهای انسانی استفاده کنید. ما مسئولیتی در قبال سوء تفاهمها یا تفسیرهای نادرست ناشی از استفاده از این ترجمه نداریم.
