(點擊上方圖片觀看本課程的影片)
本課程專注於在生產環境中開發、測試及部署 MCP 伺服器和功能的高級最佳實踐。隨著 MCP 生態系統的複雜性和重要性不斷增長,遵循既定模式能確保可靠性、可維護性及互操作性。本課程整合了從實際 MCP 實施中獲得的實用經驗,指導您建立穩健、高效的伺服器,並有效利用資源、提示及工具。
完成本課程後,您將能夠:
- 在 MCP 伺服器和功能設計中應用行業最佳實踐
- 制定全面的 MCP 伺服器測試策略
- 為複雜的 MCP 應用設計高效、可重用的工作流程模式
- 在 MCP 伺服器中實施正確的錯誤處理、日誌記錄及可觀測性
- 優化 MCP 實施以提升性能、安全性及可維護性
在深入探討具體實施方法之前,了解指導有效 MCP 開發的核心原則至關重要:
- 標準化通信:MCP 基於 JSON-RPC 2.0,提供一致的請求、響應及錯誤處理格式,適用於所有實施。
- 以用戶為中心的設計:在 MCP 實施中始終優先考慮用戶的同意、控制及透明度。
- 安全至上:實施強大的安全措施,包括身份驗證、授權、輸入驗證及速率限制。
- 模塊化架構:以模塊化方式設計 MCP 伺服器,每個工具和資源都有明確且專注的用途。
- 有狀態連接:利用 MCP 能夠在多次請求中保持狀態的能力,實現更連貫且具上下文感知的交互。
以下最佳實踐源自官方 Model Context Protocol 文檔:
- 用戶同意及控制:在訪問數據或執行操作之前,始終要求用戶明確同意。提供清晰的控制,讓用戶了解共享的數據及授權的操作。
- 數據隱私:僅在用戶明確同意的情況下暴露用戶數據,並通過適當的訪問控制保護數據。防止未授權的數據傳輸。
- 工具安全性:在調用任何工具之前要求用戶明確同意。確保用戶了解每個工具的功能,並強制執行嚴格的安全邊界。
- 工具權限控制:配置模型在會話期間允許使用的工具,確保僅可訪問明確授權的工具。
- 身份驗證:在授予工具、資源或敏感操作的訪問權限之前,要求使用 API 密鑰、OAuth 令牌或其他安全的身份驗證方法進行適當的身份驗證。
- 參數驗證:對所有工具調用強制執行驗證,防止格式錯誤或惡意輸入到達工具實現。
- 速率限制:實施速率限制以防止濫用並確保伺服器資源的公平使用。
- 能力協商:在連接設置期間,交換有關支持的功能、協議版本、可用工具及資源的信息。
- 工具設計:創建職責單一的工具,每個工具只做好一件事,而不是設計處理多個問題的單一工具。
- 錯誤處理:實施標準化的錯誤消息及代碼,以幫助診斷問題、優雅地處理故障並提供可操作的反饋。
- 日誌記錄:配置結構化日誌以審計、調試及監控協議交互。
- 進度跟蹤:對於長時間運行的操作,報告進度更新以支持響應式用戶界面。
- 請求取消:允許客戶端取消不再需要或耗時過長的進行中請求。
有關 MCP 最佳實踐的最新信息,請參考:
每個 MCP 工具應有明確且專注的用途。與其創建試圖處理多個問題的單一工具,不如開發專注於特定任務的專業工具。
// A focused tool that does one thing well
/// <summary>
/// MCP tool that fetches a weather forecast for one location through the
/// injected <see cref="IWeatherService"/>.
/// </summary>
public class WeatherForecastTool : ITool
{
    /// <summary>Number of forecast days used when the caller omits "days".</summary>
    private const int DefaultForecastDays = 3;

    private readonly IWeatherService _weatherService;

    public WeatherForecastTool(IWeatherService weatherService)
    {
        _weatherService = weatherService;
    }

    public string Name => "weatherForecast";

    public string Description => "Gets weather forecast for a specific location";

    /// <summary>
    /// Describes the tool: a required string "location" and an optional
    /// integer "days" that defaults to <see cref="DefaultForecastDays"/>.
    /// </summary>
    public ToolDefinition GetDefinition()
    {
        return new ToolDefinition
        {
            Name = Name,
            Description = Description,
            Parameters = new Dictionary<string, ParameterDefinition>
            {
                ["location"] = new ParameterDefinition
                {
                    Type = ParameterType.String,
                    Description = "City or location name"
                },
                ["days"] = new ParameterDefinition
                {
                    Type = ParameterType.Integer,
                    Description = "Number of forecast days",
                    Default = DefaultForecastDays
                }
            },
            Required = new[] { "location" }
        };
    }

    /// <summary>
    /// Requests the forecast and returns it serialized as JSON text.
    /// </summary>
    /// <param name="parameters">Tool arguments; must contain "location", may contain "days".</param>
    /// <returns>A response holding one text item with the serialized forecast.</returns>
    /// <exception cref="ArgumentException">"location" is missing, null or blank.</exception>
    public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
    {
        // "location" is declared as required, so reject a missing/blank value
        // explicitly instead of failing with KeyNotFoundException or a null dereference.
        if (!parameters.TryGetValue("location", out var rawLocation)
            || string.IsNullOrWhiteSpace(rawLocation?.ToString()))
        {
            throw new ArgumentException("Missing required parameter: location", nameof(parameters));
        }

        var location = rawLocation.ToString();

        // Single lookup instead of ContainsKey followed by the indexer.
        var days = parameters.TryGetValue("days", out var rawDays)
            ? Convert.ToInt32(rawDays)
            : DefaultForecastDays;

        var forecast = await _weatherService.GetForecastAsync(location, days);

        return new ToolResponse
        {
            Content = new List<ContentItem>
            {
                new TextContent(JsonSerializer.Serialize(forecast))
            }
        };
    }
}
實施強大的錯誤處理,提供信息豐富的錯誤消息及適當的恢復機制。
# Python example with comprehensive error handling
class DataQueryTool:
def get_name(self):
return "dataQuery"
def get_description(self):
return "Queries data from specified database tables"
async def execute(self, parameters):
try:
# Parameter validation
if "query" not in parameters:
raise ToolParameterError("Missing required parameter: query")
query = parameters["query"]
# Security validation
if self._contains_unsafe_sql(query):
raise ToolSecurityError("Query contains potentially unsafe SQL")
try:
# Database operation with timeout
async with timeout(10): # 10 second timeout
result = await self._database.execute_query(query)
return ToolResponse(
content=[TextContent(json.dumps(result))]
)
except asyncio.TimeoutError:
raise ToolExecutionError("Database query timed out after 10 seconds")
except DatabaseConnectionError as e:
# Connection errors might be transient
self._log_error("Database connection error", e)
raise ToolExecutionError(f"Database connection error: {str(e)}")
except DatabaseQueryError as e:
# Query errors are likely client errors
self._log_error("Database query error", e)
raise ToolExecutionError(f"Invalid query: {str(e)}")
except ToolError:
# Let tool-specific errors pass through
raise
except Exception as e:
# Catch-all for unexpected errors
self._log_error("Unexpected error in DataQueryTool", e)
raise ToolExecutionError(f"An unexpected error occurred: {str(e)}")
def _contains_unsafe_sql(self, query):
# Implementation of SQL injection detection
pass
def _log_error(self, message, error):
# Implementation of error logging
pass始終徹底驗證參數,以防止格式錯誤或惡意輸入。
// JavaScript/TypeScript example with detailed parameter validation
/**
 * Tool that validates a file-operation request (read, write or delete)
 * before acting on it.
 */
class FileOperationTool {
  getName() {
    return "fileOperation";
  }

  getDescription() {
    return "Performs file operations like read, write, and delete";
  }

  /** Returns the tool name, description and parameter schema. */
  getDefinition() {
    return {
      name: this.getName(),
      description: this.getDescription(),
      parameters: {
        operation: {
          type: "string",
          description: "Operation to perform",
          enum: ["read", "write", "delete"]
        },
        path: {
          type: "string",
          description: "File path (must be within allowed directories)"
        },
        content: {
          type: "string",
          description: "Content to write (only for write operation)",
          optional: true
        }
      },
      required: ["operation", "path"]
    };
  }

  /**
   * Validates the request step by step and throws ToolError on the first
   * violation.
   * @param {object} parameters - operation, path and (for write) content.
   */
  async execute(parameters) {
    // 1. Validate parameter presence
    if (!parameters.operation) {
      throw new ToolError("Missing required parameter: operation");
    }

    if (!parameters.path) {
      throw new ToolError("Missing required parameter: path");
    }

    // 2. Validate parameter types
    if (typeof parameters.operation !== "string") {
      throw new ToolError("Parameter 'operation' must be a string");
    }

    if (typeof parameters.path !== "string") {
      throw new ToolError("Parameter 'path' must be a string");
    }

    // 3. Validate parameter values
    const validOperations = ["read", "write", "delete"];
    if (!validOperations.includes(parameters.operation)) {
      throw new ToolError(`Invalid operation. Must be one of: ${validOperations.join(", ")}`);
    }

    // 4. Validate content for write operation.
    // Only undefined/null count as "missing": an empty string is valid
    // content (it writes an empty file), so a plain falsy check is wrong here.
    if (parameters.operation === "write") {
      if (parameters.content === undefined || parameters.content === null) {
        throw new ToolError("Content parameter is required for write operation");
      }
      if (typeof parameters.content !== "string") {
        throw new ToolError("Parameter 'content' must be a string");
      }
    }

    // 5. Path safety validation
    if (!this.isPathWithinAllowedDirectories(parameters.path)) {
      throw new ToolError("Access denied: path is outside of allowed directories");
    }

    // Implementation based on validated parameters
    // ...
  }

  isPathWithinAllowedDirectories(path) {
    // Implementation of path safety check
    // ...
  }
}
// Java example with authentication and authorization
public class SecureDataAccessTool implements Tool {
private final AuthenticationService authService;
private final AuthorizationService authzService;
private final DataService dataService;
// Dependency injection
public SecureDataAccessTool(
AuthenticationService authService,
AuthorizationService authzService,
DataService dataService) {
this.authService = authService;
this.authzService = authzService;
this.dataService = dataService;
}
@Override
public String getName() {
return "secureDataAccess";
}
@Override
public ToolResponse execute(ToolRequest request) {
// 1. Extract authentication context
String authToken = request.getContext().getAuthToken();
// 2. Authenticate user
UserIdentity user;
try {
user = authService.validateToken(authToken);
} catch (AuthenticationException e) {
return ToolResponse.error("Authentication failed: " + e.getMessage());
}
// 3. Check authorization for the specific operation
String dataId = request.getParameters().get("dataId").getAsString();
String operation = request.getParameters().get("operation").getAsString();
boolean isAuthorized = authzService.isAuthorized(user, "data:" + dataId, operation);
if (!isAuthorized) {
return ToolResponse.error("Access denied: Insufficient permissions for this operation");
}
// 4. Proceed with authorized operation
try {
switch (operation) {
case "read":
Object data = dataService.getData(dataId, user.getId());
return ToolResponse.success(data);
case "update":
JsonNode newData = request.getParameters().get("newData");
dataService.updateData(dataId, newData, user.getId());
return ToolResponse.success("Data updated successfully");
default:
return ToolResponse.error("Unsupported operation: " + operation);
}
} catch (Exception e) {
return ToolResponse.error("Operation failed: " + e.getMessage());
}
}
}// C# rate limiting implementation
/// <summary>
/// Middleware that counts requests per client per UTC minute in an
/// <see cref="IMemoryCache"/> and answers 429 once the configured limit is reached.
/// </summary>
public class RateLimitingMiddleware
{
    private readonly RequestDelegate _next;
    private readonly IMemoryCache _cache;
    private readonly ILogger<RateLimitingMiddleware> _logger;

    // Configuration options
    private readonly int _maxRequestsPerMinute;

    public RateLimitingMiddleware(
        RequestDelegate next,
        IMemoryCache cache,
        ILogger<RateLimitingMiddleware> logger,
        IConfiguration config)
    {
        _next = next;
        _cache = cache;
        _logger = logger;
        // Falls back to 60 requests per minute when the setting is absent.
        _maxRequestsPerMinute = config.GetValue<int>("RateLimit:MaxRequestsPerMinute", 60);
    }

    /// <summary>
    /// Rejects the request with 429 when the client is over its limit;
    /// otherwise records the request, adds rate-limit headers and continues.
    /// </summary>
    public async Task InvokeAsync(HttpContext context)
    {
        // 1. Get client identifier (API key or user ID)
        string clientId = GetClientIdentifier(context);

        // 2. Get rate limiting key for this minute
        string cacheKey = $"rate_limit:{clientId}:{DateTime.UtcNow:yyyyMMddHHmm}";

        // 3. Check current request count
        // NOTE: this read and the Set below are separate cache operations, so
        // concurrent requests for the same client can observe the same count.
        if (!_cache.TryGetValue(cacheKey, out int requestCount))
        {
            requestCount = 0;
        }

        // 4. Enforce rate limit
        if (requestCount >= _maxRequestsPerMinute)
        {
            _logger.LogWarning("Rate limit exceeded for client {ClientId}", clientId);

            context.Response.StatusCode = StatusCodes.Status429TooManyRequests;
            // Indexer assignment overwrites an existing header; Add would throw
            // if an earlier component had already set it.
            context.Response.Headers["Retry-After"] = "60";

            await context.Response.WriteAsJsonAsync(new
            {
                error = "Rate limit exceeded",
                message = "Too many requests. Please try again later.",
                retryAfterSeconds = 60
            });

            return;
        }

        // 5. Increment request count
        _cache.Set(cacheKey, requestCount + 1, TimeSpan.FromMinutes(2));

        // 6. Add rate limit headers
        context.Response.Headers["X-RateLimit-Limit"] = _maxRequestsPerMinute.ToString();
        context.Response.Headers["X-RateLimit-Remaining"] = (_maxRequestsPerMinute - requestCount - 1).ToString();

        // 7. Continue with the request
        await _next(context);
    }

    private string GetClientIdentifier(HttpContext context)
    {
        // Implementation to extract API key or user ID
        // ...
    }
}
始終在隔離環境中測試工具,模擬外部依賴:
// TypeScript example of a tool unit test
describe('WeatherForecastTool', () => {
  let tool: WeatherForecastTool;
  let mockWeatherService: jest.Mocked<IWeatherService>;

  // Builds the arguments every test passes to the tool.
  const seattleRequest = () => ({
    location: 'Seattle',
    days: 3
  });

  beforeEach(() => {
    // Fresh mock and tool per test so recorded calls never leak between cases
    mockWeatherService = {
      getForecasts: jest.fn()
    } as any;

    tool = new WeatherForecastTool(mockWeatherService);
  });

  it('should return weather forecast for a location', async () => {
    // Arrange: the service resolves with a three-day forecast
    const mockForecast = {
      location: 'Seattle',
      forecasts: [
        { date: '2025-07-16', temperature: 72, conditions: 'Sunny' },
        { date: '2025-07-17', temperature: 68, conditions: 'Partly Cloudy' },
        { date: '2025-07-18', temperature: 65, conditions: 'Rain' }
      ]
    };
    mockWeatherService.getForecasts.mockResolvedValue(mockForecast);

    // Act
    const response = await tool.execute(seattleRequest());

    // Assert: arguments were forwarded and the forecast appears in the text
    expect(mockWeatherService.getForecasts).toHaveBeenCalledWith('Seattle', 3);
    const text = response.content[0].text;
    expect(text).toContain('Seattle');
    expect(text).toContain('Sunny');
  });

  it('should handle errors from the weather service', async () => {
    // Arrange: the service rejects
    mockWeatherService.getForecasts.mockRejectedValue(new Error('Service unavailable'));

    // Act & Assert: the rejection carries the service's message
    const pending = tool.execute(seattleRequest());
    await expect(pending).rejects.toThrow('Weather service error: Service unavailable');
  });
});
測試從客戶端請求到伺服器響應的完整流程:
# Python integration test example
@pytest.mark.asyncio
async def test_mcp_server_integration():
# Start a test server
server = McpServer()
server.register_tool(WeatherForecastTool(MockWeatherService()))
await server.start(port=5000)
try:
# Create a client
client = McpClient("http://localhost:5000")
# Test tool discovery
tools = await client.discover_tools()
assert "weatherForecast" in [t.name for t in tools]
# Test tool execution
response = await client.execute_tool("weatherForecast", {
"location": "Seattle",
"days": 3
})
# Verify response
assert response.status_code == 200
assert "Seattle" in response.content[0].text
assert len(json.loads(response.content[0].text)["forecasts"]) == 3
finally:
# Clean up
await server.stop()實施適當的快取以減少延遲及資源使用:
// C# example with caching
/// <summary>
/// Weather forecast tool that serves results from an
/// <see cref="IDistributedCache"/> and calls the weather service on a miss.
/// </summary>
public class CachedWeatherTool : ITool
{
    private readonly IWeatherService _weatherService;
    private readonly IDistributedCache _cache;
    private readonly ILogger<CachedWeatherTool> _logger;

    public CachedWeatherTool(
        IWeatherService weatherService,
        IDistributedCache cache,
        ILogger<CachedWeatherTool> logger)
    {
        _weatherService = weatherService;
        _cache = cache;
        _logger = logger;
    }

    public string Name => "weatherForecast";

    /// <summary>
    /// Returns the forecast for "location" over "days" days (3 when omitted),
    /// using a cached JSON payload when one exists.
    /// </summary>
    /// <param name="parameters">Tool arguments; must contain "location", may contain "days".</param>
    /// <exception cref="ArgumentException">"location" is missing, null or blank.</exception>
    public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
    {
        if (!parameters.TryGetValue("location", out var rawLocation)
            || string.IsNullOrWhiteSpace(rawLocation?.ToString()))
        {
            throw new ArgumentException("Missing required parameter: location", nameof(parameters));
        }

        var location = rawLocation.ToString();

        // GetValueOrDefault is an extension on IReadOnlyDictionary, not on
        // IDictionary, so the optional value is read with TryGetValue instead.
        var days = parameters.TryGetValue("days", out var rawDays)
            ? Convert.ToInt32(rawDays)
            : 3;

        // Create cache key
        string cacheKey = $"weather:{location}:{days}";

        // Try to get from cache
        string cachedForecast = await _cache.GetStringAsync(cacheKey);
        if (!string.IsNullOrEmpty(cachedForecast))
        {
            _logger.LogInformation("Cache hit for weather forecast: {Location}", location);
            return new ToolResponse
            {
                Content = new List<ContentItem>
                {
                    new TextContent(cachedForecast)
                }
            };
        }

        // Cache miss - get from service
        _logger.LogInformation("Cache miss for weather forecast: {Location}", location);
        var forecast = await _weatherService.GetForecastAsync(location, days);
        string forecastJson = JsonSerializer.Serialize(forecast);

        // Store in cache (weather forecasts valid for 1 hour)
        await _cache.SetStringAsync(
            cacheKey,
            forecastJson,
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
            });

        return new ToolResponse
        {
            Content = new List<ContentItem>
            {
                new TextContent(forecastJson)
            }
        };
    }
}
設計工具以透過構造函數注入接收其依賴項,使其可測試且可配置:
// Java example with dependency injection
/**
 * Tool skeleton whose collaborators are all supplied by the caller through
 * the constructor rather than created internally.
 */
public class CurrencyConversionTool implements Tool {
    private final Logger logger;
    private final CacheService cacheService;
    private final ExchangeRateService exchangeService;

    /**
     * Stores the injected collaborators.
     *
     * @param exchangeService exchange-rate service held by the tool
     * @param cacheService    cache service held by the tool
     * @param logger          logger held by the tool
     */
    public CurrencyConversionTool(
            ExchangeRateService exchangeService,
            CacheService cacheService,
            Logger logger) {
        this.logger = logger;
        this.cacheService = cacheService;
        this.exchangeService = exchangeService;
    }

    // Tool implementation
    // ...
}
設計可組合的工具,以創建更複雜的工作流程:
# Python example showing composable tools
class DataFetchTool(Tool):
def get_name(self):
return "dataFetch"
# Implementation...
class DataAnalysisTool(Tool):
def get_name(self):
return "dataAnalysis"
# This tool can use results from the dataFetch tool
async def execute_async(self, request):
# Implementation...
pass
class DataVisualizationTool(Tool):
def get_name(self):
return "dataVisualize"
# This tool can use results from the dataAnalysis tool
async def execute_async(self, request):
# Implementation...
pass
# These tools can be used independently or as part of a workflow架構是模型與工具之間的契約。良好的架構設計能提升工具的可用性。
始終為每個參數提供描述性信息:
/// <summary>
/// Builds the tool's parameter schema as an anonymous object: a required
/// "query" string, an optional "filters" object and an optional "limit".
/// </summary>
/// <returns>The schema object, with a description on every parameter.</returns>
public object GetSchema()
{
    return new {
        type = "object",
        properties = new {
            query = new {
                type = "string",
                description = "Search query text. Use precise keywords for better results."
            },
            filters = new {
                type = "object",
                description = "Optional filters to narrow down search results",
                properties = new {
                    dateRange = new {
                        type = "string",
                        description = "Date range in format YYYY-MM-DD:YYYY-MM-DD"
                    },
                    category = new {
                        type = "string",
                        description = "Category name to filter by"
                    }
                }
            },
            limit = new {
                type = "integer",
                description = "Maximum number of results to return (1-50)",
                // "default" is a C# keyword; the @ prefix makes it a valid
                // member name while the property is still called "default".
                @default = 10
            }
        },
        required = new[] { "query" }
    };
}
包含驗證約束以防止無效輸入:
/**
 * Builds the parameter schema as nested maps: a required "email" string with
 * email format, an "age" integer bounded to 13-120, and a "subscription"
 * enum that defaults to "free".
 *
 * @return the schema map
 */
Map<String, Object> getSchema() {
    Map<String, Object> properties = new HashMap<>();

    // Email property with format validation
    Map<String, Object> emailSpec = new HashMap<>();
    emailSpec.put("type", "string");
    emailSpec.put("format", "email");
    emailSpec.put("description", "User email address");
    properties.put("email", emailSpec);

    // Age property with numeric constraints
    Map<String, Object> ageSpec = new HashMap<>();
    ageSpec.put("type", "integer");
    ageSpec.put("minimum", 13);
    ageSpec.put("maximum", 120);
    ageSpec.put("description", "User age in years");
    properties.put("age", ageSpec);

    // Enumerated property
    Map<String, Object> subscriptionSpec = new HashMap<>();
    subscriptionSpec.put("type", "string");
    subscriptionSpec.put("enum", Arrays.asList("free", "basic", "premium"));
    subscriptionSpec.put("default", "free");
    subscriptionSpec.put("description", "Subscription tier");
    properties.put("subscription", subscriptionSpec);

    // Assemble the top-level object schema last
    Map<String, Object> schema = new HashMap<>();
    schema.put("type", "object");
    schema.put("properties", properties);
    schema.put("required", Arrays.asList("email"));
    return schema;
}
保持響應結構的一致性,使模型更容易解釋結果:
async def execute_async(self, request):
    """Search for ``request.parameters["query"]`` and return a fixed-shape result.

    Success and failure share the keys ``matches``, ``totalCount``,
    ``queryTime`` and ``status``; failures add ``error``.
    """
    # Local import keeps this snippet self-contained.
    import time

    started = time.perf_counter()
    try:
        # Process request
        results = await self._search_database(request.parameters["query"])

        # Measure the search here: the value was previously read from an
        # undefined name, which forced every call into the error branch.
        query_time_ms = int((time.perf_counter() - started) * 1000)

        # Always return a consistent structure
        return ToolResponse(
            result={
                "matches": [self._format_item(item) for item in results],
                "totalCount": len(results),
                "queryTime": query_time_ms,
                "status": "success"
            }
        )
    except Exception as e:
        return ToolResponse(
            result={
                "matches": [],
                "totalCount": 0,
                "queryTime": 0,
                "status": "error",
                "error": str(e)
            }
        )
def _format_item(self, item):
"""Ensures each item has a consistent structure"""
return {
"id": item.id,
"title": item.title,
"summary": item.summary[:100] + "..." if len(item.summary) > 100 else item.summary,
"url": item.url,
"relevance": item.score
}強大的錯誤處理對 MCP 工具保持可靠性至關重要。
在適當的層級處理錯誤並提供信息豐富的消息:
/// <summary>
/// Loads the file named by the "fileId" parameter and returns it as JSON,
/// translating known failures into caller-facing
/// <c>ToolExecutionException</c> messages.
/// </summary>
/// <param name="request">Tool request whose parameters contain "fileId".</param>
/// <returns>A response whose result is the serialized file data.</returns>
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
    try
    {
        string fileId = request.Parameters.GetProperty("fileId").GetString();

        try
        {
            var fileData = await _fileService.GetFileAsync(fileId);
            return new ToolResponse {
                Result = JsonSerializer.SerializeToElement(fileData)
            };
        }
        catch (FileNotFoundException)
        {
            throw new ToolExecutionException($"File not found: {fileId}");
        }
        catch (UnauthorizedAccessException)
        {
            throw new ToolExecutionException("You don't have permission to access this file");
        }
        catch (Exception ex) when (ex is IOException || ex is TimeoutException)
        {
            _logger.LogError(ex, "Error accessing file {FileId}", fileId);
            throw new ToolExecutionException("Error accessing file: The service is temporarily unavailable");
        }
    }
    catch (ToolExecutionException)
    {
        // Raised by the inner handlers with a specific message. Without this
        // clause the general handler below would replace it with the generic
        // "unexpected error" text.
        throw;
    }
    catch (JsonException)
    {
        throw new ToolExecutionException("Invalid file ID format");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Unexpected error in FileAccessTool");
        throw new ToolExecutionException("An unexpected error occurred");
    }
}
在可能的情況下返回結構化的錯誤信息:
/**
 * Executes the tool. A {@code ValidationException} becomes a structured
 * error result; any other exception is wrapped in a
 * {@code ToolExecutionException} and re-thrown.
 */
@Override
public ToolResponse execute(ToolRequest request) {
    try {
        // Implementation
    } catch (Exception ex) {
        // Re-throw other exceptions as ToolExecutionException
        if (!(ex instanceof ValidationException)) {
            throw new ToolExecutionException("Tool execution failed: " + ex.getMessage(), ex);
        }

        ValidationException validationEx = (ValidationException) ex;

        Map<String, Object> errorResult = new HashMap<>();
        errorResult.put("success", false);
        errorResult.put("errorType", "validation");
        errorResult.put("errorMessage", validationEx.getMessage());
        errorResult.put("validationErrors", validationEx.getErrors());

        ToolResponse.Builder builder = new ToolResponse.Builder();
        return builder.setResult(errorResult).build();
    }
}
對於暫時性故障實施適當的重試邏輯:
async def execute_async(self, request):
max_retries = 3
retry_count = 0
base_delay = 1 # seconds
while retry_count < max_retries:
try:
# Call external API
return await self._call_api(request.parameters)
except TransientError as e:
retry_count += 1
if retry_count >= max_retries:
raise ToolExecutionException(f"Operation failed after {max_retries} attempts: {str(e)}")
# Exponential backoff
delay = base_delay * (2 ** (retry_count - 1))
logging.warning(f"Transient error, retrying in {delay}s: {str(e)}")
await asyncio.sleep(delay)
except Exception as e:
# Non-transient error, don't retry
raise ToolExecutionException(f"Operation failed: {str(e)}")對昂貴的操作實施快取:
/// <summary>
/// Tool that runs a database query and keeps the serialized result in an
/// <see cref="IMemoryCache"/> for 15 minutes, keyed by a hash of the query text.
/// </summary>
public class CachedDataTool : IMcpTool
{
    private readonly IDatabase _database;
    private readonly IMemoryCache _cache;

    public CachedDataTool(IDatabase database, IMemoryCache cache)
    {
        _database = database;
        _cache = cache;
    }

    /// <summary>
    /// Returns the cached result for the "query" parameter when present;
    /// otherwise runs the query, caches the serialized result and returns it.
    /// </summary>
    /// <exception cref="ArgumentException">The "query" value is null or blank.</exception>
    public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
    {
        var query = request.Parameters.GetProperty("query").GetString();
        if (string.IsNullOrWhiteSpace(query))
        {
            throw new ArgumentException("Query parameter cannot be empty", nameof(request));
        }

        // Create cache key based on parameters
        var cacheKey = $"data_query_{ComputeHash(query)}";

        // Try to get from cache first. The typed overload hands back the
        // JsonElement that was stored rather than an untyped object.
        if (_cache.TryGetValue(cacheKey, out JsonElement cachedResult))
        {
            return new ToolResponse { Result = cachedResult };
        }

        // Cache miss - perform actual query
        var result = await _database.QueryAsync(query);

        // Serialize once; the same element is cached and returned.
        var serialized = JsonSerializer.SerializeToElement(result);

        // Store in cache with expiration
        var cacheOptions = new MemoryCacheEntryOptions()
            .SetAbsoluteExpiration(TimeSpan.FromMinutes(15));

        _cache.Set(cacheKey, serialized, cacheOptions);

        return new ToolResponse { Result = serialized };
    }

    /// <summary>
    /// Returns the Base64-encoded SHA-256 digest of the UTF-8 bytes of
    /// <paramref name="input"/>, so equal query text yields an equal key.
    /// </summary>
    private string ComputeHash(string input)
    {
        using (var sha256 = System.Security.Cryptography.SHA256.Create())
        {
            byte[] digest = sha256.ComputeHash(System.Text.Encoding.UTF8.GetBytes(input));
            return Convert.ToBase64String(digest);
        }
    }
}
對 I/O 密集型操作使用非同步編程模式:
/**
 * Tool that starts document processing on an executor and returns at once
 * with a process ID; the nested {@code ProcessStatusTool} looks that ID up.
 */
public class AsyncDocumentProcessingTool implements Tool {
    // NOTE(review): neither final field is assigned in the visible code, and
    // processStatusRepository is not declared here. Presumably a constructor
    // and that field exist elsewhere; confirm.
    private final DocumentService documentService;
    private final ExecutorService executorService;

    /**
     * Schedules processing of the "documentId" parameter and returns a result
     * holding "processId", "status" ("processing") and an estimated
     * completion time five minutes from now.
     */
    @Override
    public ToolResponse execute(ToolRequest request) {
        final String documentId = request.getParameters().get("documentId").asText();

        // For long-running operations, return a processing ID immediately
        final String processId = UUID.randomUUID().toString();

        // Background job: process the document, then record the outcome
        Runnable processingJob = () -> {
            try {
                // Perform long-running operation
                documentService.processDocument(documentId);

                // Update status (would typically be stored in a database)
                processStatusRepository.updateStatus(processId, "completed");
            } catch (Exception ex) {
                processStatusRepository.updateStatus(processId, "failed", ex.getMessage());
            }
        };

        // Start async processing
        CompletableFuture.runAsync(processingJob, executorService);

        // Return immediate response with process ID
        Map<String, Object> payload = new HashMap<>();
        payload.put("processId", processId);
        payload.put("status", "processing");
        payload.put("estimatedCompletionTime", ZonedDateTime.now().plusMinutes(5));

        ToolResponse.Builder builder = new ToolResponse.Builder();
        return builder.setResult(payload).build();
    }

    // Companion status check tool
    public class ProcessStatusTool implements Tool {
        /** Returns the stored status for the "processId" parameter. */
        @Override
        public ToolResponse execute(ToolRequest request) {
            String requestedId = request.getParameters().get("processId").asText();
            ProcessStatus current = processStatusRepository.getStatus(requestedId);

            return new ToolResponse.Builder().setResult(current).build();
        }
    }
}
實施資源節流以防止過載:
class ThrottledApiTool(Tool):
    """Tool that throttles its API calls through a token-bucket rate limiter."""

    def __init__(self):
        self.rate_limiter = TokenBucketRateLimiter(
            tokens_per_second=5,  # Allow 5 requests per second
            bucket_size=10  # Allow bursts up to 10 requests
        )

    async def execute_async(self, request):
        """Wait for a token (at most 2 seconds) and then call the API.

        Raises:
            ToolExecutionException: the required wait exceeds 2 seconds.
        """
        # Check if we can proceed or need to wait.
        # get_delay_time() is a coroutine function; without await, `delay`
        # would be a coroutine object and the comparison below would fail.
        delay = await self.rate_limiter.get_delay_time()

        if delay > 0:
            if delay > 2.0:  # If wait is too long
                raise ToolExecutionException(
                    f"Rate limit exceeded. Please try again in {delay:.1f} seconds."
                )
            # Wait for the appropriate delay time
            await asyncio.sleep(delay)

        # Consume a token and proceed with the request.
        # consume() is also a coroutine function and must be awaited to run.
        await self.rate_limiter.consume()

        # Call API
        result = await self._call_api(request.parameters)
        return ToolResponse(result=result)
class TokenBucketRateLimiter:
def __init__(self, tokens_per_second, bucket_size):
self.tokens_per_second = tokens_per_second
self.bucket_size = bucket_size
self.tokens = bucket_size
self.last_refill = time.time()
self.lock = asyncio.Lock()
async def get_delay_time(self):
async with self.lock:
self._refill()
if self.tokens >= 1:
return 0
# Calculate time until next token available
return (1 - self.tokens) / self.tokens_per_second
async def consume(self):
async with self.lock:
self._refill()
self.tokens -= 1
def _refill(self):
now = time.time()
elapsed = now - self.last_refill
# Add new tokens based on elapsed time
new_tokens = elapsed * self.tokens_per_second
self.tokens = min(self.bucket_size, self.tokens + new_tokens)
self.last_refill = now始終徹底驗證輸入參數:
/// <summary>
/// Validates the "query" parameter (presence, JSON type, non-blank content,
/// maximum length, SQL-injection check) before the tool's work would begin.
/// Each failed check throws a <c>ToolExecutionException</c>.
/// </summary>
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
    // Validate parameters exist
    JsonElement queryProp;
    bool hasQuery = request.Parameters.TryGetProperty("query", out queryProp);
    if (!hasQuery)
    {
        throw new ToolExecutionException("Missing required parameter: query");
    }

    // Validate correct type
    bool isString = queryProp.ValueKind == JsonValueKind.String;
    if (!isString)
    {
        throw new ToolExecutionException("Query parameter must be a string");
    }

    string query = queryProp.GetString();

    // Validate string content
    if (string.IsNullOrWhiteSpace(query))
    {
        throw new ToolExecutionException("Query parameter cannot be empty");
    }

    bool tooLong = query.Length > 500;
    if (tooLong)
    {
        throw new ToolExecutionException("Query parameter exceeds maximum length of 500 characters");
    }

    // Check for SQL injection attacks if applicable
    bool looksUnsafe = ContainsSqlInjection(query);
    if (looksUnsafe)
    {
        throw new ToolExecutionException("Invalid query: contains potentially unsafe SQL");
    }

    // Proceed with execution
    // ...
}
實施正確的授權檢查:
/**
 * Runs two authorization checks before the tool's work would begin: the
 * general "documents:read" permission, then access to the specific document
 * named by the "documentId" parameter. Either failure throws a
 * {@code ToolExecutionException}.
 */
@Override
public ToolResponse execute(ToolRequest request) {
    // Get user context from request
    UserContext user = request.getContext().getUserContext();

    // Check if user has required permissions
    boolean mayReadDocuments = authorizationService.hasPermission(user, "documents:read");
    if (!mayReadDocuments) {
        throw new ToolExecutionException("User does not have permission to access documents");
    }

    // For specific resources, check access to that resource
    String documentId = request.getParameters().get("documentId").asText();
    boolean mayAccessDocument = documentService.canUserAccess(user.getId(), documentId);
    if (!mayAccessDocument) {
        throw new ToolExecutionException("Access denied to the requested document");
    }

    // Proceed with tool execution
    // ...
}
謹慎處理敏感數據:
class SecureDataTool(Tool):
def get_schema(self):
return {
"type": "object",
"properties": {
"userId": {"type": "string"},
"includeSensitiveData": {"type": "boolean", "default": False}
},
"required": ["userId"]
}
async def execute_async(self, request):
user_id = request.parameters["userId"]
include_sensitive = request.parameters.get("includeSensitiveData", False)
# Get user data
user_data = await self.user_service.get_user_data(user_id)
# Filter sensitive fields unless explicitly requested AND authorized
if not include_sensitive or not self._is_authorized_for_sensitive_data(request):
user_data = self._redact_sensitive_fields(user_data)
return ToolResponse(result=user_data)
def _is_authorized_for_sensitive_data(self, request):
# Check authorization level in request context
auth_level = request.context.get("authorizationLevel")
return auth_level == "admin"
def _redact_sensitive_fields(self, user_data):
# Create a copy to avoid modifying the original
redacted = user_data.copy()
# Redact specific sensitive fields
sensitive_fields = ["ssn", "creditCardNumber", "password"]
for field in sensitive_fields:
if field in redacted:
redacted[field] = "REDACTED"
# Redact nested sensitive data
if "financialInfo" in redacted:
redacted["financialInfo"] = {"available": True, "accessRestricted": True}
return redacted全面的測試能確保 MCP 工具正確運行,處理邊界情況,並與系統其他部分正確集成。
為每個工具的功能創建專注的測試:
/// <summary>
/// With the weather service mocked for "Seattle" and 3 days, the tool returns
/// a forecast for that location containing three daily entries.
/// </summary>
[Fact]
public async Task WeatherTool_ValidLocation_ReturnsCorrectForecast()
{
    // Arrange
    var weatherServiceMock = new Mock<IWeatherService>();
    weatherServiceMock
        .Setup(service => service.GetForecastAsync("Seattle", 3))
        .ReturnsAsync(new WeatherForecast(/* test data */));

    var sut = new WeatherForecastTool(weatherServiceMock.Object);

    var arguments = JsonSerializer.SerializeToElement(new {
        location = "Seattle",
        days = 3
    });
    var request = new ToolRequest(
        toolName: "weatherForecast",
        parameters: arguments
    );

    // Act
    var response = await sut.ExecuteAsync(request);

    // Assert
    Assert.NotNull(response);
    var forecast = JsonSerializer.Deserialize<WeatherForecast>(response.Result);
    Assert.Equal("Seattle", forecast.Location);
    Assert.Equal(3, forecast.DailyForecasts.Count);
}
/// <summary>
/// When the weather service throws <c>LocationNotFoundException</c>, the tool
/// surfaces a <c>ToolExecutionException</c> whose message contains
/// "Location not found".
/// </summary>
[Fact]
public async Task WeatherTool_InvalidLocation_ThrowsToolExecutionException()
{
    // Arrange
    var weatherServiceMock = new Mock<IWeatherService>();
    weatherServiceMock
        .Setup(service => service.GetForecastAsync("InvalidLocation", It.IsAny<int>()))
        .ThrowsAsync(new LocationNotFoundException("Location not found"));

    var sut = new WeatherForecastTool(weatherServiceMock.Object);

    var arguments = JsonSerializer.SerializeToElement(new {
        location = "InvalidLocation",
        days = 3
    });
    var request = new ToolRequest(
        toolName: "weatherForecast",
        parameters: arguments
    );

    // Act & Assert
    var thrown = await Assert.ThrowsAsync<ToolExecutionException>(
        () => sut.ExecuteAsync(request)
    );

    Assert.Contains("Location not found", thrown.Message);
}
測試架構是否有效並正確執行約束:
/**
 * Verifies that the SearchTool schema accepts valid parameters and rejects a
 * missing required parameter and a wrongly typed one.
 *
 * @throws Exception if schema loading or validation fails unexpectedly
 */
@Test
public void testSchemaValidation() throws Exception {
    // Create tool instance
    SearchTool searchTool = new SearchTool();

    // Get schema
    Object schema = searchTool.getSchema();

    // Convert the schema to a JSON tree. getJsonSchema(String) interprets its
    // argument as a URI to load from, so the content itself has to be passed
    // as a JsonNode.
    JsonNode schemaNode = objectMapper.valueToTree(schema);

    // Validate schema is valid JSONSchema
    JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
    JsonSchema jsonSchema = factory.getJsonSchema(schemaNode);

    // Test valid parameters
    JsonNode validParams = objectMapper.createObjectNode()
        .put("query", "test query")
        .put("limit", 5);

    ProcessingReport validReport = jsonSchema.validate(validParams);
    assertTrue(validReport.isSuccess());

    // Test missing required parameter
    JsonNode missingRequired = objectMapper.createObjectNode()
        .put("limit", 5);

    ProcessingReport missingReport = jsonSchema.validate(missingRequired);
    assertFalse(missingReport.isSuccess());

    // Test invalid parameter type
    JsonNode invalidType = objectMapper.createObjectNode()
        .put("query", "test")
        .put("limit", "not-a-number");

    ProcessingReport invalidReport = jsonSchema.validate(invalidType);
    assertFalse(invalidReport.isSuccess());
}
為錯誤情況創建特定測試:
@pytest.mark.asyncio
async def test_api_tool_handles_timeout():
    """A response slower than the tool's timeout surfaces as ToolExecutionException."""
    # Arrange
    tool = ApiTool(timeout=0.1)  # Very short timeout

    # The delay must actually be awaited. A lambda that merely returns
    # asyncio.sleep(...) hands back an un-awaited coroutine object, so no
    # delay would occur.
    async def slow_response(url, **kwargs):
        await asyncio.sleep(0.5)  # Longer than timeout

    # Mock a request that will time out
    with aioresponses() as mocked:
        mocked.get(
            "https://api.example.com/data",
            callback=slow_response
        )

        request = ToolRequest(
            tool_name="apiTool",
            parameters={"url": "https://api.example.com/data"}
        )

        # Act & Assert
        with pytest.raises(ToolExecutionException) as exc_info:
            await tool.execute_async(request)

        # Verify exception message
        assert "timed out" in str(exc_info.value).lower()
@pytest.mark.asyncio
async def test_api_tool_handles_rate_limiting():
# Arrange
tool = ApiTool()
# Mock a rate-limited response
with aioresponses() as mocked:
mocked.get(
"https://api.example.com/data",
status=429,
headers={"Retry-After": "2"},
body=json.dumps({"error": "Rate limit exceeded"})
)
request = ToolRequest(
tool_name="apiTool",
parameters={"url": "https://api.example.com/data"}
)
# Act & Assert
with pytest.raises(ToolExecutionException) as exc_info:
await tool.execute_async(request)
# Verify exception contains rate limit information
error_msg = str(exc_info.value).lower()
assert "rate limit" in error_msg
assert "try again" in error_msg測試工具在預期組合中的協同工作:
/// <summary>
/// Runs the fetch, analysis and visualization tools as one workflow, each
/// later step reading the previous step's result, and checks that the final
/// result contains "chartUrl".
/// </summary>
[Fact]
public async Task DataProcessingWorkflow_CompletesSuccessfully()
{
    // Arrange: register the three tools in workflow order
    var fetchTool = new DataFetchTool(mockDataService.Object);
    var analysisTool = new DataAnalysisTool(mockAnalysisService.Object);
    var chartTool = new DataVisualizationTool(mockVisualizationService.Object);

    var registry = new ToolRegistry();
    registry.RegisterTool(fetchTool);
    registry.RegisterTool(analysisTool);
    registry.RegisterTool(chartTool);

    var executor = new WorkflowExecutor(registry);

    var steps = new[] {
        new ToolCall("dataFetch", new { source = "sales2023" }),
        new ToolCall("dataAnalysis", ctx => new {
            data = ctx.GetResult("dataFetch"),
            analysis = "trend"
        }),
        new ToolCall("dataVisualize", ctx => new {
            analysisResult = ctx.GetResult("dataAnalysis"),
            type = "line-chart"
        })
    };

    // Act
    var outcome = await executor.ExecuteWorkflowAsync(steps);

    // Assert
    Assert.NotNull(outcome);
    Assert.True(outcome.Success);
    Assert.NotNull(outcome.GetResult("dataVisualize"));
    Assert.Contains("chartUrl", outcome.GetResult("dataVisualize").ToString());
}
測試 MCP 伺服器的完整工具註冊及執行:
/**
 * MockMvc tests for the MCP server's tool discovery and tool execution
 * endpoints.
 */
@SpringBootTest
@AutoConfigureMockMvc
public class McpServerIntegrationTest {

    @Autowired
    private MockMvc mockMvc;

    @Autowired
    private ObjectMapper objectMapper;

    /** Builds a "calculator" request body; {@code b} is omitted when null. */
    private Map<String, Object> calculatorRequest(String operation, Integer a, Integer b) {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("operation", operation);
        parameters.put("a", a);
        if (b != null) {
            parameters.put("b", b);
        }

        Map<String, Object> request = new HashMap<>();
        request.put("toolName", "calculator");
        request.put("parameters", parameters);
        return request;
    }

    @Test
    public void testToolDiscovery() throws Exception {
        // Test the discovery endpoint
        mockMvc.perform(get("/mcp/tools"))
            .andExpect(status().isOk())
            .andExpect(jsonPath("$.tools").isArray())
            .andExpect(jsonPath("$.tools[*].name").value(hasItems(
                "weatherForecast", "calculator", "documentSearch"
            )));
    }

    @Test
    public void testToolExecution() throws Exception {
        // Create tool request
        String body = objectMapper.writeValueAsString(calculatorRequest("add", 5, 7));

        // Send request and verify response
        mockMvc.perform(post("/mcp/execute")
                .contentType(MediaType.APPLICATION_JSON)
                .content(body))
            .andExpect(status().isOk())
            .andExpect(jsonPath("$.result.value").value(12));
    }

    @Test
    public void testToolValidation() throws Exception {
        // Create invalid tool request: parameter "b" is missing
        String body = objectMapper.writeValueAsString(calculatorRequest("divide", 10, null));

        // Send request and verify error response
        mockMvc.perform(post("/mcp/execute")
                .contentType(MediaType.APPLICATION_JSON)
                .content(body))
            .andExpect(status().isBadRequest())
            .andExpect(jsonPath("$.error").exists());
    }
}
測試從模型提示到工具執行的完整工作流程:
@pytest.mark.asyncio
async def test_model_interaction_with_tool():
# Arrange - Set up MCP client and mock model
mcp_client = McpClient(server_url="http://localhost:5000")
# Mock model responses
mock_model = MockLanguageModel([
MockResponse(
"What's the weather in Seattle?",
tool_calls=[{
"tool_name": "weatherForecast",
"parameters": {"location": "Seattle", "days": 3}
}]
),
MockResponse(
"Here's the weather forecast for Seattle:\n- Today: 65°F, Partly Cloudy\n- Tomorrow: 68°F, Sunny\n- Day after: 62°F, Rain",
tool_calls=[]
)
])
# Mock weather tool response
with aioresponses() as mocked:
mocked.post(
"http://localhost:5000/mcp/execute",
payload={
"result": {
"location": "Seattle",
"forecast": [
{"date": "2023-06-01", "temperature": 65, "conditions": "Partly Cloudy"},
{"date": "2023-06-02", "temperature": 68, "conditions": "Sunny"},
{"date": "2023-06-03", "temperature": 62, "conditions": "Rain"}
]
}
}
)
# Act
response = await mcp_client.send_prompt(
"What's the weather in Seattle?",
model=mock_model,
allowed_tools=["weatherForecast"]
)
# Assert
assert "Seattle" in response.generated_text
assert "65" in response.generated_text
assert "Sunny" in response.generated_text
assert "Rain" in response.generated_text
assert len(response.tool_calls) == 1
assert response.tool_calls[0].tool_name == "weatherForecast"測試 MCP 伺服器能處理的並發請求數量:
/// <summary>
/// Fires 1000 concurrent tool invocations at a server configured with
/// <c>maxConcurrentRequests: 100</c> and checks that every call yields a non-null response.
/// </summary>
[Fact]
public async Task McpServer_HandlesHighConcurrency()
{
    const int requestCount = 1000;

    // Arrange
    var server = new McpServer(
        name: "TestServer",
        version: "1.0",
        maxConcurrentRequests: 100
    );
    server.RegisterTool(new FastExecutingTool());
    await server.StartAsync();

    try
    {
        var client = new McpClient("http://localhost:5000");

        // Act
        var tasks = new List<Task<McpResponse>>(requestCount);
        for (int i = 0; i < requestCount; i++)
        {
            tasks.Add(client.ExecuteToolAsync("fastTool", new { iteration = i }));
        }
        var results = await Task.WhenAll(tasks);

        // Assert
        Assert.Equal(requestCount, results.Length);
        Assert.All(results, r => Assert.NotNull(r));
    }
    finally
    {
        // Cleanup: stop the server even when a call or an assertion fails,
        // so a failing run does not leave it running for later tests.
        await server.StopAsync();
    }
}
在極端負載下測試系統:
/**
 * Stress test driven by JMeter: ramps up to 1000 threads over 60 seconds, runs for
 * 300 seconds against POST /mcp/execute, then asserts on error count and latency.
 */
@Test
public void testServerUnderStress() {
    final int maxUsers = 1000;
    final int rampUpTimeSeconds = 60;
    final int testDurationSeconds = 300;

    // Engine plus the tree that holds the test plan.
    StandardJMeterEngine engine = new StandardJMeterEngine();
    HashTree planTree = new HashTree();

    TestPlan plan = new TestPlan("MCP Server Stress Test");
    planTree.add(plan);

    // Thread group describing the simulated user load.
    ThreadGroup users = new ThreadGroup();
    users.setNumThreads(maxUsers);
    users.setRampUp(rampUpTimeSeconds);
    users.setScheduler(true);
    users.setDuration(testDurationSeconds);
    planTree.add(users);

    // Sampler that invokes the calculator tool.
    HTTPSampler executeSampler = new HTTPSampler();
    executeSampler.setDomain("localhost");
    executeSampler.setPort(5000);
    executeSampler.setPath("/mcp/execute");
    executeSampler.setMethod("POST");
    executeSampler.addArgument("toolName", "calculator");
    executeSampler.addArgument("parameters", "{\"operation\":\"add\",\"a\":5,\"b\":7}");
    users.add(executeSampler);

    // Listener that aggregates the results.
    SummaryReport report = new SummaryReport();
    users.add(report);

    // Run the plan.
    engine.configure(planTree);
    engine.run();

    // Validate: no errors, average < 200ms, 90th percentile < 500ms.
    assertEquals(0, report.getErrorCount());
    assertTrue(report.getAverage() < 200);
    assertTrue(report.getPercentile(90.0) < 500);
}
設置監控以進行長期性能分析:
# Configure monitoring for an MCP server
def configure_monitoring(server):
# Set up Prometheus metrics
prometheus_metrics = {
"request_count": Counter("mcp_requests_total", "Total MCP requests"),
"request_latency": Histogram(
"mcp_request_duration_seconds",
"Request duration in seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
),
"tool_execution_count": Counter(
"mcp_tool_executions_total",
"Tool execution count",
labelnames=["tool_name"]
),
"tool_execution_latency": Histogram(
"mcp_tool_duration_seconds",
"Tool execution duration in seconds",
labelnames=["tool_name"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
),
"tool_errors": Counter(
"mcp_tool_errors_total",
"Tool execution errors",
labelnames=["tool_name", "error_type"]
)
}
# Add middleware for timing and recording metrics
server.add_middleware(PrometheusMiddleware(prometheus_metrics))
# Expose metrics endpoint
@server.router.get("/metrics")
async def metrics():
return generate_latest()
return server良好的 MCP 工作流程設計能提升效率、可靠性及可維護性。以下是關鍵模式:
將多個工具連接成序列,每個工具的輸出成為下一個工具的輸入:
# Python Chain of Tools implementation
class ChainWorkflow:
    """Runs a fixed sequence of tools, feeding each tool's result into the next one."""

    def __init__(self, tools_chain):
        self.tools_chain = tools_chain  # List of tool names to execute in sequence

    async def execute(self, mcp_client, initial_input):
        """Execute every tool in the chain in order.

        Args:
            mcp_client: Client whose awaitable ``execute_tool(name, params)`` returns
                an object with a ``result`` attribute.
            initial_input: Parameters passed to the first tool.

        Returns:
            A dict with ``final_result`` (the last tool's result, or ``initial_input``
            for an empty chain) and ``all_results`` (the input under ``"input"`` plus
            one entry per tool name).
        """
        current_result = initial_input
        all_results = {"input": initial_input}

        for tool_name in self.tools_chain:
            # Execute each tool in the chain, passing previous result
            response = await mcp_client.execute_tool(tool_name, current_result)

            # Store result and use as input for next tool
            all_results[tool_name] = response.result
            current_result = response.result

        return {
            "final_result": current_result,
            "all_results": all_results
        }
# Example usage
data_processing_chain = ChainWorkflow([
"dataFetch",
"dataCleaner",
"dataAnalyzer",
"dataVisualizer"
])
result = await data_processing_chain.execute(
mcp_client,
{"source": "sales_database", "table": "transactions"}
)使用一個中央工具根據輸入分派到專業工具:
/// <summary>
/// Dispatcher tool: inspects the requested content type and operation, picks a
/// specialized tool by name and forwards the content to it through the MCP client.
/// </summary>
public class ContentDispatcherTool : IMcpTool
{
    private readonly IMcpClient _mcpClient;

    public ContentDispatcherTool(IMcpClient mcpClient)
    {
        _mcpClient = mcpClient;
    }

    public string Name => "contentProcessor";
    public string Description => "Processes content of various types";

    /// <summary>Returns the input schema: three required string properties, two of them enumerated.</summary>
    public object GetSchema()
    {
        return new {
            type = "object",
            properties = new {
                content = new { type = "string" },
                contentType = new {
                    type = "string",
                    // `enum` is a reserved C# keyword; the verbatim prefix `@` is required
                    // for it to compile, and the member is still named "enum".
                    @enum = new[] { "text", "html", "markdown", "csv", "code" }
                },
                operation = new {
                    type = "string",
                    @enum = new[] { "summarize", "analyze", "extract", "convert" }
                }
            },
            required = new[] { "content", "contentType", "operation" }
        };
    }

    /// <summary>
    /// Reads <c>content</c>, <c>contentType</c> and <c>operation</c> from the request,
    /// forwards the content to the selected specialized tool and returns its result.
    /// </summary>
    /// <exception cref="ToolExecutionException">
    /// Thrown by <see cref="DetermineTargetTool"/> when no tool matches the combination.
    /// </exception>
    public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
    {
        var content = request.Parameters.GetProperty("content").GetString();
        var contentType = request.Parameters.GetProperty("contentType").GetString();
        var operation = request.Parameters.GetProperty("operation").GetString();

        // Determine which specialized tool to use
        string targetTool = DetermineTargetTool(contentType, operation);

        // Forward to the specialized tool
        var specializedResponse = await _mcpClient.ExecuteToolAsync(
            targetTool,
            new { content, options = GetOptionsForTool(targetTool, operation) }
        );

        return new ToolResponse { Result = specializedResponse.Result };
    }

    /// <summary>
    /// Maps a (contentType, operation) pair to a tool name. "text" is only routed for
    /// "summarize" and "analyze"; every other listed content type ignores the operation.
    /// </summary>
    private string DetermineTargetTool(string contentType, string operation)
    {
        return (contentType, operation) switch
        {
            ("text", "summarize") => "textSummarizer",
            ("text", "analyze") => "textAnalyzer",
            ("html", _) => "htmlProcessor",
            ("markdown", _) => "markdownProcessor",
            ("csv", _) => "csvProcessor",
            ("code", _) => "codeAnalyzer",
            _ => throw new ToolExecutionException($"No tool available for {contentType}/{operation}")
        };
    }

    /// <summary>Returns the options object for the given tool; an empty object when none are defined.</summary>
    private object GetOptionsForTool(string toolName, string operation)
    {
        // Return appropriate options for each specialized tool
        return toolName switch
        {
            "textSummarizer" => new { length = "medium" },
            "htmlProcessor" => new { cleanUp = true, operation },
            // Options for other tools...
            _ => new { }
        };
    }
}
同時執行多個工具以提高效率:
/**
 * Workflow that fetches dataset metadata, runs three analyses concurrently,
 * merges their results and asks a report tool for a summary.
 */
public class ParallelDataProcessingWorkflow {
    private final McpClient mcpClient;

    public ParallelDataProcessingWorkflow(McpClient mcpClient) {
        this.mcpClient = mcpClient;
    }

    /**
     * Runs the whole workflow for one dataset.
     *
     * @param datasetId identifier passed to every tool call
     * @return result carrying the dataset id, the merged analysis results and the summary report
     */
    public WorkflowResult execute(String datasetId) {
        // Step 1: metadata is fetched synchronously, before anything else starts.
        ToolResponse metadata = mcpClient.executeTool("datasetMetadata",
            Map.of("datasetId", datasetId));

        // Step 2: start the three analyses on the common pool.
        CompletableFuture<ToolResponse> statsFuture = launch("statisticalAnalysis",
            Map.of("datasetId", datasetId, "type", "comprehensive"));
        CompletableFuture<ToolResponse> correlationFuture = launch("correlationAnalysis",
            Map.of("datasetId", datasetId, "method", "pearson"));
        CompletableFuture<ToolResponse> outlierFuture = launch("outlierDetection",
            Map.of("datasetId", datasetId, "sensitivity", "medium"));

        // Block until all three have finished.
        CompletableFuture.allOf(statsFuture, correlationFuture, outlierFuture).join();

        // Step 3: merge the individual results under fixed keys.
        Map<String, Object> merged = new HashMap<>();
        merged.put("metadata", metadata.getResult());
        merged.put("statistics", statsFuture.join().getResult());
        merged.put("correlations", correlationFuture.join().getResult());
        merged.put("outliers", outlierFuture.join().getResult());

        // Step 4: generate the summary report from the merged results.
        ToolResponse summary = mcpClient.executeTool("reportGenerator",
            Map.of("analysisResults", merged));

        WorkflowResult workflowResult = new WorkflowResult();
        workflowResult.setDatasetId(datasetId);
        workflowResult.setAnalysisResults(merged);
        workflowResult.setSummaryReport(summary.getResult());
        return workflowResult;
    }

    /** Starts one tool call asynchronously via {@code CompletableFuture.supplyAsync}. */
    private CompletableFuture<ToolResponse> launch(String toolName, Map<String, Object> arguments) {
        return CompletableFuture.supplyAsync(() -> mcpClient.executeTool(toolName, arguments));
    }
}
為工具故障實施優雅的回退機制:
class ResilientWorkflow:
    """Executes a primary tool and falls back to a secondary tool when the primary fails."""

    def __init__(self, mcp_client):
        self.client = mcp_client

    async def execute_with_fallback(self, primary_tool, fallback_tool, parameters):
        """Run ``primary_tool``; on ``ToolExecutionException`` retry with ``fallback_tool``.

        Returns:
            A dict with ``result``, ``source`` (``"primary"`` or ``"fallback"``) and
            ``tool``; the fallback variant also carries ``primaryError``.

        Raises:
            WorkflowExecutionException: when both tools raise ``ToolExecutionException``.
                The fallback error is attached as ``__cause__``.
        """
        try:
            # Try primary tool first
            response = await self.client.execute_tool(primary_tool, parameters)
            return {
                "result": response.result,
                "source": "primary",
                "tool": primary_tool
            }
        except ToolExecutionException as e:
            # Log the failure
            logging.warning(f"Primary tool '{primary_tool}' failed: {str(e)}")

            # Fall back to secondary tool
            try:
                # Might need to transform parameters for fallback tool
                fallback_params = self._adapt_parameters(parameters, primary_tool, fallback_tool)

                response = await self.client.execute_tool(fallback_tool, fallback_params)
                return {
                    "result": response.result,
                    "source": "fallback",
                    "tool": fallback_tool,
                    "primaryError": str(e)
                }
            except ToolExecutionException as fallback_error:
                # Both tools failed
                logging.error(f"Both primary and fallback tools failed. Fallback error: {str(fallback_error)}")
                # Chain explicitly so the traceback reads as "caused by" the fallback failure.
                raise WorkflowExecutionException(
                    f"Workflow failed: primary error: {str(e)}; fallback error: {str(fallback_error)}"
                ) from fallback_error

    def _adapt_parameters(self, params, from_tool, to_tool):
        """Adapt parameters between different tools if needed"""
        # This implementation would depend on the specific tools
        # For this example, we'll just return the original parameters
        return params
# Example usage
async def get_weather(workflow, location):
return await workflow.execute_with_fallback(
"premiumWeatherService", # Primary (paid) weather API
"basicWeatherService", # Fallback (free) weather API
{"location": location}
)通過組合簡單的工作流程構建複雜的工作流程:
/// <summary>
/// Workflow made of other workflows: runs them one after another and threads each
/// result into the context handed to the next one.
/// </summary>
public class CompositeWorkflow : IWorkflow
{
    private readonly List<IWorkflow> _steps;

    public CompositeWorkflow(IEnumerable<IWorkflow> workflows) =>
        _steps = new List<IWorkflow>(workflows);

    public string Name => "CompositeWorkflow";

    public string Description => "Executes multiple workflows in sequence";

    /// <summary>
    /// Executes every child workflow in registration order.
    /// </summary>
    /// <param name="context">Context given to the first child workflow.</param>
    /// <returns>A result built from a dictionary keyed by each child workflow's name.</returns>
    public async Task<WorkflowResult> ExecuteAsync(WorkflowContext context)
    {
        var resultsByName = new Dictionary<string, object>();
        var currentContext = context;

        foreach (var step in _steps)
        {
            var stepResult = await step.ExecuteAsync(currentContext);

            // Record the result, then expose it to the following workflow.
            resultsByName[step.Name] = stepResult;
            currentContext = currentContext.WithResult(step.Name, stepResult);
        }

        return new WorkflowResult(resultsByName);
    }
}
// Example usage: four workflows chained in the order they are listed.
IWorkflow[] documentSteps =
{
    new DocumentFetchWorkflow(),
    new DocumentProcessingWorkflow(),
    new InsightGenerationWorkflow(),
    new ReportGenerationWorkflow()
};
var documentWorkflow = new CompositeWorkflow(documentSteps);

// The starting context only carries the id of the document to process.
var startContext = new WorkflowContext
{
    Parameters = new { documentId = "12345" }
};
var result = await documentWorkflow.ExecuteAsync(startContext);
測試是開發可靠、高質量 MCP 伺服器的重要環節。本指南提供全面的最佳實踐及技巧,涵蓋從單元測試到集成測試及端到端驗證的 MCP 伺服器測試。
MCP 伺服器作為 AI 模型與客戶端應用之間的重要中介。全面的測試能確保:
- 生產環境中的可靠性
- 請求及響應的準確處理
- 正確實施 MCP 規範
- 抵禦故障及邊界情況的韌性
- 在不同負載下的穩定性能
單元測試在隔離環境中驗證 MCP 伺服器的各個組件。
- 資源處理器:獨立測試每個資源處理器的邏輯
- 工具實現:使用各種輸入驗證工具行為
- 提示模板:確保提示模板正確渲染
- 架構驗證:測試參數驗證邏輯
- 錯誤處理:驗證對無效輸入的錯誤響應
// Example unit test for a calculator tool in C#
/// <summary>
/// Calls the calculator tool with operation "add", a = 5, b = 7 and expects
/// the deserialized result value to be 12.
/// </summary>
[Fact]
public async Task CalculatorTool_Add_ReturnsCorrectSum()
{
    // Arrange
    var tool = new CalculatorTool();
    var arguments = new Dictionary<string, object>();
    arguments["operation"] = "add";
    arguments["a"] = 5;
    arguments["b"] = 7;

    // Act
    var toolResponse = await tool.ExecuteAsync(arguments);
    // The first content item is turned into a string and parsed as JSON.
    var firstItemJson = toolResponse.Content[0].ToString();
    var calculation = JsonSerializer.Deserialize<CalculationResult>(firstItemJson);

    // Assert
    Assert.Equal(12, calculation.Value);
}
# Example unit test for a calculator tool in Python
def test_calculator_tool_add():
# Arrange
calculator = CalculatorTool()
parameters = {
"operation": "add",
"a": 5,
"b": 7
}
# Act
response = calculator.execute(parameters)
result = json.loads(response.content[0].text)
# Assert
assert result["value"] == 12集成測試驗證 MCP 伺服器各組件之間的交互。
- 伺服器初始化:測試伺服器在不同配置下的啟動
- 路由註冊:驗證所有端點是否正確註冊
- 請求處理:測試完整的請求-響應周期
- 錯誤傳播:確保錯誤在各組件間正確處理
- 身份驗證及授權:測試安全機制
// Example integration test for MCP server in C#
/// <summary>
/// Starts a server with the calculator tool registered, sends a "multiply" request
/// and checks that a successful response comes back.
/// </summary>
[Fact]
public async Task Server_ProcessToolRequest_ReturnsValidResponse()
{
    // Arrange
    var server = new McpServer();
    server.RegisterTool(new CalculatorTool());
    await server.StartAsync();

    try
    {
        var request = new McpRequest
        {
            Tool = "calculator",
            Parameters = new Dictionary<string, object>
            {
                ["operation"] = "multiply",
                ["a"] = 6,
                ["b"] = 7
            }
        };

        // Act
        var response = await server.ProcessRequestAsync(request);

        // Assert
        Assert.NotNull(response);
        Assert.Equal(McpStatusCodes.Success, response.StatusCode);
        // Additional assertions for response content
    }
    finally
    {
        // Cleanup runs in `finally` so a failed assertion cannot leave the server running.
        await server.StopAsync();
    }
}
端到端測試驗證從客戶端到伺服器的完整系統行為。
- 客戶端-伺服器通信:測試完整的請求-響應周期
- 真實客戶端 SDK:使用實際客戶端實現進行測試
- 負載下的性能:驗證多個並發請求的行為
- 錯誤恢復:測試系統從故障中的恢復
- 長時間運行的操作:驗證流式及長時間操作的處理
// Example E2E test with a client in TypeScript
describe('MCP Server E2E Tests', () => {
    // Shared by all tests in this suite; assigned once the test server is up.
    let client: McpClient;

    beforeAll(async () => {
        // The server has to be running before a client is pointed at it.
        await startTestServer();
        client = new McpClient('http://localhost:5000');
    });

    afterAll(async () => {
        await stopTestServer();
    });

    test('Client can invoke calculator tool and get correct result', async () => {
        // Act: 20 / 4 through the calculator tool
        const divideArgs = { operation: 'divide', a: 20, b: 4 };
        const reply = await client.invokeToolAsync('calculator', divideArgs);

        // Assert
        expect(reply.statusCode).toBe(200);
        expect(reply.content[0].text).toContain('5');
    });
});
模擬是隔離組件測試的關鍵。
- 外部 AI 模型:模擬模型響應以進行可預測的測試
- 外部服務:模擬 API 依賴(數據庫、第三方服務)
- 身份驗證服務:模擬身份提供者
- 資源提供者:模擬昂貴的資源處理器
// C# example with Moq
// Canned answer the mocked model hands back for any prompt and any request context.
var cannedResponse = new ModelResponse
{
    Text = "Mocked model response",
    FinishReason = FinishReason.Completed
};

var mockModel = new Mock<ILanguageModel>();
mockModel
    .Setup(m => m.GenerateResponseAsync(It.IsAny<string>(), It.IsAny<McpRequestContext>()))
    .ReturnsAsync(cannedResponse);

// The server receives the mock in place of a real model client.
var server = new McpServer(modelClient: mockModel.Object);
# Python example with unittest.mock
@patch('mcp_server.models.OpenAIModel')
def test_with_mock_model(mock_model):
# Configure mock
mock_model.return_value.generate_response.return_value = {
"text": "Mocked model response",
"finish_reason": "completed"
}
# Use mock in test
server = McpServer(model_client=mock_model)
# Continue with test性能測試對於生產 MCP 伺服器至關重要。
- 延遲:請求的響應時間
- 吞吐量:每秒處理的請求數量
- 資源使用:CPU、內存及網絡使用情況
- 並發處理能力:在並行請求下的行為
- 擴展特性:隨負載增加的性能表現
- k6:開源負載測試工具
- JMeter:全面的性能測試工具
- Locust:基於 Python 的負載測試工具
- Azure 負載測試:基於雲的性能測試工具
// k6 script for load testing MCP server
import http from 'k6/http';
import { check, sleep } from 'k6';
export const options = {
vus: 10, // 10 virtual users
duration: '30s',
};
export default function () {
const payload = JSON.stringify({
tool: 'calculator',
parameters: {
operation: 'add',
a: Math.floor(Math.random() * 100),
b: Math.floor(Math.random() * 100)
}
});
const params = {
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer test-token'
},
};
const res = http.post('http://localhost:5000/api/tools/invoke', payload, params);
check(res, {
'status is 200': (r) => r.status === 200,
'response time < 500ms': (r) => r.timings.duration < 500,
});
sleep(1);
}自動化測試能確保一致的質量及更快的反饋周期。
- 在拉取請求上運行單元測試:確保代碼更改不破壞現有功能
- 在預生產環境中進行集成測試:運行集成測試以驗證系統完整性
- 效能基準:維持效能基準以捕捉回歸問題
- 安全掃描:將安全測試自動化,作為管道的一部分
# CI workflow: restore and build once, then run unit, integration and performance tests.
name: MCP Server Tests

# Runs on pushes to main and on pull requests targeting main.
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Runtime
        uses: actions/setup-dotnet@v4
        with:
          dotnet-version: '8.0.x'

      - name: Restore dependencies
        run: dotnet restore

      - name: Build
        run: dotnet build --no-restore

      # Both test steps reuse the build output above (--no-build).
      - name: Unit Tests
        run: dotnet test --no-build --filter Category=Unit

      - name: Integration Tests
        run: dotnet test --no-build --filter Category=Integration

      - name: Performance Tests
        run: dotnet run --project tests/PerformanceTests/PerformanceTests.csproj
確認您的伺服器正確實現 MCP 規範。
- API 端點:測試所需的端點 (/resources, /tools 等)
- 請求/回應格式:驗證符合架構規範
- 錯誤代碼:確認各種情境下的正確狀態代碼
- 內容類型:測試不同內容類型的處理
- 身份驗證流程:確認符合規範的身份驗證機制
/// <summary>
/// Requests the resource list over HTTP with a bearer token and checks that the call
/// returns 200 and that every listed resource has an Id and a Type.
/// </summary>
[Fact]
public async Task Server_ResourceEndpoint_ReturnsCorrectSchema()
{
    // Arrange
    // `using` releases the client when the test ends, whether it passes or fails.
    using var client = new HttpClient();
    client.DefaultRequestHeaders.Add("Authorization", "Bearer test-token");

    // Act
    using var response = await client.GetAsync("http://localhost:5000/api/resources");
    var content = await response.Content.ReadAsStringAsync();
    var resources = JsonSerializer.Deserialize<ResourceList>(content);

    // Assert
    Assert.Equal(HttpStatusCode.OK, response.StatusCode);
    Assert.NotNull(resources);
    Assert.All(resources.Resources, resource =>
    {
        Assert.NotNull(resource.Id);
        Assert.NotNull(resource.Type);
        // Additional schema validation
    });
}
- 分開測試工具定義:獨立驗證架構定義,而非工具邏輯
- 使用參數化測試:使用多種輸入,包括邊界情況,測試工具
- 檢查錯誤回應:驗證所有可能錯誤情況的正確錯誤處理
- 測試授權邏輯:確保不同使用者角色的正確存取控制
- 監控測試覆蓋率:目標是高覆蓋率的關鍵路徑代碼
- 測試串流回應:驗證串流內容的正確處理
- 模擬網絡問題:測試在網絡狀況不佳時的行為
- 測試資源限制:驗證在達到配額或速率限制時的行為
- 自動化回歸測試:建立一個在每次代碼變更時運行的測試套件
- 記錄測試案例:維護清晰的測試場景文檔
- 過度依賴正向測試:確保徹底測試錯誤情況
- 忽略效能測試:在影響生產之前識別瓶頸
- 僅進行孤立測試:結合單元測試、整合測試和端到端測試
- API 覆蓋不足:確保所有端點和功能都被測試
- 不一致的測試環境:使用容器確保一致的測試環境
全面的測試策略對於開發可靠、高品質的 MCP 伺服器至關重要。通過實施本指南中概述的最佳實踐和技巧,您可以確保 MCP 實現符合最高的品質、可靠性和效能標準。
- 工具設計:遵循單一職責原則,使用依賴注入,設計可組合性
- 架構設計:創建清晰、文檔完善的架構,並設置適當的驗證約束
- 錯誤處理:實現優雅的錯誤處理、結構化錯誤回應和重試邏輯
- 效能:使用緩存、非同步處理和資源節流
- 安全性:進行全面的輸入驗證、授權檢查和敏感數據處理
- 測試:創建全面的單元測試、整合測試和端到端測試
- 工作流模式:應用已建立的模式,如鏈式處理、分派器和並行處理
設計一個 MCP 工具和工作流,用於文件處理系統:
- 接收多種格式的文件 (PDF, DOCX, TXT)
- 從文件中提取文本和關鍵信息
- 根據類型和內容對文件進行分類
- 為每個文件生成摘要
實現工具架構、錯誤處理,以及最適合此場景的工作流模式。考慮如何測試此實現。
- 加入 MCP 社群 Azure AI Foundry Discord Community,了解最新動態
- 為開源 MCP 項目 做出貢獻
- 在您自己的組織的 AI 計劃中應用 MCP 原則
- 探索針對您行業的專門 MCP 實現
- 考慮參加進階課程,學習特定 MCP 主題,例如多模態整合或企業應用整合
- 通過實驗室實際操作所學原則,嘗試構建自己的 MCP 工具和工作流
下一步:最佳實踐 案例研究
免責聲明:
本文件已使用人工智能翻譯服務 Co-op Translator 進行翻譯。儘管我們致力於提供準確的翻譯,請注意自動翻譯可能包含錯誤或不準確之處。原始語言的文件應被視為權威來源。對於重要信息,建議使用專業人工翻譯。我們對因使用此翻譯而引起的任何誤解或錯誤解釋概不負責。
