(위 이미지를 클릭하면 이 강의의 동영상을 볼 수 있습니다)
이 강의는 MCP 서버와 기능을 생산 환경에서 개발, 테스트 및 배포할 때 필요한 고급 모범 사례에 초점을 맞춥니다. MCP 생태계가 점점 복잡해지고 중요성이 커짐에 따라, 확립된 패턴을 따르는 것은 신뢰성, 유지보수성 및 상호운용성을 보장하는 데 필수적입니다. 이 강의는 실제 MCP 구현에서 얻은 실용적인 지혜를 통합하여 강력하고 효율적인 서버를 만들고 효과적인 리소스, 프롬프트 및 도구를 활용하는 방법을 안내합니다.
이 강의를 마치면 다음을 수행할 수 있습니다:
- MCP 서버와 기능 설계에서 업계 모범 사례를 적용
- MCP 서버에 대한 포괄적인 테스트 전략 수립
- 복잡한 MCP 애플리케이션을 위한 효율적이고 재사용 가능한 워크플로 패턴 설계
- MCP 서버에서 적절한 오류 처리, 로깅 및 관찰성 구현
- 성능, 보안 및 유지보수를 최적화한 MCP 구현
구체적인 구현 사례를 살펴보기 전에, 효과적인 MCP 개발을 이끄는 핵심 원칙을 이해하는 것이 중요합니다:
-
표준화된 통신: MCP는 JSON-RPC 2.0을 기반으로 하며, 모든 구현에서 요청, 응답 및 오류 처리를 위한 일관된 형식을 제공합니다.
-
사용자 중심 설계: MCP 구현에서 항상 사용자 동의, 제어 및 투명성을 우선시하십시오.
-
보안 우선: 인증, 권한 부여, 유효성 검사 및 속도 제한을 포함한 강력한 보안 조치를 구현하십시오.
-
모듈형 아키텍처: 각 도구와 리소스가 명확하고 집중된 목적을 가지도록 MCP 서버를 모듈형으로 설계하십시오.
-
상태 유지 연결: 여러 요청에 걸쳐 상태를 유지할 수 있는 MCP의 기능을 활용하여 더 일관되고 컨텍스트를 고려한 상호작용을 만드십시오.
다음 모범 사례는 공식 모델 컨텍스트 프로토콜 문서에서 가져왔습니다:
-
사용자 동의 및 제어: 데이터를 액세스하거나 작업을 수행하기 전에 항상 명시적인 사용자 동의를 요구하십시오. 공유되는 데이터와 승인된 작업에 대한 명확한 제어를 제공하십시오.
-
데이터 프라이버시: 명시적인 동의 없이 사용자 데이터를 노출하지 말고 적절한 액세스 제어로 보호하십시오. 무단 데이터 전송을 방지하십시오.
-
도구 안전성: 도구를 호출하기 전에 명시적인 사용자 동의를 요구하십시오. 각 도구의 기능을 사용자에게 명확히 이해시키고 강력한 보안 경계를 적용하십시오.
-
도구 권한 제어: 세션 중 모델이 사용할 수 있는 도구를 구성하여 명시적으로 승인된 도구만 접근 가능하도록 하십시오.
-
인증: API 키, OAuth 토큰 또는 기타 안전한 인증 방법을 사용하여 도구, 리소스 또는 민감한 작업에 대한 액세스를 부여하기 전에 적절한 인증을 요구하십시오.
-
매개변수 유효성 검사: 잘못된 입력이나 악의적인 입력이 도구 구현에 도달하지 않도록 모든 도구 호출에 대해 유효성을 검사하십시오.
-
속도 제한: 서버 리소스의 남용을 방지하고 공정한 사용을 보장하기 위해 속도 제한을 구현하십시오.
-
기능 협상: 연결 설정 중 지원되는 기능, 프로토콜 버전, 사용 가능한 도구 및 리소스에 대한 정보를 교환하십시오.
-
도구 설계: 여러 문제를 처리하는 거대한 도구보다는 특정 작업에 뛰어난 집중된 도구를 만드십시오.
-
오류 처리: 문제를 진단하고 실패를 우아하게 처리하며 실행 가능한 피드백을 제공할 수 있도록 표준화된 오류 메시지와 코드를 구현하십시오.
-
로깅: 프로토콜 상호작용을 감사, 디버깅 및 모니터링하기 위해 구조화된 로그를 구성하십시오.
-
진행 상황 추적: 장시간 실행되는 작업에 대해 진행 상황 업데이트를 보고하여 반응형 사용자 인터페이스를 가능하게 하십시오.
-
요청 취소: 더 이상 필요하지 않거나 너무 오래 걸리는 요청을 클라이언트가 취소할 수 있도록 허용하십시오.
MCP 모범 사례에 대한 최신 정보를 얻으려면 다음을 참조하십시오:
각 MCP 도구는 명확하고 집중된 목적을 가져야 합니다. 여러 문제를 처리하려는 거대한 도구를 만드는 대신 특정 작업에 뛰어난 전문화된 도구를 개발하십시오.
// A focused tool that does one thing well
public class WeatherForecastTool : ITool
{
private readonly IWeatherService _weatherService;
public WeatherForecastTool(IWeatherService weatherService)
{
_weatherService = weatherService;
}
public string Name => "weatherForecast";
public string Description => "Gets weather forecast for a specific location";
public ToolDefinition GetDefinition()
{
return new ToolDefinition
{
Name = Name,
Description = Description,
Parameters = new Dictionary<string, ParameterDefinition>
{
["location"] = new ParameterDefinition
{
Type = ParameterType.String,
Description = "City or location name"
},
["days"] = new ParameterDefinition
{
Type = ParameterType.Integer,
Description = "Number of forecast days",
Default = 3
}
},
Required = new[] { "location" }
};
}
public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
{
var location = parameters["location"].ToString();
var days = parameters.ContainsKey("days")
? Convert.ToInt32(parameters["days"])
: 3;
var forecast = await _weatherService.GetForecastAsync(location, days);
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(JsonSerializer.Serialize(forecast))
}
};
}
}정보 제공 오류 메시지와 적절한 복구 메커니즘을 갖춘 강력한 오류 처리를 구현하십시오.
# Python example with comprehensive error handling
class DataQueryTool:
def get_name(self):
return "dataQuery"
def get_description(self):
return "Queries data from specified database tables"
async def execute(self, parameters):
try:
# Parameter validation
if "query" not in parameters:
raise ToolParameterError("Missing required parameter: query")
query = parameters["query"]
# Security validation
if self._contains_unsafe_sql(query):
raise ToolSecurityError("Query contains potentially unsafe SQL")
try:
# Database operation with timeout
async with timeout(10): # 10 second timeout
result = await self._database.execute_query(query)
return ToolResponse(
content=[TextContent(json.dumps(result))]
)
except asyncio.TimeoutError:
raise ToolExecutionError("Database query timed out after 10 seconds")
except DatabaseConnectionError as e:
# Connection errors might be transient
self._log_error("Database connection error", e)
raise ToolExecutionError(f"Database connection error: {str(e)}")
except DatabaseQueryError as e:
# Query errors are likely client errors
self._log_error("Database query error", e)
raise ToolExecutionError(f"Invalid query: {str(e)}")
except ToolError:
# Let tool-specific errors pass through
raise
except Exception as e:
# Catch-all for unexpected errors
self._log_error("Unexpected error in DataQueryTool", e)
raise ToolExecutionError(f"An unexpected error occurred: {str(e)}")
def _contains_unsafe_sql(self, query):
# Implementation of SQL injection detection
pass
def _log_error(self, message, error):
# Implementation of error logging
pass잘못된 입력이나 악의적인 입력을 방지하기 위해 항상 매개변수를 철저히 검증하십시오.
// JavaScript/TypeScript example with detailed parameter validation
class FileOperationTool {
getName() {
return "fileOperation";
}
getDescription() {
return "Performs file operations like read, write, and delete";
}
getDefinition() {
return {
name: this.getName(),
description: this.getDescription(),
parameters: {
operation: {
type: "string",
description: "Operation to perform",
enum: ["read", "write", "delete"]
},
path: {
type: "string",
description: "File path (must be within allowed directories)"
},
content: {
type: "string",
description: "Content to write (only for write operation)",
optional: true
}
},
required: ["operation", "path"]
};
}
async execute(parameters) {
// 1. Validate parameter presence
if (!parameters.operation) {
throw new ToolError("Missing required parameter: operation");
}
if (!parameters.path) {
throw new ToolError("Missing required parameter: path");
}
// 2. Validate parameter types
if (typeof parameters.operation !== "string") {
throw new ToolError("Parameter 'operation' must be a string");
}
if (typeof parameters.path !== "string") {
throw new ToolError("Parameter 'path' must be a string");
}
// 3. Validate parameter values
const validOperations = ["read", "write", "delete"];
if (!validOperations.includes(parameters.operation)) {
throw new ToolError(`Invalid operation. Must be one of: ${validOperations.join(", ")}`);
}
// 4. Validate content presence for write operation
if (parameters.operation === "write" && !parameters.content) {
throw new ToolError("Content parameter is required for write operation");
}
// 5. Path safety validation
if (!this.isPathWithinAllowedDirectories(parameters.path)) {
throw new ToolError("Access denied: path is outside of allowed directories");
}
// Implementation based on validated parameters
// ...
}
isPathWithinAllowedDirectories(path) {
// Implementation of path safety check
// ...
}
}// Java example with authentication and authorization
public class SecureDataAccessTool implements Tool {
private final AuthenticationService authService;
private final AuthorizationService authzService;
private final DataService dataService;
// Dependency injection
public SecureDataAccessTool(
AuthenticationService authService,
AuthorizationService authzService,
DataService dataService) {
this.authService = authService;
this.authzService = authzService;
this.dataService = dataService;
}
@Override
public String getName() {
return "secureDataAccess";
}
@Override
public ToolResponse execute(ToolRequest request) {
// 1. Extract authentication context
String authToken = request.getContext().getAuthToken();
// 2. Authenticate user
UserIdentity user;
try {
user = authService.validateToken(authToken);
} catch (AuthenticationException e) {
return ToolResponse.error("Authentication failed: " + e.getMessage());
}
// 3. Check authorization for the specific operation
String dataId = request.getParameters().get("dataId").getAsString();
String operation = request.getParameters().get("operation").getAsString();
boolean isAuthorized = authzService.isAuthorized(user, "data:" + dataId, operation);
if (!isAuthorized) {
return ToolResponse.error("Access denied: Insufficient permissions for this operation");
}
// 4. Proceed with authorized operation
try {
switch (operation) {
case "read":
Object data = dataService.getData(dataId, user.getId());
return ToolResponse.success(data);
case "update":
JsonNode newData = request.getParameters().get("newData");
dataService.updateData(dataId, newData, user.getId());
return ToolResponse.success("Data updated successfully");
default:
return ToolResponse.error("Unsupported operation: " + operation);
}
} catch (Exception e) {
return ToolResponse.error("Operation failed: " + e.getMessage());
}
}
}// C# rate limiting implementation
public class RateLimitingMiddleware
{
private readonly RequestDelegate _next;
private readonly IMemoryCache _cache;
private readonly ILogger<RateLimitingMiddleware> _logger;
// Configuration options
private readonly int _maxRequestsPerMinute;
public RateLimitingMiddleware(
RequestDelegate next,
IMemoryCache cache,
ILogger<RateLimitingMiddleware> logger,
IConfiguration config)
{
_next = next;
_cache = cache;
_logger = logger;
_maxRequestsPerMinute = config.GetValue<int>("RateLimit:MaxRequestsPerMinute", 60);
}
public async Task InvokeAsync(HttpContext context)
{
// 1. Get client identifier (API key or user ID)
string clientId = GetClientIdentifier(context);
// 2. Get rate limiting key for this minute
string cacheKey = $"rate_limit:{clientId}:{DateTime.UtcNow:yyyyMMddHHmm}";
// 3. Check current request count
if (!_cache.TryGetValue(cacheKey, out int requestCount))
{
requestCount = 0;
}
// 4. Enforce rate limit
if (requestCount >= _maxRequestsPerMinute)
{
_logger.LogWarning("Rate limit exceeded for client {ClientId}", clientId);
context.Response.StatusCode = StatusCodes.Status429TooManyRequests;
context.Response.Headers.Add("Retry-After", "60");
await context.Response.WriteAsJsonAsync(new
{
error = "Rate limit exceeded",
message = "Too many requests. Please try again later.",
retryAfterSeconds = 60
});
return;
}
// 5. Increment request count
_cache.Set(cacheKey, requestCount + 1, TimeSpan.FromMinutes(2));
// 6. Add rate limit headers
context.Response.Headers.Add("X-RateLimit-Limit", _maxRequestsPerMinute.ToString());
context.Response.Headers.Add("X-RateLimit-Remaining", (_maxRequestsPerMinute - requestCount - 1).ToString());
// 7. Continue with the request
await _next(context);
}
private string GetClientIdentifier(HttpContext context)
{
// Implementation to extract API key or user ID
// ...
}
}외부 종속성을 모의(Mock)하여 도구를 독립적으로 테스트하십시오:
// TypeScript example of a tool unit test
describe('WeatherForecastTool', () => {
let tool: WeatherForecastTool;
let mockWeatherService: jest.Mocked<IWeatherService>;
beforeEach(() => {
// Create a mock weather service
mockWeatherService = {
getForecasts: jest.fn()
} as any;
// Create the tool with the mock dependency
tool = new WeatherForecastTool(mockWeatherService);
});
it('should return weather forecast for a location', async () => {
// Arrange
const mockForecast = {
location: 'Seattle',
forecasts: [
{ date: '2025-07-16', temperature: 72, conditions: 'Sunny' },
{ date: '2025-07-17', temperature: 68, conditions: 'Partly Cloudy' },
{ date: '2025-07-18', temperature: 65, conditions: 'Rain' }
]
};
mockWeatherService.getForecasts.mockResolvedValue(mockForecast);
// Act
const response = await tool.execute({
location: 'Seattle',
days: 3
});
// Assert
expect(mockWeatherService.getForecasts).toHaveBeenCalledWith('Seattle', 3);
expect(response.content[0].text).toContain('Seattle');
expect(response.content[0].text).toContain('Sunny');
});
it('should handle errors from the weather service', async () => {
// Arrange
mockWeatherService.getForecasts.mockRejectedValue(new Error('Service unavailable'));
// Act & Assert
await expect(tool.execute({
location: 'Seattle',
days: 3
})).rejects.toThrow('Weather service error: Service unavailable');
});
});클라이언트 요청에서 서버 응답까지의 전체 흐름을 테스트하십시오:
# Python integration test example
@pytest.mark.asyncio
async def test_mcp_server_integration():
# Start a test server
server = McpServer()
server.register_tool(WeatherForecastTool(MockWeatherService()))
await server.start(port=5000)
try:
# Create a client
client = McpClient("http://localhost:5000")
# Test tool discovery
tools = await client.discover_tools()
assert "weatherForecast" in [t.name for t in tools]
# Test tool execution
response = await client.execute_tool("weatherForecast", {
"location": "Seattle",
"days": 3
})
# Verify response
assert response.status_code == 200
assert "Seattle" in response.content[0].text
assert len(json.loads(response.content[0].text)["forecasts"]) == 3
finally:
# Clean up
await server.stop()지연 시간과 리소스 사용을 줄이기 위해 적절한 캐싱을 구현하십시오:
// C# example with caching
public class CachedWeatherTool : ITool
{
private readonly IWeatherService _weatherService;
private readonly IDistributedCache _cache;
private readonly ILogger<CachedWeatherTool> _logger;
public CachedWeatherTool(
IWeatherService weatherService,
IDistributedCache cache,
ILogger<CachedWeatherTool> logger)
{
_weatherService = weatherService;
_cache = cache;
_logger = logger;
}
public string Name => "weatherForecast";
public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
{
var location = parameters["location"].ToString();
var days = Convert.ToInt32(parameters.GetValueOrDefault("days", 3));
// Create cache key
string cacheKey = $"weather:{location}:{days}";
// Try to get from cache
string cachedForecast = await _cache.GetStringAsync(cacheKey);
if (!string.IsNullOrEmpty(cachedForecast))
{
_logger.LogInformation("Cache hit for weather forecast: {Location}", location);
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(cachedForecast)
}
};
}
// Cache miss - get from service
_logger.LogInformation("Cache miss for weather forecast: {Location}", location);
var forecast = await _weatherService.GetForecastAsync(location, days);
string forecastJson = JsonSerializer.Serialize(forecast);
// Store in cache (weather forecasts valid for 1 hour)
await _cache.SetStringAsync(
cacheKey,
forecastJson,
new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
});
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(forecastJson)
}
};
}
}도구가 생성자 주입을 통해 의존성을 받을 수 있도록 설계하여 테스트 가능성과 구성 가능성을 높이십시오:
// Java example with dependency injection
public class CurrencyConversionTool implements Tool {
private final ExchangeRateService exchangeService;
private final CacheService cacheService;
private final Logger logger;
// Dependencies injected through constructor
public CurrencyConversionTool(
ExchangeRateService exchangeService,
CacheService cacheService,
Logger logger) {
this.exchangeService = exchangeService;
this.cacheService = cacheService;
this.logger = logger;
}
// Tool implementation
// ...
}더 복잡한 워크플로를 생성하기 위해 도구를 함께 구성할 수 있도록 설계하십시오:
# Python example showing composable tools
class DataFetchTool(Tool):
def get_name(self):
return "dataFetch"
# Implementation...
class DataAnalysisTool(Tool):
def get_name(self):
return "dataAnalysis"
# This tool can use results from the dataFetch tool
async def execute_async(self, request):
# Implementation...
pass
class DataVisualizationTool(Tool):
def get_name(self):
return "dataVisualize"
# This tool can use results from the dataAnalysis tool
async def execute_async(self, request):
# Implementation...
pass
# These tools can be used independently or as part of a workflow스키마는 모델과 도구 간의 계약입니다. 잘 설계된 스키마는 도구 사용성을 향상시킵니다.
각 매개변수에 대한 설명 정보를 항상 포함하십시오:
public object GetSchema()
{
return new {
type = "object",
properties = new {
query = new {
type = "string",
description = "Search query text. Use precise keywords for better results."
},
filters = new {
type = "object",
description = "Optional filters to narrow down search results",
properties = new {
dateRange = new {
type = "string",
description = "Date range in format YYYY-MM-DD:YYYY-MM-DD"
},
category = new {
type = "string",
description = "Category name to filter by"
}
}
},
limit = new {
type = "integer",
description = "Maximum number of results to return (1-50)",
default = 10
}
},
required = new[] { "query" }
};
}잘못된 입력을 방지하기 위해 유효성 검사 제약 조건을 포함하십시오:
Map<String, Object> getSchema() {
Map<String, Object> schema = new HashMap<>();
schema.put("type", "object");
Map<String, Object> properties = new HashMap<>();
// Email property with format validation
Map<String, Object> email = new HashMap<>();
email.put("type", "string");
email.put("format", "email");
email.put("description", "User email address");
// Age property with numeric constraints
Map<String, Object> age = new HashMap<>();
age.put("type", "integer");
age.put("minimum", 13);
age.put("maximum", 120);
age.put("description", "User age in years");
// Enumerated property
Map<String, Object> subscription = new HashMap<>();
subscription.put("type", "string");
subscription.put("enum", Arrays.asList("free", "basic", "premium"));
subscription.put("default", "free");
subscription.put("description", "Subscription tier");
properties.put("email", email);
properties.put("age", age);
properties.put("subscription", subscription);
schema.put("properties", properties);
schema.put("required", Arrays.asList("email"));
return schema;
}모델이 결과를 해석하기 쉽게 응답 구조의 일관성을 유지하십시오:
async def execute_async(self, request):
try:
# Process request
results = await self._search_database(request.parameters["query"])
# Always return a consistent structure
return ToolResponse(
result={
"matches": [self._format_item(item) for item in results],
"totalCount": len(results),
"queryTime": calculation_time_ms,
"status": "success"
}
)
except Exception as e:
return ToolResponse(
result={
"matches": [],
"totalCount": 0,
"queryTime": 0,
"status": "error",
"error": str(e)
}
)
def _format_item(self, item):
"""Ensures each item has a consistent structure"""
return {
"id": item.id,
"title": item.title,
"summary": item.summary[:100] + "..." if len(item.summary) > 100 else item.summary,
"url": item.url,
"relevance": item.score
}강력한 오류 처리는 MCP 도구의 신뢰성을 유지하는 데 필수적입니다.
적절한 수준에서 오류를 처리하고 정보 제공 메시지를 제공하십시오:
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
try
{
string fileId = request.Parameters.GetProperty("fileId").GetString();
try
{
var fileData = await _fileService.GetFileAsync(fileId);
return new ToolResponse {
Result = JsonSerializer.SerializeToElement(fileData)
};
}
catch (FileNotFoundException)
{
throw new ToolExecutionException($"File not found: {fileId}");
}
catch (UnauthorizedAccessException)
{
throw new ToolExecutionException("You don't have permission to access this file");
}
catch (Exception ex) when (ex is IOException || ex is TimeoutException)
{
_logger.LogError(ex, "Error accessing file {FileId}", fileId);
throw new ToolExecutionException("Error accessing file: The service is temporarily unavailable");
}
}
catch (JsonException)
{
throw new ToolExecutionException("Invalid file ID format");
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error in FileAccessTool");
throw new ToolExecutionException("An unexpected error occurred");
}
}가능한 경우 구조화된 오류 정보를 반환하십시오:
@Override
public ToolResponse execute(ToolRequest request) {
try {
// Implementation
} catch (Exception ex) {
Map<String, Object> errorResult = new HashMap<>();
errorResult.put("success", false);
if (ex instanceof ValidationException) {
ValidationException validationEx = (ValidationException) ex;
errorResult.put("errorType", "validation");
errorResult.put("errorMessage", validationEx.getMessage());
errorResult.put("validationErrors", validationEx.getErrors());
return new ToolResponse.Builder()
.setResult(errorResult)
.build();
}
// Re-throw other exceptions as ToolExecutionException
throw new ToolExecutionException("Tool execution failed: " + ex.getMessage(), ex);
}
}일시적인 실패에 대해 적절한 재시도 로직을 구현하십시오:
async def execute_async(self, request):
max_retries = 3
retry_count = 0
base_delay = 1 # seconds
while retry_count < max_retries:
try:
# Call external API
return await self._call_api(request.parameters)
except TransientError as e:
retry_count += 1
if retry_count >= max_retries:
raise ToolExecutionException(f"Operation failed after {max_retries} attempts: {str(e)}")
# Exponential backoff
delay = base_delay * (2 ** (retry_count - 1))
logging.warning(f"Transient error, retrying in {delay}s: {str(e)}")
await asyncio.sleep(delay)
except Exception as e:
# Non-transient error, don't retry
raise ToolExecutionException(f"Operation failed: {str(e)}")비용이 많이 드는 작업에 대해 캐싱을 구현하십시오:
public class CachedDataTool : IMcpTool
{
private readonly IDatabase _database;
private readonly IMemoryCache _cache;
public CachedDataTool(IDatabase database, IMemoryCache cache)
{
_database = database;
_cache = cache;
}
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
var query = request.Parameters.GetProperty("query").GetString();
// Create cache key based on parameters
var cacheKey = $"data_query_{ComputeHash(query)}";
// Try to get from cache first
if (_cache.TryGetValue(cacheKey, out var cachedResult))
{
return new ToolResponse { Result = cachedResult };
}
// Cache miss - perform actual query
var result = await _database.QueryAsync(query);
// Store in cache with expiration
var cacheOptions = new MemoryCacheEntryOptions()
.SetAbsoluteExpiration(TimeSpan.FromMinutes(15));
_cache.Set(cacheKey, JsonSerializer.SerializeToElement(result), cacheOptions);
return new ToolResponse { Result = JsonSerializer.SerializeToElement(result) };
}
private string ComputeHash(string input)
{
// Implementation to generate stable hash for cache key
}
}I/O 중심 작업에 대해 비동기 프로그래밍 패턴을 사용하십시오:
public class AsyncDocumentProcessingTool implements Tool {
private final DocumentService documentService;
private final ExecutorService executorService;
@Override
public ToolResponse execute(ToolRequest request) {
String documentId = request.getParameters().get("documentId").asText();
// For long-running operations, return a processing ID immediately
String processId = UUID.randomUUID().toString();
// Start async processing
CompletableFuture.runAsync(() -> {
try {
// Perform long-running operation
documentService.processDocument(documentId);
// Update status (would typically be stored in a database)
processStatusRepository.updateStatus(processId, "completed");
} catch (Exception ex) {
processStatusRepository.updateStatus(processId, "failed", ex.getMessage());
}
}, executorService);
// Return immediate response with process ID
Map<String, Object> result = new HashMap<>();
result.put("processId", processId);
result.put("status", "processing");
result.put("estimatedCompletionTime", ZonedDateTime.now().plusMinutes(5));
return new ToolResponse.Builder().setResult(result).build();
}
// Companion status check tool
public class ProcessStatusTool implements Tool {
@Override
public ToolResponse execute(ToolRequest request) {
String processId = request.getParameters().get("processId").asText();
ProcessStatus status = processStatusRepository.getStatus(processId);
return new ToolResponse.Builder().setResult(status).build();
}
}
}과부하를 방지하기 위해 리소스 제한을 구현하십시오:
class ThrottledApiTool(Tool):
def __init__(self):
self.rate_limiter = TokenBucketRateLimiter(
tokens_per_second=5, # Allow 5 requests per second
bucket_size=10 # Allow bursts up to 10 requests
)
async def execute_async(self, request):
# Check if we can proceed or need to wait
delay = self.rate_limiter.get_delay_time()
if delay > 0:
if delay > 2.0: # If wait is too long
raise ToolExecutionException(
f"Rate limit exceeded. Please try again in {delay:.1f} seconds."
)
else:
# Wait for the appropriate delay time
await asyncio.sleep(delay)
# Consume a token and proceed with the request
self.rate_limiter.consume()
# Call API
result = await self._call_api(request.parameters)
return ToolResponse(result=result)
class TokenBucketRateLimiter:
def __init__(self, tokens_per_second, bucket_size):
self.tokens_per_second = tokens_per_second
self.bucket_size = bucket_size
self.tokens = bucket_size
self.last_refill = time.time()
self.lock = asyncio.Lock()
async def get_delay_time(self):
async with self.lock:
self._refill()
if self.tokens >= 1:
return 0
# Calculate time until next token available
return (1 - self.tokens) / self.tokens_per_second
async def consume(self):
async with self.lock:
self._refill()
self.tokens -= 1
def _refill(self):
now = time.time()
elapsed = now - self.last_refill
# Add new tokens based on elapsed time
new_tokens = elapsed * self.tokens_per_second
self.tokens = min(self.bucket_size, self.tokens + new_tokens)
self.last_refill = now항상 입력 매개변수를 철저히 검증하십시오:
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
// Validate parameters exist
if (!request.Parameters.TryGetProperty("query", out var queryProp))
{
throw new ToolExecutionException("Missing required parameter: query");
}
// Validate correct type
if (queryProp.ValueKind != JsonValueKind.String)
{
throw new ToolExecutionException("Query parameter must be a string");
}
var query = queryProp.GetString();
// Validate string content
if (string.IsNullOrWhiteSpace(query))
{
throw new ToolExecutionException("Query parameter cannot be empty");
}
if (query.Length > 500)
{
throw new ToolExecutionException("Query parameter exceeds maximum length of 500 characters");
}
// Check for SQL injection attacks if applicable
if (ContainsSqlInjection(query))
{
throw new ToolExecutionException("Invalid query: contains potentially unsafe SQL");
}
// Proceed with execution
// ...
}적절한 권한 확인을 구현하십시오:
@Override
public ToolResponse execute(ToolRequest request) {
// Get user context from request
UserContext user = request.getContext().getUserContext();
// Check if user has required permissions
if (!authorizationService.hasPermission(user, "documents:read")) {
throw new ToolExecutionException("User does not have permission to access documents");
}
// For specific resources, check access to that resource
String documentId = request.getParameters().get("documentId").asText();
if (!documentService.canUserAccess(user.getId(), documentId)) {
throw new ToolExecutionException("Access denied to the requested document");
}
// Proceed with tool execution
// ...
}민감한 데이터를 신중하게 처리하십시오:
class SecureDataTool(Tool):
def get_schema(self):
return {
"type": "object",
"properties": {
"userId": {"type": "string"},
"includeSensitiveData": {"type": "boolean", "default": False}
},
"required": ["userId"]
}
async def execute_async(self, request):
user_id = request.parameters["userId"]
include_sensitive = request.parameters.get("includeSensitiveData", False)
# Get user data
user_data = await self.user_service.get_user_data(user_id)
# Filter sensitive fields unless explicitly requested AND authorized
if not include_sensitive or not self._is_authorized_for_sensitive_data(request):
user_data = self._redact_sensitive_fields(user_data)
return ToolResponse(result=user_data)
def _is_authorized_for_sensitive_data(self, request):
# Check authorization level in request context
auth_level = request.context.get("authorizationLevel")
return auth_level == "admin"
def _redact_sensitive_fields(self, user_data):
# Create a copy to avoid modifying the original
redacted = user_data.copy()
# Redact specific sensitive fields
sensitive_fields = ["ssn", "creditCardNumber", "password"]
for field in sensitive_fields:
if field in redacted:
redacted[field] = "REDACTED"
# Redact nested sensitive data
if "financialInfo" in redacted:
redacted["financialInfo"] = {"available": True, "accessRestricted": True}
return redacted포괄적인 테스트는 MCP 도구가 올바르게 작동하고, 엣지 케이스를 처리하며, 시스템의 다른 부분과 제대로 통합되도록 보장합니다.
각 도구의 기능에 대한 집중된 테스트를 작성하십시오:
[Fact]
public async Task WeatherTool_ValidLocation_ReturnsCorrectForecast()
{
// Arrange
var mockWeatherService = new Mock<IWeatherService>();
mockWeatherService
.Setup(s => s.GetForecastAsync("Seattle", 3))
.ReturnsAsync(new WeatherForecast(/* test data */));
var tool = new WeatherForecastTool(mockWeatherService.Object);
var request = new ToolRequest(
toolName: "weatherForecast",
parameters: JsonSerializer.SerializeToElement(new {
location = "Seattle",
days = 3
})
);
// Act
var response = await tool.ExecuteAsync(request);
// Assert
Assert.NotNull(response);
var result = JsonSerializer.Deserialize<WeatherForecast>(response.Result);
Assert.Equal("Seattle", result.Location);
Assert.Equal(3, result.DailyForecasts.Count);
}
[Fact]
public async Task WeatherTool_InvalidLocation_ThrowsToolExecutionException()
{
// Arrange
var mockWeatherService = new Mock<IWeatherService>();
mockWeatherService
.Setup(s => s.GetForecastAsync("InvalidLocation", It.IsAny<int>()))
.ThrowsAsync(new LocationNotFoundException("Location not found"));
var tool = new WeatherForecastTool(mockWeatherService.Object);
var request = new ToolRequest(
toolName: "weatherForecast",
parameters: JsonSerializer.SerializeToElement(new {
location = "InvalidLocation",
days = 3
})
);
// Act & Assert
var exception = await Assert.ThrowsAsync<ToolExecutionException>(
() => tool.ExecuteAsync(request)
);
Assert.Contains("Location not found", exception.Message);
}스키마가 유효하며 제약 조건을 올바르게 적용하는지 테스트하십시오:
@Test
public void testSchemaValidation() {
// Create tool instance
SearchTool searchTool = new SearchTool();
// Get schema
Object schema = searchTool.getSchema();
// Convert schema to JSON for validation
String schemaJson = objectMapper.writeValueAsString(schema);
// Validate schema is valid JSONSchema
JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
JsonSchema jsonSchema = factory.getJsonSchema(schemaJson);
// Test valid parameters
JsonNode validParams = objectMapper.createObjectNode()
.put("query", "test query")
.put("limit", 5);
ProcessingReport validReport = jsonSchema.validate(validParams);
assertTrue(validReport.isSuccess());
// Test missing required parameter
JsonNode missingRequired = objectMapper.createObjectNode()
.put("limit", 5);
ProcessingReport missingReport = jsonSchema.validate(missingRequired);
assertFalse(missingReport.isSuccess());
// Test invalid parameter type
JsonNode invalidType = objectMapper.createObjectNode()
.put("query", "test")
.put("limit", "not-a-number");
ProcessingReport invalidReport = jsonSchema.validate(invalidType);
assertFalse(invalidReport.isSuccess());
}오류 조건에 대한 특정 테스트를 작성하십시오:
@pytest.mark.asyncio
async def test_api_tool_handles_timeout():
# Arrange
tool = ApiTool(timeout=0.1) # Very short timeout
# Mock a request that will time out
with aioresponses() as mocked:
mocked.get(
"https://api.example.com/data",
callback=lambda *args, **kwargs: asyncio.sleep(0.5) # Longer than timeout
)
request = ToolRequest(
tool_name="apiTool",
parameters={"url": "https://api.example.com/data"}
)
# Act & Assert
with pytest.raises(ToolExecutionException) as exc_info:
await tool.execute_async(request)
# Verify exception message
assert "timed out" in str(exc_info.value).lower()
@pytest.mark.asyncio
async def test_api_tool_handles_rate_limiting():
# Arrange
tool = ApiTool()
# Mock a rate-limited response
with aioresponses() as mocked:
mocked.get(
"https://api.example.com/data",
status=429,
headers={"Retry-After": "2"},
body=json.dumps({"error": "Rate limit exceeded"})
)
request = ToolRequest(
tool_name="apiTool",
parameters={"url": "https://api.example.com/data"}
)
# Act & Assert
with pytest.raises(ToolExecutionException) as exc_info:
await tool.execute_async(request)
# Verify exception contains rate limit information
error_msg = str(exc_info.value).lower()
assert "rate limit" in error_msg
assert "try again" in error_msg예상되는 조합으로 함께 작동하는 도구를 테스트하십시오:
[Fact]
public async Task DataProcessingWorkflow_CompletesSuccessfully()
{
// Arrange
var dataFetchTool = new DataFetchTool(mockDataService.Object);
var analysisTools = new DataAnalysisTool(mockAnalysisService.Object);
var visualizationTool = new DataVisualizationTool(mockVisualizationService.Object);
var toolRegistry = new ToolRegistry();
toolRegistry.RegisterTool(dataFetchTool);
toolRegistry.RegisterTool(analysisTools);
toolRegistry.RegisterTool(visualizationTool);
var workflowExecutor = new WorkflowExecutor(toolRegistry);
// Act
var result = await workflowExecutor.ExecuteWorkflowAsync(new[] {
new ToolCall("dataFetch", new { source = "sales2023" }),
new ToolCall("dataAnalysis", ctx => new {
data = ctx.GetResult("dataFetch"),
analysis = "trend"
}),
new ToolCall("dataVisualize", ctx => new {
analysisResult = ctx.GetResult("dataAnalysis"),
type = "line-chart"
})
});
// Assert
Assert.NotNull(result);
Assert.True(result.Success);
Assert.NotNull(result.GetResult("dataVisualize"));
Assert.Contains("chartUrl", result.GetResult("dataVisualize").ToString());
}전체 도구 등록 및 실행을 포함한 MCP 서버를 테스트하십시오:
@SpringBootTest
@AutoConfigureMockMvc
public class McpServerIntegrationTest {
@Autowired
private MockMvc mockMvc;
@Autowired
private ObjectMapper objectMapper;
@Test
public void testToolDiscovery() throws Exception {
// Test the discovery endpoint
mockMvc.perform(get("/mcp/tools"))
.andExpect(status().isOk())
.andExpect(jsonPath("$.tools").isArray())
.andExpect(jsonPath("$.tools[*].name").value(hasItems(
"weatherForecast", "calculator", "documentSearch"
)));
}
@Test
public void testToolExecution() throws Exception {
// Create tool request
Map<String, Object> request = new HashMap<>();
request.put("toolName", "calculator");
Map<String, Object> parameters = new HashMap<>();
parameters.put("operation", "add");
parameters.put("a", 5);
parameters.put("b", 7);
request.put("parameters", parameters);
// Send request and verify response
mockMvc.perform(post("/mcp/execute")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(request)))
.andExpect(status().isOk())
.andExpect(jsonPath("$.result.value").value(12));
}
@Test
public void testToolValidation() throws Exception {
// Create invalid tool request
Map<String, Object> request = new HashMap<>();
request.put("toolName", "calculator");
Map<String, Object> parameters = new HashMap<>();
parameters.put("operation", "divide");
parameters.put("a", 10);
// Missing parameter "b"
request.put("parameters", parameters);
// Send request and verify error response
mockMvc.perform(post("/mcp/execute")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(request)))
.andExpect(status().isBadRequest())
.andExpect(jsonPath("$.error").exists());
}
}모델 프롬프트에서 도구 실행까지의 전체 워크플로를 테스트하십시오:
@pytest.mark.asyncio
async def test_model_interaction_with_tool():
# Arrange - Set up MCP client and mock model
mcp_client = McpClient(server_url="http://localhost:5000")
# Mock model responses
mock_model = MockLanguageModel([
MockResponse(
"What's the weather in Seattle?",
tool_calls=[{
"tool_name": "weatherForecast",
"parameters": {"location": "Seattle", "days": 3}
}]
),
MockResponse(
"Here's the weather forecast for Seattle:\n- Today: 65°F, Partly Cloudy\n- Tomorrow: 68°F, Sunny\n- Day after: 62°F, Rain",
tool_calls=[]
)
])
# Mock weather tool response
with aioresponses() as mocked:
mocked.post(
"http://localhost:5000/mcp/execute",
payload={
"result": {
"location": "Seattle",
"forecast": [
{"date": "2023-06-01", "temperature": 65, "conditions": "Partly Cloudy"},
{"date": "2023-06-02", "temperature": 68, "conditions": "Sunny"},
{"date": "2023-06-03", "temperature": 62, "conditions": "Rain"}
]
}
}
)
# Act
response = await mcp_client.send_prompt(
"What's the weather in Seattle?",
model=mock_model,
allowed_tools=["weatherForecast"]
)
# Assert
assert "Seattle" in response.generated_text
assert "65" in response.generated_text
assert "Sunny" in response.generated_text
assert "Rain" in response.generated_text
assert len(response.tool_calls) == 1
assert response.tool_calls[0].tool_name == "weatherForecast"MCP 서버가 처리할 수 있는 동시 요청 수를 테스트하십시오:
[Fact]
public async Task McpServer_HandlesHighConcurrency()
{
// Arrange
var server = new McpServer(
name: "TestServer",
version: "1.0",
maxConcurrentRequests: 100
);
server.RegisterTool(new FastExecutingTool());
await server.StartAsync();
var client = new McpClient("http://localhost:5000");
// Act
var tasks = new List<Task<McpResponse>>();
for (int i = 0; i < 1000; i++)
{
tasks.Add(client.ExecuteToolAsync("fastTool", new { iteration = i }));
}
var results = await Task.WhenAll(tasks);
// Assert
Assert.Equal(1000, results.Length);
Assert.All(results, r => Assert.NotNull(r));
}극한 부하 상태에서 시스템을 테스트하십시오:
@Test
public void testServerUnderStress() {
int maxUsers = 1000;
int rampUpTimeSeconds = 60;
int testDurationSeconds = 300;
// Set up JMeter for stress testing
StandardJMeterEngine jmeter = new StandardJMeterEngine();
// Configure JMeter test plan
HashTree testPlanTree = new HashTree();
// Create test plan, thread group, samplers, etc.
TestPlan testPlan = new TestPlan("MCP Server Stress Test");
testPlanTree.add(testPlan);
ThreadGroup threadGroup = new ThreadGroup();
threadGroup.setNumThreads(maxUsers);
threadGroup.setRampUp(rampUpTimeSeconds);
threadGroup.setScheduler(true);
threadGroup.setDuration(testDurationSeconds);
testPlanTree.add(threadGroup);
// Add HTTP sampler for tool execution
HTTPSampler toolExecutionSampler = new HTTPSampler();
toolExecutionSampler.setDomain("localhost");
toolExecutionSampler.setPort(5000);
toolExecutionSampler.setPath("/mcp/execute");
toolExecutionSampler.setMethod("POST");
toolExecutionSampler.addArgument("toolName", "calculator");
toolExecutionSampler.addArgument("parameters", "{\"operation\":\"add\",\"a\":5,\"b\":7}");
threadGroup.add(toolExecutionSampler);
// Add listeners
SummaryReport summaryReport = new SummaryReport();
threadGroup.add(summaryReport);
// Run test
jmeter.configure(testPlanTree);
jmeter.run();
// Validate results
assertEquals(0, summaryReport.getErrorCount());
assertTrue(summaryReport.getAverage() < 200); // Average response time < 200ms
assertTrue(summaryReport.getPercentile(90.0) < 500); // 90th percentile < 500ms
}장기적인 성능 분석을 위해 모니터링을 설정하십시오:
# Configure monitoring for an MCP server
def configure_monitoring(server):
# Set up Prometheus metrics
prometheus_metrics = {
"request_count": Counter("mcp_requests_total", "Total MCP requests"),
"request_latency": Histogram(
"mcp_request_duration_seconds",
"Request duration in seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
),
"tool_execution_count": Counter(
"mcp_tool_executions_total",
"Tool execution count",
labelnames=["tool_name"]
),
"tool_execution_latency": Histogram(
"mcp_tool_duration_seconds",
"Tool execution duration in seconds",
labelnames=["tool_name"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
),
"tool_errors": Counter(
"mcp_tool_errors_total",
"Tool execution errors",
labelnames=["tool_name", "error_type"]
)
}
# Add middleware for timing and recording metrics
server.add_middleware(PrometheusMiddleware(prometheus_metrics))
# Expose metrics endpoint
@server.router.get("/metrics")
async def metrics():
return generate_latest()
return server잘 설계된 MCP 워크플로는 효율성, 신뢰성 및 유지보수성을 향상시킵니다. 다음은 따라야 할 주요 패턴입니다:
각 도구의 출력이 다음 도구의 입력이 되는 순서로 여러 도구를 연결하십시오:
# Python Chain of Tools implementation
class ChainWorkflow:
def __init__(self, tools_chain):
self.tools_chain = tools_chain # List of tool names to execute in sequence
async def execute(self, mcp_client, initial_input):
current_result = initial_input
all_results = {"input": initial_input}
for tool_name in self.tools_chain:
# Execute each tool in the chain, passing previous result
response = await mcp_client.execute_tool(tool_name, current_result)
# Store result and use as input for next tool
all_results[tool_name] = response.result
current_result = response.result
return {
"final_result": current_result,
"all_results": all_results
}
# Example usage
data_processing_chain = ChainWorkflow([
"dataFetch",
"dataCleaner",
"dataAnalyzer",
"dataVisualizer"
])
result = await data_processing_chain.execute(
mcp_client,
{"source": "sales_database", "table": "transactions"}
)입력에 따라 전문화된 도구로 전달하는 중앙 도구를 사용하십시오:
public class ContentDispatcherTool : IMcpTool
{
private readonly IMcpClient _mcpClient;
public ContentDispatcherTool(IMcpClient mcpClient)
{
_mcpClient = mcpClient;
}
public string Name => "contentProcessor";
public string Description => "Processes content of various types";
public object GetSchema()
{
return new {
type = "object",
properties = new {
content = new { type = "string" },
contentType = new {
type = "string",
enum = new[] { "text", "html", "markdown", "csv", "code" }
},
operation = new {
type = "string",
enum = new[] { "summarize", "analyze", "extract", "convert" }
}
},
required = new[] { "content", "contentType", "operation" }
};
}
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
var content = request.Parameters.GetProperty("content").GetString();
var contentType = request.Parameters.GetProperty("contentType").GetString();
var operation = request.Parameters.GetProperty("operation").GetString();
// Determine which specialized tool to use
string targetTool = DetermineTargetTool(contentType, operation);
// Forward to the specialized tool
var specializedResponse = await _mcpClient.ExecuteToolAsync(
targetTool,
new { content, options = GetOptionsForTool(targetTool, operation) }
);
return new ToolResponse { Result = specializedResponse.Result };
}
private string DetermineTargetTool(string contentType, string operation)
{
return (contentType, operation) switch
{
("text", "summarize") => "textSummarizer",
("text", "analyze") => "textAnalyzer",
("html", _) => "htmlProcessor",
("markdown", _) => "markdownProcessor",
("csv", _) => "csvProcessor",
("code", _) => "codeAnalyzer",
_ => throw new ToolExecutionException($"No tool available for {contentType}/{operation}")
};
}
private object GetOptionsForTool(string toolName, string operation)
{
// Return appropriate options for each specialized tool
return toolName switch
{
"textSummarizer" => new { length = "medium" },
"htmlProcessor" => new { cleanUp = true, operation },
// Options for other tools...
_ => new { }
};
}
}효율성을 위해 여러 도구를 동시에 실행하십시오:
public class ParallelDataProcessingWorkflow {
private final McpClient mcpClient;
public ParallelDataProcessingWorkflow(McpClient mcpClient) {
this.mcpClient = mcpClient;
}
public WorkflowResult execute(String datasetId) {
// Step 1: Fetch dataset metadata (synchronous)
ToolResponse metadataResponse = mcpClient.executeTool("datasetMetadata",
Map.of("datasetId", datasetId));
// Step 2: Launch multiple analyses in parallel
CompletableFuture<ToolResponse> statisticalAnalysis = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("statisticalAnalysis", Map.of(
"datasetId", datasetId,
"type", "comprehensive"
))
);
CompletableFuture<ToolResponse> correlationAnalysis = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("correlationAnalysis", Map.of(
"datasetId", datasetId,
"method", "pearson"
))
);
CompletableFuture<ToolResponse> outlierDetection = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("outlierDetection", Map.of(
"datasetId", datasetId,
"sensitivity", "medium"
))
);
// Wait for all parallel tasks to complete
CompletableFuture<Void> allAnalyses = CompletableFuture.allOf(
statisticalAnalysis, correlationAnalysis, outlierDetection
);
allAnalyses.join(); // Wait for completion
// Step 3: Combine results
Map<String, Object> combinedResults = new HashMap<>();
combinedResults.put("metadata", metadataResponse.getResult());
combinedResults.put("statistics", statisticalAnalysis.join().getResult());
combinedResults.put("correlations", correlationAnalysis.join().getResult());
combinedResults.put("outliers", outlierDetection.join().getResult());
// Step 4: Generate summary report
ToolResponse summaryResponse = mcpClient.executeTool("reportGenerator",
Map.of("analysisResults", combinedResults));
// Return complete workflow result
WorkflowResult result = new WorkflowResult();
result.setDatasetId(datasetId);
result.setAnalysisResults(combinedResults);
result.setSummaryReport(summaryResponse.getResult());
return result;
}
}도구 실패에 대해 우아한 대체 방법을 구현하십시오:
class ResilientWorkflow:
def __init__(self, mcp_client):
self.client = mcp_client
async def execute_with_fallback(self, primary_tool, fallback_tool, parameters):
try:
# Try primary tool first
response = await self.client.execute_tool(primary_tool, parameters)
return {
"result": response.result,
"source": "primary",
"tool": primary_tool
}
except ToolExecutionException as e:
# Log the failure
logging.warning(f"Primary tool '{primary_tool}' failed: {str(e)}")
# Fall back to secondary tool
try:
# Might need to transform parameters for fallback tool
fallback_params = self._adapt_parameters(parameters, primary_tool, fallback_tool)
response = await self.client.execute_tool(fallback_tool, fallback_params)
return {
"result": response.result,
"source": "fallback",
"tool": fallback_tool,
"primaryError": str(e)
}
except ToolExecutionException as fallback_error:
# Both tools failed
logging.error(f"Both primary and fallback tools failed. Fallback error: {str(fallback_error)}")
raise WorkflowExecutionException(
f"Workflow failed: primary error: {str(e)}; fallback error: {str(fallback_error)}"
)
def _adapt_parameters(self, params, from_tool, to_tool):
"""Adapt parameters between different tools if needed"""
# This implementation would depend on the specific tools
# For this example, we'll just return the original parameters
return params
# Example usage
async def get_weather(workflow, location):
return await workflow.execute_with_fallback(
"premiumWeatherService", # Primary (paid) weather API
"basicWeatherService", # Fallback (free) weather API
{"location": location}
)더 간단한 워크플로를 구성하여 복잡한 워크플로를 만드십시오:
public class CompositeWorkflow : IWorkflow
{
private readonly List<IWorkflow> _workflows;
public CompositeWorkflow(IEnumerable<IWorkflow> workflows)
{
_workflows = new List<IWorkflow>(workflows);
}
public async Task<WorkflowResult> ExecuteAsync(WorkflowContext context)
{
var results = new Dictionary<string, object>();
foreach (var workflow in _workflows)
{
var workflowResult = await workflow.ExecuteAsync(context);
// Store each workflow's result
results[workflow.Name] = workflowResult;
// Update context with the result for the next workflow
context = context.WithResult(workflow.Name, workflowResult);
}
return new WorkflowResult(results);
}
public string Name => "CompositeWorkflow";
public string Description => "Executes multiple workflows in sequence";
}
// Example usage
var documentWorkflow = new CompositeWorkflow(new IWorkflow[] {
new DocumentFetchWorkflow(),
new DocumentProcessingWorkflow(),
new InsightGenerationWorkflow(),
new ReportGenerationWorkflow()
});
var result = await documentWorkflow.ExecuteAsync(new WorkflowContext {
Parameters = new { documentId = "12345" }
});테스트는 신뢰할 수 있고 고품질의 MCP 서버를 개발하는 데 중요한 요소입니다. 이 가이드는 단위 테스트에서 통합 테스트 및 엔드 투 엔드 검증에 이르기까지 개발 주기 전반에 걸쳐 MCP 서버를 테스트하기 위한 포괄적인 모범 사례와 팁을 제공합니다.
MCP 서버는 AI 모델과 클라이언트 애플리케이션 간의 중요한 미들웨어 역할을 합니다. 철저한 테스트는 다음을 보장합니다:
- 생산 환경에서의 신뢰성
- 요청 및 응답 처리의 정확성
- MCP 사양의 올바른 구현
- 실패 및 엣지 케이스에 대한 복원력
- 다양한 부하에서의 일관된 성능
단위 테스트는 MCP 서버의 개별 구성 요소를 독립적으로 검증합니다.
- 리소스 핸들러: 각 리소스 핸들러의 로직을 독립적으로 테스트
- 도구 구현: 다양한 입력으로 도구 동작 검증
- 프롬프트 템플릿: 프롬프트 템플릿이 올바르게 렌더링되는지 확인
- 스키마 유효성 검사: 매개변수 유효성 검사 로직 테스트
- 오류 처리: 잘못된 입력에 대한 오류 응답 검증
// Example unit test for a calculator tool in C#
[Fact]
public async Task CalculatorTool_Add_ReturnsCorrectSum()
{
// Arrange
var calculator = new CalculatorTool();
var parameters = new Dictionary<string, object>
{
["operation"] = "add",
["a"] = 5,
["b"] = 7
};
// Act
var response = await calculator.ExecuteAsync(parameters);
var result = JsonSerializer.Deserialize<CalculationResult>(response.Content[0].ToString());
// Assert
Assert.Equal(12, result.Value);
}# Example unit test for a calculator tool in Python
def test_calculator_tool_add():
# Arrange
calculator = CalculatorTool()
parameters = {
"operation": "add",
"a": 5,
"b": 7
}
# Act
response = calculator.execute(parameters)
result = json.loads(response.content[0].text)
# Assert
assert result["value"] == 12통합 테스트는 MCP 서버의 구성 요소 간 상호작용을 검증합니다.
- 서버 초기화: 다양한 구성으로 서버 시작 테스트
- 경로 등록: 모든 엔드포인트가 올바르게 등록되었는지 확인
- 요청 처리: 전체 요청-응답 주기 테스트
- 오류 전파: 구성 요소 간 오류가 올바르게 처리되는지 확인
- 인증 및 권한 부여: 보안 메커니즘 테스트
// Example integration test for MCP server in C#
[Fact]
public async Task Server_ProcessToolRequest_ReturnsValidResponse()
{
// Arrange
var server = new McpServer();
server.RegisterTool(new CalculatorTool());
await server.StartAsync();
var request = new McpRequest
{
Tool = "calculator",
Parameters = new Dictionary<string, object>
{
["operation"] = "multiply",
["a"] = 6,
["b"] = 7
}
};
// Act
var response = await server.ProcessRequestAsync(request);
// Assert
Assert.NotNull(response);
Assert.Equal(McpStatusCodes.Success, response.StatusCode);
// Additional assertions for response content
// Cleanup
await server.StopAsync();
}엔드 투 엔드 테스트는 클라이언트에서 서버까지의 전체 시스템 동작을 검증합니다.
- 클라이언트-서버 통신: 전체 요청-응답 주기 테스트
- 실제 클라이언트 SDK: 실제 클라이언트 구현으로 테스트
- 부하 상태에서의 성능: 여러 동시 요청으로 동작 검증
- 오류 복구: 실패에서 시스템 복구 테스트
- 장시간 실행 작업: 스트리밍 및 장시간 작업 처리 검증
// Example E2E test with a client in TypeScript
describe('MCP Server E2E Tests', () => {
let client: McpClient;
beforeAll(async () => {
// Start server in test environment
await startTestServer();
client = new McpClient('http://localhost:5000');
});
afterAll(async () => {
await stopTestServer();
});
test('Client can invoke calculator tool and get correct result', async () => {
// Act
const response = await client.invokeToolAsync('calculator', {
operation: 'divide',
a: 20,
b: 4
});
// Assert
expect(response.statusCode).toBe(200);
expect(response.content[0].text).toContain('5');
});
});모의(Mock)는 테스트 중 구성 요소를 분리하는 데 필수적입니다.
- 외부 AI 모델: 예측 가능한 테스트를 위해 모델 응답 모의
- 외부 서비스: API 종속성(데이터베이스, 타사 서비스) 모의
- 인증 서비스: ID 제공자 모의
- 리소스 제공자: 비용이 많이 드는 리소스 핸들러 모의
// C# example with Moq
var mockModel = new Mock<ILanguageModel>();
mockModel
.Setup(m => m.GenerateResponseAsync(
It.IsAny<string>(),
It.IsAny<McpRequestContext>()))
.ReturnsAsync(new ModelResponse {
Text = "Mocked model response",
FinishReason = FinishReason.Completed
});
var server = new McpServer(modelClient: mockModel.Object);# Python example with unittest.mock
@patch('mcp_server.models.OpenAIModel')
def test_with_mock_model(mock_model):
# Configure mock
mock_model.return_value.generate_response.return_value = {
"text": "Mocked model response",
"finish_reason": "completed"
}
# Use mock in test
server = McpServer(model_client=mock_model)
# Continue with test성능 테스트는 생산 MCP 서버에 필수적입니다.
- 지연 시간: 요청에 대한 응답 시간
- 처리량: 초당 처리되는 요청 수
- 리소스 사용량: CPU, 메모리, 네트워크 사용량
- 동시 처리 능력: 병렬 요청 처리 동작
- 확장 특성: 부하 증가 시 성능
- k6: 오픈소스 부하 테스트 도구
- JMeter: 종합적인 성능 테스트
- Locust: Python 기반 부하 테스트
- Azure Load Testing: 클라우드 기반 성능 테스트
// k6 script for load testing MCP server
import http from 'k6/http';
import { check, sleep } from 'k6';
export const options = {
vus: 10, // 10 virtual users
duration: '30s',
};
export default function () {
const payload = JSON.stringify({
tool: 'calculator',
parameters: {
operation: 'add',
a: Math.floor(Math.random() * 100),
b: Math.floor(Math.random() * 100)
}
});
const params = {
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer test-token'
},
};
const res = http.post('http://localhost:5000/api/tools/invoke', payload, params);
check(res, {
'status is 200': (r) => r.status === 200,
'response time < 500ms': (r) => r.timings.duration < 500,
});
sleep(1);
}테스트를 자동화하면 일관된 품질과 빠른 피드백 루프를 보장합니다.
- 풀 요청에서 단위 테스트 실행: 코드 변경이 기존 기능을 깨뜨리지 않도록 보장
- 스테이징에서 통합 테스트 실행: 사전 생산 환경에서 통합 테스트 실행
- 성능 기준: 성능 벤치마크를 유지하여 회귀 문제를 감지
- 보안 스캔: 파이프라인의 일부로 보안 테스트를 자동화
name: MCP Server Tests
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Runtime
uses: actions/setup-dotnet@v1
with:
dotnet-version: '8.0.x'
- name: Restore dependencies
run: dotnet restore
- name: Build
run: dotnet build --no-restore
- name: Unit Tests
run: dotnet test --no-build --filter Category=Unit
- name: Integration Tests
run: dotnet test --no-build --filter Category=Integration
- name: Performance Tests
run: dotnet run --project tests/PerformanceTests/PerformanceTests.csproj서버가 MCP 사양을 올바르게 구현했는지 확인하세요.
- API 엔드포인트: 필수 엔드포인트 (/resources, /tools 등) 테스트
- 요청/응답 형식: 스키마 준수 여부 확인
- 에러 코드: 다양한 시나리오에 대한 올바른 상태 코드 확인
- 콘텐츠 유형: 다양한 콘텐츠 유형 처리 테스트
- 인증 흐름: 사양에 맞는 인증 메커니즘 확인
[Fact]
public async Task Server_ResourceEndpoint_ReturnsCorrectSchema()
{
// Arrange
var client = new HttpClient();
client.DefaultRequestHeaders.Add("Authorization", "Bearer test-token");
// Act
var response = await client.GetAsync("http://localhost:5000/api/resources");
var content = await response.Content.ReadAsStringAsync();
var resources = JsonSerializer.Deserialize<ResourceList>(content);
// Assert
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
Assert.NotNull(resources);
Assert.All(resources.Resources, resource =>
{
Assert.NotNull(resource.Id);
Assert.NotNull(resource.Type);
// Additional schema validation
});
}- 도구 정의를 별도로 테스트: 도구 로직과는 독립적으로 스키마 정의를 검증
- 매개변수화된 테스트 사용: 다양한 입력값, 특히 극단적인 경우를 포함하여 도구를 테스트
- 에러 응답 확인: 모든 가능한 에러 조건에 대한 적절한 에러 처리 확인
- 권한 부여 로직 테스트: 다양한 사용자 역할에 대한 적절한 접근 제어 확인
- 테스트 커버리지 모니터링: 중요한 코드 경로에 대해 높은 커버리지를 목표로 설정
- 스트리밍 응답 테스트: 스트리밍 콘텐츠 처리 확인
- 네트워크 문제 시뮬레이션: 열악한 네트워크 조건에서의 동작 테스트
- 자원 제한 테스트: 할당량 또는 속도 제한에 도달했을 때의 동작 확인
- 회귀 테스트 자동화: 코드 변경 시마다 실행되는 테스트 스위트 구축
- 테스트 케이스 문서화: 테스트 시나리오에 대한 명확한 문서 유지
- 행복 경로 테스트에 지나치게 의존: 에러 케이스를 철저히 테스트해야 함
- 성능 테스트 무시: 프로덕션에 영향을 미치기 전에 병목 현상을 식별
- 단독 테스트만 수행: 단위, 통합, E2E 테스트를 결합하여 실행
- 불완전한 API 커버리지: 모든 엔드포인트와 기능이 테스트되었는지 확인
- 일관성 없는 테스트 환경: 컨테이너를 사용하여 일관된 테스트 환경 유지
포괄적인 테스트 전략은 신뢰할 수 있고 고품질의 MCP 서버를 개발하는 데 필수적입니다. 이 가이드에서 제시된 모범 사례와 팁을 구현함으로써 MCP 구현이 최고 수준의 품질, 신뢰성, 성능을 충족하도록 할 수 있습니다.
- 도구 설계: 단일 책임 원칙을 따르고, 의존성 주입을 사용하며, 구성 가능성을 고려하여 설계
- 스키마 설계: 명확하고 잘 문서화된 스키마를 생성하고 적절한 검증 제약 조건 추가
- 에러 처리: 우아한 에러 처리, 구조화된 에러 응답, 재시도 로직 구현
- 성능: 캐싱, 비동기 처리, 자원 제한 적용
- 보안: 철저한 입력 검증, 권한 확인, 민감한 데이터 처리 적용
- 테스트: 포괄적인 단위, 통합, E2E 테스트 생성
- 워크플로 패턴: 체인, 디스패처, 병렬 처리와 같은 확립된 패턴 적용
다음 시나리오에 적합한 MCP 도구와 워크플로를 설계하세요:
- 여러 형식(PDF, DOCX, TXT)의 문서를 수락
- 문서에서 텍스트와 주요 정보를 추출
- 문서를 유형과 내용별로 분류
- 각 문서의 요약 생성
이 구현에 적합한 도구 스키마, 에러 처리, 워크플로 패턴을 설계하세요. 이 구현을 테스트하는 방법도 고려하세요.
- 최신 정보를 얻기 위해 Azure AI Foundry Discord Community에서 MCP 커뮤니티에 참여하세요
- 오픈소스 MCP 프로젝트에 기여하세요
- MCP 원칙을 자신의 조직의 AI 이니셔티브에 적용하세요
- 자신의 산업에 특화된 MCP 구현을 탐색하세요
- 멀티모달 통합 또는 엔터프라이즈 애플리케이션 통합과 같은 특정 MCP 주제에 대한 고급 과정을 고려하세요
- Hands on Lab을 통해 배운 원칙을 사용하여 자신만의 MCP 도구와 워크플로를 실험적으로 구축하세요
다음: 모범 사례 사례 연구
면책 조항:
이 문서는 AI 번역 서비스 Co-op Translator를 사용하여 번역되었습니다. 정확성을 위해 최선을 다하고 있지만, 자동 번역에는 오류나 부정확성이 포함될 수 있습니다. 원본 문서의 원어 버전을 권위 있는 출처로 간주해야 합니다. 중요한 정보의 경우, 전문적인 인간 번역을 권장합니다. 이 번역 사용으로 인해 발생하는 오해나 잘못된 해석에 대해 책임을 지지 않습니다.
