(Clicca sull'immagine sopra per vedere il video di questa lezione)
Questa lezione si concentra sulle best practice avanzate per lo sviluppo, il testing e il deployment di server e funzionalità MCP in ambienti di produzione. Man mano che gli ecosistemi MCP crescono in complessità e importanza, seguire schemi consolidati garantisce affidabilità, manutenibilità e interoperabilità. Questa lezione consolida la saggezza pratica acquisita da implementazioni MCP reali per guidarti nella creazione di server robusti ed efficienti con risorse, prompt e strumenti efficaci.
Alla fine di questa lezione, sarai in grado di:
- Applicare le best practice del settore nel design di server e funzionalità MCP
- Creare strategie di testing complete per server MCP
- Progettare schemi di workflow efficienti e riutilizzabili per applicazioni MCP complesse
- Implementare una corretta gestione degli errori, logging e osservabilità nei server MCP
- Ottimizzare le implementazioni MCP per prestazioni, sicurezza e manutenibilità
Prima di entrare nelle pratiche di implementazione specifiche, è importante comprendere i principi fondamentali che guidano uno sviluppo MCP efficace:
-
Comunicazione Standardizzata: MCP utilizza JSON-RPC 2.0 come base, fornendo un formato coerente per richieste, risposte e gestione degli errori in tutte le implementazioni.
-
Design User-Centric: Dai sempre priorità al consenso, al controllo e alla trasparenza dell’utente nelle tue implementazioni MCP.
-
Sicurezza Prima di Tutto: Implementa misure di sicurezza robuste includendo autenticazione, autorizzazione, validazione e limitazione della velocità.
-
Architettura Modulare: Progetta i tuoi server MCP con un approccio modulare, dove ogni strumento e risorsa ha uno scopo chiaro e focalizzato.
-
Connessioni Stateful: Sfrutta la capacità di MCP di mantenere lo stato attraverso più richieste per interazioni più coerenti e consapevoli del contesto.
Le seguenti best practice derivano dalla documentazione ufficiale del Model Context Protocol:
-
Consenso e Controllo Utente: Richiedi sempre consenso esplicito dell’utente prima di accedere ai dati o eseguire operazioni. Fornisci un controllo chiaro su quali dati sono condivisi e quali azioni sono autorizzate.
-
Privacy dei Dati: Esporre dati utente solo con consenso esplicito e proteggerli con controlli di accesso appropriati. Salvaguardare da trasmissioni di dati non autorizzate.
-
Sicurezza degli Strumenti: Richiedere consenso esplicito prima di invocare qualsiasi strumento. Assicurarsi che gli utenti comprendano la funzionalità di ogni strumento e applicare confini di sicurezza robusti.
-
Controllo dei Permessi degli Strumenti: Configurare quali strumenti un modello può usare durante una sessione, assicurando che siano accessibili solo gli strumenti autorizzati esplicitamente.
-
Autenticazione: Richiedere un’adeguata autenticazione prima di concedere accesso a strumenti, risorse o operazioni sensibili usando chiavi API, token OAuth o altri metodi sicuri.
-
Validazione dei Parametri: Applicare la validazione per tutte le invocazioni degli strumenti per evitare input malformati o malevoli nelle implementazioni di strumenti.
-
Limitazione della Velocità (Rate Limiting): Implementare limitazioni per prevenire abusi e garantire un uso equo delle risorse del server.
-
Negoziazione delle Capacità: Durante la configurazione della connessione, scambiarsi informazioni sulle funzionalità supportate, versioni del protocollo, strumenti e risorse disponibili.
-
Design degli Strumenti: Creare strumenti focalizzati che fanno bene una cosa, piuttosto che strumenti monolitici che gestiscono molteplici preoccupazioni.
-
Gestione degli Errori: Implementare messaggi di errore e codici standardizzati per aiutare a diagnosticare problemi, gestire errori con grazia e fornire feedback azionabili.
-
Logging: Configurare log strutturati per auditing, debugging e monitoraggio delle interazioni del protocollo.
-
Tracciamento del Progresso: Per operazioni a lunga durata, riportare aggiornamenti di progresso per abilitare interfacce utente reattive.
-
Cancellazione delle Richieste: Permettere ai client di cancellare richieste in corso che non sono più necessarie o che richiedono troppo tempo.
Per informazioni aggiornate sulle best practice MCP, consulta:
- Documentazione MCP
- Specifiche MCP (2025-11-25)
- Repository GitHub
- Best Practice di Sicurezza
- OWASP MCP Top 10 - Rischi di sicurezza e mitigazioni
- Workshop MCP Security Summit (Sherpa) - Formazione pratica sulla sicurezza
Ogni strumento MCP dovrebbe avere uno scopo chiaro e focalizzato. Invece di creare strumenti monolitici che cercano di gestire molteplici aspetti, sviluppa strumenti specializzati che eccellono in compiti specifici.
// A focused tool that does one thing well
public class WeatherForecastTool : ITool
{
private readonly IWeatherService _weatherService;
public WeatherForecastTool(IWeatherService weatherService)
{
_weatherService = weatherService;
}
public string Name => "weatherForecast";
public string Description => "Gets weather forecast for a specific location";
public ToolDefinition GetDefinition()
{
return new ToolDefinition
{
Name = Name,
Description = Description,
Parameters = new Dictionary<string, ParameterDefinition>
{
["location"] = new ParameterDefinition
{
Type = ParameterType.String,
Description = "City or location name"
},
["days"] = new ParameterDefinition
{
Type = ParameterType.Integer,
Description = "Number of forecast days",
Default = 3
}
},
Required = new[] { "location" }
};
}
public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
{
var location = parameters["location"].ToString();
var days = parameters.ContainsKey("days")
? Convert.ToInt32(parameters["days"])
: 3;
var forecast = await _weatherService.GetForecastAsync(location, days);
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(JsonSerializer.Serialize(forecast))
}
};
}
}Implementa una gestione robusta degli errori con messaggi informativi e meccanismi di recupero appropriati.
# Esempio Python con gestione completa degli errori
class DataQueryTool:
def get_name(self):
return "dataQuery"
def get_description(self):
return "Queries data from specified database tables"
async def execute(self, parameters):
try:
# Validazione dei parametri
if "query" not in parameters:
raise ToolParameterError("Missing required parameter: query")
query = parameters["query"]
# Validazione della sicurezza
if self._contains_unsafe_sql(query):
raise ToolSecurityError("Query contains potentially unsafe SQL")
try:
# Operazione su database con timeout
async with timeout(10): # Timeout di 10 secondi
result = await self._database.execute_query(query)
return ToolResponse(
content=[TextContent(json.dumps(result))]
)
except asyncio.TimeoutError:
raise ToolExecutionError("Database query timed out after 10 seconds")
except DatabaseConnectionError as e:
# Gli errori di connessione potrebbero essere transitori
self._log_error("Database connection error", e)
raise ToolExecutionError(f"Database connection error: {str(e)}")
except DatabaseQueryError as e:
# Gli errori di query sono probabilmente errori lato client
self._log_error("Database query error", e)
raise ToolExecutionError(f"Invalid query: {str(e)}")
except ToolError:
# Lascia passare gli errori specifici dello strumento
raise
except Exception as e:
# Gestione generale per errori inaspettati
self._log_error("Unexpected error in DataQueryTool", e)
raise ToolExecutionError(f"An unexpected error occurred: {str(e)}")
def _contains_unsafe_sql(self, query):
# Implementazione del rilevamento di SQL injection
pass
def _log_error(self, message, error):
# Implementazione della registrazione degli errori
passValida sempre in modo approfondito i parametri per prevenire input malformati o malevoli.
// Esempio JavaScript/TypeScript con validazione dettagliata dei parametri
class FileOperationTool {
getName() {
return "fileOperation";
}
getDescription() {
return "Performs file operations like read, write, and delete";
}
getDefinition() {
return {
name: this.getName(),
description: this.getDescription(),
parameters: {
operation: {
type: "string",
description: "Operation to perform",
enum: ["read", "write", "delete"]
},
path: {
type: "string",
description: "File path (must be within allowed directories)"
},
content: {
type: "string",
description: "Content to write (only for write operation)",
optional: true
}
},
required: ["operation", "path"]
};
}
async execute(parameters) {
// 1. Validare la presenza del parametro
if (!parameters.operation) {
throw new ToolError("Missing required parameter: operation");
}
if (!parameters.path) {
throw new ToolError("Missing required parameter: path");
}
// 2. Validare i tipi dei parametri
if (typeof parameters.operation !== "string") {
throw new ToolError("Parameter 'operation' must be a string");
}
if (typeof parameters.path !== "string") {
throw new ToolError("Parameter 'path' must be a string");
}
// 3. Validare i valori dei parametri
const validOperations = ["read", "write", "delete"];
if (!validOperations.includes(parameters.operation)) {
throw new ToolError(`Invalid operation. Must be one of: ${validOperations.join(", ")}`);
}
// 4. Validare la presenza di contenuto per l'operazione di scrittura
if (parameters.operation === "write" && !parameters.content) {
throw new ToolError("Content parameter is required for write operation");
}
// 5. Validazione della sicurezza del percorso
if (!this.isPathWithinAllowedDirectories(parameters.path)) {
throw new ToolError("Access denied: path is outside of allowed directories");
}
// Implementazione basata sui parametri validati
// ...
}
isPathWithinAllowedDirectories(path) {
// Implementazione del controllo di sicurezza del percorso
// ...
}
}// Esempio Java con autenticazione e autorizzazione
public class SecureDataAccessTool implements Tool {
private final AuthenticationService authService;
private final AuthorizationService authzService;
private final DataService dataService;
// Iniezione delle dipendenze
public SecureDataAccessTool(
AuthenticationService authService,
AuthorizationService authzService,
DataService dataService) {
this.authService = authService;
this.authzService = authzService;
this.dataService = dataService;
}
@Override
public String getName() {
return "secureDataAccess";
}
@Override
public ToolResponse execute(ToolRequest request) {
// 1. Estrarre il contesto di autenticazione
String authToken = request.getContext().getAuthToken();
// 2. Autenticare l'utente
UserIdentity user;
try {
user = authService.validateToken(authToken);
} catch (AuthenticationException e) {
return ToolResponse.error("Authentication failed: " + e.getMessage());
}
// 3. Verificare l'autorizzazione per l'operazione specifica
String dataId = request.getParameters().get("dataId").getAsString();
String operation = request.getParameters().get("operation").getAsString();
boolean isAuthorized = authzService.isAuthorized(user, "data:" + dataId, operation);
if (!isAuthorized) {
return ToolResponse.error("Access denied: Insufficient permissions for this operation");
}
// 4. Procedere con l'operazione autorizzata
try {
switch (operation) {
case "read":
Object data = dataService.getData(dataId, user.getId());
return ToolResponse.success(data);
case "update":
JsonNode newData = request.getParameters().get("newData");
dataService.updateData(dataId, newData, user.getId());
return ToolResponse.success("Data updated successfully");
default:
return ToolResponse.error("Unsupported operation: " + operation);
}
} catch (Exception e) {
return ToolResponse.error("Operation failed: " + e.getMessage());
}
}
}// C# rate limiting implementation
public class RateLimitingMiddleware
{
private readonly RequestDelegate _next;
private readonly IMemoryCache _cache;
private readonly ILogger<RateLimitingMiddleware> _logger;
// Configuration options
private readonly int _maxRequestsPerMinute;
public RateLimitingMiddleware(
RequestDelegate next,
IMemoryCache cache,
ILogger<RateLimitingMiddleware> logger,
IConfiguration config)
{
_next = next;
_cache = cache;
_logger = logger;
_maxRequestsPerMinute = config.GetValue<int>("RateLimit:MaxRequestsPerMinute", 60);
}
public async Task InvokeAsync(HttpContext context)
{
// 1. Get client identifier (API key or user ID)
string clientId = GetClientIdentifier(context);
// 2. Get rate limiting key for this minute
string cacheKey = $"rate_limit:{clientId}:{DateTime.UtcNow:yyyyMMddHHmm}";
// 3. Check current request count
if (!_cache.TryGetValue(cacheKey, out int requestCount))
{
requestCount = 0;
}
// 4. Enforce rate limit
if (requestCount >= _maxRequestsPerMinute)
{
_logger.LogWarning("Rate limit exceeded for client {ClientId}", clientId);
context.Response.StatusCode = StatusCodes.Status429TooManyRequests;
context.Response.Headers.Add("Retry-After", "60");
await context.Response.WriteAsJsonAsync(new
{
error = "Rate limit exceeded",
message = "Too many requests. Please try again later.",
retryAfterSeconds = 60
});
return;
}
// 5. Increment request count
_cache.Set(cacheKey, requestCount + 1, TimeSpan.FromMinutes(2));
// 6. Add rate limit headers
context.Response.Headers.Add("X-RateLimit-Limit", _maxRequestsPerMinute.ToString());
context.Response.Headers.Add("X-RateLimit-Remaining", (_maxRequestsPerMinute - requestCount - 1).ToString());
// 7. Continue with the request
await _next(context);
}
private string GetClientIdentifier(HttpContext context)
{
// Implementation to extract API key or user ID
// ...
}
}Testa sempre i tuoi strumenti in isolamento, simulando dipendenze esterne:
// Esempio di test unitario di uno strumento in TypeScript
describe('WeatherForecastTool', () => {
let tool: WeatherForecastTool;
let mockWeatherService: jest.Mocked<IWeatherService>;
beforeEach(() => {
// Crea un servizio meteo fittizio
mockWeatherService = {
getForecasts: jest.fn()
} as any;
// Crea lo strumento con la dipendenza fittizia
tool = new WeatherForecastTool(mockWeatherService);
});
it('should return weather forecast for a location', async () => {
// Prepara
const mockForecast = {
location: 'Seattle',
forecasts: [
{ date: '2025-07-16', temperature: 72, conditions: 'Sunny' },
{ date: '2025-07-17', temperature: 68, conditions: 'Partly Cloudy' },
{ date: '2025-07-18', temperature: 65, conditions: 'Rain' }
]
};
mockWeatherService.getForecasts.mockResolvedValue(mockForecast);
// Esegui
const response = await tool.execute({
location: 'Seattle',
days: 3
});
// Verifica
expect(mockWeatherService.getForecasts).toHaveBeenCalledWith('Seattle', 3);
expect(response.content[0].text).toContain('Seattle');
expect(response.content[0].text).toContain('Sunny');
});
it('should handle errors from the weather service', async () => {
// Prepara
mockWeatherService.getForecasts.mockRejectedValue(new Error('Service unavailable'));
// Esegui e verifica
await expect(tool.execute({
location: 'Seattle',
days: 3
})).rejects.toThrow('Weather service error: Service unavailable');
});
});Testa il flusso completo dalle richieste client alle risposte del server:
# Esempio di test di integrazione Python
@pytest.mark.asyncio
async def test_mcp_server_integration():
# Avvia un server di test
server = McpServer()
server.register_tool(WeatherForecastTool(MockWeatherService()))
await server.start(port=5000)
try:
# Crea un client
client = McpClient("http://localhost:5000")
# Testa la scoperta degli strumenti
tools = await client.discover_tools()
assert "weatherForecast" in [t.name for t in tools]
# Testa l'esecuzione dello strumento
response = await client.execute_tool("weatherForecast", {
"location": "Seattle",
"days": 3
})
# Verifica la risposta
assert response.status_code == 200
assert "Seattle" in response.content[0].text
assert len(json.loads(response.content[0].text)["forecasts"]) == 3
finally:
# Pulisci
await server.stop()Implementa caching adeguato per ridurre latenza e uso delle risorse:
// C# example with caching
public class CachedWeatherTool : ITool
{
private readonly IWeatherService _weatherService;
private readonly IDistributedCache _cache;
private readonly ILogger<CachedWeatherTool> _logger;
public CachedWeatherTool(
IWeatherService weatherService,
IDistributedCache cache,
ILogger<CachedWeatherTool> logger)
{
_weatherService = weatherService;
_cache = cache;
_logger = logger;
}
public string Name => "weatherForecast";
public async Task<ToolResponse> ExecuteAsync(IDictionary<string, object> parameters)
{
var location = parameters["location"].ToString();
var days = Convert.ToInt32(parameters.GetValueOrDefault("days", 3));
// Create cache key
string cacheKey = $"weather:{location}:{days}";
// Try to get from cache
string cachedForecast = await _cache.GetStringAsync(cacheKey);
if (!string.IsNullOrEmpty(cachedForecast))
{
_logger.LogInformation("Cache hit for weather forecast: {Location}", location);
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(cachedForecast)
}
};
}
// Cache miss - get from service
_logger.LogInformation("Cache miss for weather forecast: {Location}", location);
var forecast = await _weatherService.GetForecastAsync(location, days);
string forecastJson = JsonSerializer.Serialize(forecast);
// Store in cache (weather forecasts valid for 1 hour)
await _cache.SetStringAsync(
cacheKey,
forecastJson,
new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
});
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(forecastJson)
}
};
}
}Progetta gli strumenti per ricevere le loro dipendenze tramite injection nel costruttore, rendendoli testabili e configurabili:
// Esempio Java con iniezione delle dipendenze
public class CurrencyConversionTool implements Tool {
private final ExchangeRateService exchangeService;
private final CacheService cacheService;
private final Logger logger;
// Dipendenze iniettate tramite costruttore
public CurrencyConversionTool(
ExchangeRateService exchangeService,
CacheService cacheService,
Logger logger) {
this.exchangeService = exchangeService;
this.cacheService = cacheService;
this.logger = logger;
}
// Implementazione dello strumento
// ...
}Progetta strumenti che possano essere composti insieme per creare workflow più complessi:
# Esempio Python che mostra strumenti componibili
class DataFetchTool(Tool):
def get_name(self):
return "dataFetch"
# Implementazione...
class DataAnalysisTool(Tool):
def get_name(self):
return "dataAnalysis"
# Questo strumento può utilizzare i risultati dello strumento dataFetch
async def execute_async(self, request):
# Implementazione...
pass
class DataVisualizationTool(Tool):
def get_name(self):
return "dataVisualize"
# Questo strumento può utilizzare i risultati dello strumento dataAnalysis
async def execute_async(self, request):
# Implementazione...
pass
# Questi strumenti possono essere utilizzati indipendentemente o come parte di un flusso di lavoroLo schema è il contratto tra il modello e il tuo strumento. Schemi ben progettati portano a una migliore usabilità dello strumento.
Includi sempre informazioni descrittive per ogni parametro:
public object GetSchema()
{
return new {
type = "object",
properties = new {
query = new {
type = "string",
description = "Search query text. Use precise keywords for better results."
},
filters = new {
type = "object",
description = "Optional filters to narrow down search results",
properties = new {
dateRange = new {
type = "string",
description = "Date range in format YYYY-MM-DD:YYYY-MM-DD"
},
category = new {
type = "string",
description = "Category name to filter by"
}
}
},
limit = new {
type = "integer",
description = "Maximum number of results to return (1-50)",
default = 10
}
},
required = new[] { "query" }
};
}Includi vincoli di validazione per prevenire input non validi:
Map<String, Object> getSchema() {
Map<String, Object> schema = new HashMap<>();
schema.put("type", "object");
Map<String, Object> properties = new HashMap<>();
// Proprietà email con convalida del formato
Map<String, Object> email = new HashMap<>();
email.put("type", "string");
email.put("format", "email");
email.put("description", "User email address");
// Proprietà età con vincoli numerici
Map<String, Object> age = new HashMap<>();
age.put("type", "integer");
age.put("minimum", 13);
age.put("maximum", 120);
age.put("description", "User age in years");
// Proprietà enumerata
Map<String, Object> subscription = new HashMap<>();
subscription.put("type", "string");
subscription.put("enum", Arrays.asList("free", "basic", "premium"));
subscription.put("default", "free");
subscription.put("description", "Subscription tier");
properties.put("email", email);
properties.put("age", age);
properties.put("subscription", subscription);
schema.put("properties", properties);
schema.put("required", Arrays.asList("email"));
return schema;
}Mantieni coerenza nelle tue strutture di risposta per facilitare l’interpretazione dei risultati da parte dei modelli:
async def execute_async(self, request):
try:
# Elabora la richiesta
results = await self._search_database(request.parameters["query"])
# Restituisci sempre una struttura coerente
return ToolResponse(
result={
"matches": [self._format_item(item) for item in results],
"totalCount": len(results),
"queryTime": calculation_time_ms,
"status": "success"
}
)
except Exception as e:
return ToolResponse(
result={
"matches": [],
"totalCount": 0,
"queryTime": 0,
"status": "error",
"error": str(e)
}
)
def _format_item(self, item):
"""Ensures each item has a consistent structure"""
return {
"id": item.id,
"title": item.title,
"summary": item.summary[:100] + "..." if len(item.summary) > 100 else item.summary,
"url": item.url,
"relevance": item.score
}Una gestione robusta degli errori è cruciale per mantenere l’affidabilità degli strumenti MCP.
Gestisci gli errori a livelli appropriati e fornisci messaggi informativi:
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
try
{
string fileId = request.Parameters.GetProperty("fileId").GetString();
try
{
var fileData = await _fileService.GetFileAsync(fileId);
return new ToolResponse {
Result = JsonSerializer.SerializeToElement(fileData)
};
}
catch (FileNotFoundException)
{
throw new ToolExecutionException($"File not found: {fileId}");
}
catch (UnauthorizedAccessException)
{
throw new ToolExecutionException("You don't have permission to access this file");
}
catch (Exception ex) when (ex is IOException || ex is TimeoutException)
{
_logger.LogError(ex, "Error accessing file {FileId}", fileId);
throw new ToolExecutionException("Error accessing file: The service is temporarily unavailable");
}
}
catch (JsonException)
{
throw new ToolExecutionException("Invalid file ID format");
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error in FileAccessTool");
throw new ToolExecutionException("An unexpected error occurred");
}
}Ritorna informazioni di errore strutturate quando possibile:
@Override
public ToolResponse execute(ToolRequest request) {
try {
// Implementazione
} catch (Exception ex) {
Map<String, Object> errorResult = new HashMap<>();
errorResult.put("success", false);
if (ex instanceof ValidationException) {
ValidationException validationEx = (ValidationException) ex;
errorResult.put("errorType", "validation");
errorResult.put("errorMessage", validationEx.getMessage());
errorResult.put("validationErrors", validationEx.getErrors());
return new ToolResponse.Builder()
.setResult(errorResult)
.build();
}
// Rilancia altre eccezioni come ToolExecutionException
throw new ToolExecutionException("Tool execution failed: " + ex.getMessage(), ex);
}
}Implementa logiche di ritento appropriate per errori transitori:
async def execute_async(self, request):
max_retries = 3
retry_count = 0
base_delay = 1 # secondi
while retry_count < max_retries:
try:
# Chiamare API esterna
return await self._call_api(request.parameters)
except TransientError as e:
retry_count += 1
if retry_count >= max_retries:
raise ToolExecutionException(f"Operation failed after {max_retries} attempts: {str(e)}")
# Ritardo esponenziale
delay = base_delay * (2 ** (retry_count - 1))
logging.warning(f"Transient error, retrying in {delay}s: {str(e)}")
await asyncio.sleep(delay)
except Exception as e:
# Errore non transitorio, non riprovare
raise ToolExecutionException(f"Operation failed: {str(e)}")Implementa caching per operazioni costose:
public class CachedDataTool : IMcpTool
{
private readonly IDatabase _database;
private readonly IMemoryCache _cache;
public CachedDataTool(IDatabase database, IMemoryCache cache)
{
_database = database;
_cache = cache;
}
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
var query = request.Parameters.GetProperty("query").GetString();
// Create cache key based on parameters
var cacheKey = $"data_query_{ComputeHash(query)}";
// Try to get from cache first
if (_cache.TryGetValue(cacheKey, out var cachedResult))
{
return new ToolResponse { Result = cachedResult };
}
// Cache miss - perform actual query
var result = await _database.QueryAsync(query);
// Store in cache with expiration
var cacheOptions = new MemoryCacheEntryOptions()
.SetAbsoluteExpiration(TimeSpan.FromMinutes(15));
_cache.Set(cacheKey, JsonSerializer.SerializeToElement(result), cacheOptions);
return new ToolResponse { Result = JsonSerializer.SerializeToElement(result) };
}
private string ComputeHash(string input)
{
// Implementation to generate stable hash for cache key
}
}Usa pattern di programmazione asincrona per operazioni I/O-bound:
public class AsyncDocumentProcessingTool implements Tool {
private final DocumentService documentService;
private final ExecutorService executorService;
@Override
public ToolResponse execute(ToolRequest request) {
String documentId = request.getParameters().get("documentId").asText();
// Per operazioni a lunga durata, restituisci immediatamente un ID di elaborazione
String processId = UUID.randomUUID().toString();
// Avvia l'elaborazione asincrona
CompletableFuture.runAsync(() -> {
try {
// Esegui un'operazione a lunga durata
documentService.processDocument(documentId);
// Aggiorna lo stato (normalmente verrebbe salvato in un database)
processStatusRepository.updateStatus(processId, "completed");
} catch (Exception ex) {
processStatusRepository.updateStatus(processId, "failed", ex.getMessage());
}
}, executorService);
// Restituisci una risposta immediata con l'ID del processo
Map<String, Object> result = new HashMap<>();
result.put("processId", processId);
result.put("status", "processing");
result.put("estimatedCompletionTime", ZonedDateTime.now().plusMinutes(5));
return new ToolResponse.Builder().setResult(result).build();
}
// Strumento di controllo stato companion
public class ProcessStatusTool implements Tool {
@Override
public ToolResponse execute(ToolRequest request) {
String processId = request.getParameters().get("processId").asText();
ProcessStatus status = processStatusRepository.getStatus(processId);
return new ToolResponse.Builder().setResult(status).build();
}
}
}Implementa throttling delle risorse per prevenire sovraccarichi:
class ThrottledApiTool(Tool):
def __init__(self):
self.rate_limiter = TokenBucketRateLimiter(
tokens_per_second=5, # Permettere 5 richieste al secondo
bucket_size=10 # Permettere picchi fino a 10 richieste
)
async def execute_async(self, request):
# Verificare se possiamo procedere o se dobbiamo aspettare
delay = self.rate_limiter.get_delay_time()
if delay > 0:
if delay > 2.0: # Se l'attesa è troppo lunga
raise ToolExecutionException(
f"Rate limit exceeded. Please try again in {delay:.1f} seconds."
)
else:
# Aspettare il tempo di ritardo appropriato
await asyncio.sleep(delay)
# Consumare un token e procedere con la richiesta
self.rate_limiter.consume()
# Chiamare l'API
result = await self._call_api(request.parameters)
return ToolResponse(result=result)
class TokenBucketRateLimiter:
def __init__(self, tokens_per_second, bucket_size):
self.tokens_per_second = tokens_per_second
self.bucket_size = bucket_size
self.tokens = bucket_size
self.last_refill = time.time()
self.lock = asyncio.Lock()
async def get_delay_time(self):
async with self.lock:
self._refill()
if self.tokens >= 1:
return 0
# Calcolare il tempo fino al prossimo token disponibile
return (1 - self.tokens) / self.tokens_per_second
async def consume(self):
async with self.lock:
self._refill()
self.tokens -= 1
def _refill(self):
now = time.time()
elapsed = now - self.last_refill
# Aggiungere nuovi token basati sul tempo trascorso
new_tokens = elapsed * self.tokens_per_second
self.tokens = min(self.bucket_size, self.tokens + new_tokens)
self.last_refill = nowValida sempre in modo approfondito i parametri di input:
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
// Validate parameters exist
if (!request.Parameters.TryGetProperty("query", out var queryProp))
{
throw new ToolExecutionException("Missing required parameter: query");
}
// Validate correct type
if (queryProp.ValueKind != JsonValueKind.String)
{
throw new ToolExecutionException("Query parameter must be a string");
}
var query = queryProp.GetString();
// Validate string content
if (string.IsNullOrWhiteSpace(query))
{
throw new ToolExecutionException("Query parameter cannot be empty");
}
if (query.Length > 500)
{
throw new ToolExecutionException("Query parameter exceeds maximum length of 500 characters");
}
// Check for SQL injection attacks if applicable
if (ContainsSqlInjection(query))
{
throw new ToolExecutionException("Invalid query: contains potentially unsafe SQL");
}
// Proceed with execution
// ...
}Implementa controlli di autorizzazione adeguati:
@Override
public ToolResponse execute(ToolRequest request) {
// Ottieni il contesto utente dalla richiesta
UserContext user = request.getContext().getUserContext();
// Verifica se l'utente ha i permessi richiesti
if (!authorizationService.hasPermission(user, "documents:read")) {
throw new ToolExecutionException("User does not have permission to access documents");
}
// Per risorse specifiche, verifica l'accesso a quella risorsa
String documentId = request.getParameters().get("documentId").asText();
if (!documentService.canUserAccess(user.getId(), documentId)) {
throw new ToolExecutionException("Access denied to the requested document");
}
// Procedi con l'esecuzione dello strumento
// ...
}Gestisci con cura i dati sensibili:
class SecureDataTool(Tool):
def get_schema(self):
return {
"type": "object",
"properties": {
"userId": {"type": "string"},
"includeSensitiveData": {"type": "boolean", "default": False}
},
"required": ["userId"]
}
async def execute_async(self, request):
user_id = request.parameters["userId"]
include_sensitive = request.parameters.get("includeSensitiveData", False)
# Ottenere dati utente
user_data = await self.user_service.get_user_data(user_id)
# Filtrare campi sensibili a meno che non sia esplicitamente richiesto E autorizzato
if not include_sensitive or not self._is_authorized_for_sensitive_data(request):
user_data = self._redact_sensitive_fields(user_data)
return ToolResponse(result=user_data)
def _is_authorized_for_sensitive_data(self, request):
# Verificare il livello di autorizzazione nel contesto della richiesta
auth_level = request.context.get("authorizationLevel")
return auth_level == "admin"
def _redact_sensitive_fields(self, user_data):
# Creare una copia per evitare di modificare l'originale
redacted = user_data.copy()
# Censurare campi sensibili specifici
sensitive_fields = ["ssn", "creditCardNumber", "password"]
for field in sensitive_fields:
if field in redacted:
redacted[field] = "REDACTED"
# Censurare dati sensibili nidificati
if "financialInfo" in redacted:
redacted["financialInfo"] = {"available": True, "accessRestricted": True}
return redactedUn testing completo assicura che gli strumenti MCP funzionino correttamente, gestiscano i casi limite e si integrino correttamente con il resto del sistema.
Crea test mirati per le funzionalità di ogni strumento:
[Fact]
public async Task WeatherTool_ValidLocation_ReturnsCorrectForecast()
{
// Arrange
var mockWeatherService = new Mock<IWeatherService>();
mockWeatherService
.Setup(s => s.GetForecastAsync("Seattle", 3))
.ReturnsAsync(new WeatherForecast(/* test data */));
var tool = new WeatherForecastTool(mockWeatherService.Object);
var request = new ToolRequest(
toolName: "weatherForecast",
parameters: JsonSerializer.SerializeToElement(new {
location = "Seattle",
days = 3
})
);
// Act
var response = await tool.ExecuteAsync(request);
// Assert
Assert.NotNull(response);
var result = JsonSerializer.Deserialize<WeatherForecast>(response.Result);
Assert.Equal("Seattle", result.Location);
Assert.Equal(3, result.DailyForecasts.Count);
}
[Fact]
public async Task WeatherTool_InvalidLocation_ThrowsToolExecutionException()
{
// Arrange
var mockWeatherService = new Mock<IWeatherService>();
mockWeatherService
.Setup(s => s.GetForecastAsync("InvalidLocation", It.IsAny<int>()))
.ThrowsAsync(new LocationNotFoundException("Location not found"));
var tool = new WeatherForecastTool(mockWeatherService.Object);
var request = new ToolRequest(
toolName: "weatherForecast",
parameters: JsonSerializer.SerializeToElement(new {
location = "InvalidLocation",
days = 3
})
);
// Act & Assert
var exception = await Assert.ThrowsAsync<ToolExecutionException>(
() => tool.ExecuteAsync(request)
);
Assert.Contains("Location not found", exception.Message);
}Verifica che gli schemi siano validi e applichino correttamente i vincoli:
@Test
public void testSchemaValidation() {
// Crea istanza dello strumento
SearchTool searchTool = new SearchTool();
// Ottieni schema
Object schema = searchTool.getSchema();
// Converti lo schema in JSON per la validazione
String schemaJson = objectMapper.writeValueAsString(schema);
// Valida che lo schema sia un JSONSchema valido
JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
JsonSchema jsonSchema = factory.getJsonSchema(schemaJson);
// Testa parametri validi
JsonNode validParams = objectMapper.createObjectNode()
.put("query", "test query")
.put("limit", 5);
ProcessingReport validReport = jsonSchema.validate(validParams);
assertTrue(validReport.isSuccess());
// Testa parametro richiesto mancante
JsonNode missingRequired = objectMapper.createObjectNode()
.put("limit", 5);
ProcessingReport missingReport = jsonSchema.validate(missingRequired);
assertFalse(missingReport.isSuccess());
// Testa tipo di parametro non valido
JsonNode invalidType = objectMapper.createObjectNode()
.put("query", "test")
.put("limit", "not-a-number");
ProcessingReport invalidReport = jsonSchema.validate(invalidType);
assertFalse(invalidReport.isSuccess());
}Crea test specifici per condizioni di errore:
@pytest.mark.asyncio
async def test_api_tool_handles_timeout():
# Disporre
tool = ApiTool(timeout=0.1) # Timeout molto breve
# Simula una richiesta che scadrà
with aioresponses() as mocked:
mocked.get(
"https://api.example.com/data",
callback=lambda *args, **kwargs: asyncio.sleep(0.5) # Più lungo del timeout
)
request = ToolRequest(
tool_name="apiTool",
parameters={"url": "https://api.example.com/data"}
)
# Agire e Verificare
with pytest.raises(ToolExecutionException) as exc_info:
await tool.execute_async(request)
# Verificare il messaggio di eccezione
assert "timed out" in str(exc_info.value).lower()
@pytest.mark.asyncio
async def test_api_tool_handles_rate_limiting():
# Disporre
tool = ApiTool()
# Simula una risposta con limitazione di velocità
with aioresponses() as mocked:
mocked.get(
"https://api.example.com/data",
status=429,
headers={"Retry-After": "2"},
body=json.dumps({"error": "Rate limit exceeded"})
)
request = ToolRequest(
tool_name="apiTool",
parameters={"url": "https://api.example.com/data"}
)
# Agire e Verificare
with pytest.raises(ToolExecutionException) as exc_info:
await tool.execute_async(request)
# Verificare che l'eccezione contenga informazioni sul limite di velocità
error_msg = str(exc_info.value).lower()
assert "rate limit" in error_msg
assert "try again" in error_msgTesta gli strumenti che lavorano insieme nelle combinazioni previste:
[Fact]
public async Task DataProcessingWorkflow_CompletesSuccessfully()
{
// Arrange
var dataFetchTool = new DataFetchTool(mockDataService.Object);
var analysisTools = new DataAnalysisTool(mockAnalysisService.Object);
var visualizationTool = new DataVisualizationTool(mockVisualizationService.Object);
var toolRegistry = new ToolRegistry();
toolRegistry.RegisterTool(dataFetchTool);
toolRegistry.RegisterTool(analysisTools);
toolRegistry.RegisterTool(visualizationTool);
var workflowExecutor = new WorkflowExecutor(toolRegistry);
// Act
var result = await workflowExecutor.ExecuteWorkflowAsync(new[] {
new ToolCall("dataFetch", new { source = "sales2023" }),
new ToolCall("dataAnalysis", ctx => new {
data = ctx.GetResult("dataFetch"),
analysis = "trend"
}),
new ToolCall("dataVisualize", ctx => new {
analysisResult = ctx.GetResult("dataAnalysis"),
type = "line-chart"
})
});
// Assert
Assert.NotNull(result);
Assert.True(result.Success);
Assert.NotNull(result.GetResult("dataVisualize"));
Assert.Contains("chartUrl", result.GetResult("dataVisualize").ToString());
}Testa il server MCP con registrazione completa degli strumenti ed esecuzione:
@SpringBootTest
@AutoConfigureMockMvc
public class McpServerIntegrationTest {
@Autowired
private MockMvc mockMvc;
@Autowired
private ObjectMapper objectMapper;
@Test
public void testToolDiscovery() throws Exception {
// Testare l'endpoint di scoperta
mockMvc.perform(get("/mcp/tools"))
.andExpect(status().isOk())
.andExpect(jsonPath("$.tools").isArray())
.andExpect(jsonPath("$.tools[*].name").value(hasItems(
"weatherForecast", "calculator", "documentSearch"
)));
}
@Test
public void testToolExecution() throws Exception {
// Creare richiesta dello strumento
Map<String, Object> request = new HashMap<>();
request.put("toolName", "calculator");
Map<String, Object> parameters = new HashMap<>();
parameters.put("operation", "add");
parameters.put("a", 5);
parameters.put("b", 7);
request.put("parameters", parameters);
// Inviare la richiesta e verificare la risposta
mockMvc.perform(post("/mcp/execute")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(request)))
.andExpect(status().isOk())
.andExpect(jsonPath("$.result.value").value(12));
}
@Test
public void testToolValidation() throws Exception {
// Creare richiesta dello strumento non valida
Map<String, Object> request = new HashMap<>();
request.put("toolName", "calculator");
Map<String, Object> parameters = new HashMap<>();
parameters.put("operation", "divide");
parameters.put("a", 10);
// Parametro mancante "b"
request.put("parameters", parameters);
// Inviare la richiesta e verificare la risposta di errore
mockMvc.perform(post("/mcp/execute")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(request)))
.andExpect(status().isBadRequest())
.andExpect(jsonPath("$.error").exists());
}
}Testa flussi completi dal prompt del modello all’esecuzione dello strumento:
@pytest.mark.asyncio
async def test_model_interaction_with_tool():
# Configura - Imposta il client MCP e il modello mock
mcp_client = McpClient(server_url="http://localhost:5000")
# Risposte del modello mock
mock_model = MockLanguageModel([
MockResponse(
"What's the weather in Seattle?",
tool_calls=[{
"tool_name": "weatherForecast",
"parameters": {"location": "Seattle", "days": 3}
}]
),
MockResponse(
"Here's the weather forecast for Seattle:\n- Today: 65°F, Partly Cloudy\n- Tomorrow: 68°F, Sunny\n- Day after: 62°F, Rain",
tool_calls=[]
)
])
# Risposta dello strumento meteo mock
with aioresponses() as mocked:
mocked.post(
"http://localhost:5000/mcp/execute",
payload={
"result": {
"location": "Seattle",
"forecast": [
{"date": "2023-06-01", "temperature": 65, "conditions": "Partly Cloudy"},
{"date": "2023-06-02", "temperature": 68, "conditions": "Sunny"},
{"date": "2023-06-03", "temperature": 62, "conditions": "Rain"}
]
}
}
)
# Agisci
response = await mcp_client.send_prompt(
"What's the weather in Seattle?",
model=mock_model,
allowed_tools=["weatherForecast"]
)
# Verifica
assert "Seattle" in response.generated_text
assert "65" in response.generated_text
assert "Sunny" in response.generated_text
assert "Rain" in response.generated_text
assert len(response.tool_calls) == 1
assert response.tool_calls[0].tool_name == "weatherForecast"Testa quante richieste concorrenti il tuo server MCP può gestire:
[Fact]
public async Task McpServer_HandlesHighConcurrency()
{
// Arrange
var server = new McpServer(
name: "TestServer",
version: "1.0",
maxConcurrentRequests: 100
);
server.RegisterTool(new FastExecutingTool());
await server.StartAsync();
var client = new McpClient("http://localhost:5000");
// Act
var tasks = new List<Task<McpResponse>>();
for (int i = 0; i < 1000; i++)
{
tasks.Add(client.ExecuteToolAsync("fastTool", new { iteration = i }));
}
var results = await Task.WhenAll(tasks);
// Assert
Assert.Equal(1000, results.Length);
Assert.All(results, r => Assert.NotNull(r));
}Testa il sistema sotto carichi estremi:
@Test
public void testServerUnderStress() {
int maxUsers = 1000;
int rampUpTimeSeconds = 60;
int testDurationSeconds = 300;
// Configura JMeter per il test di stress
StandardJMeterEngine jmeter = new StandardJMeterEngine();
// Configura il piano di test JMeter
HashTree testPlanTree = new HashTree();
// Crea piano di test, gruppo di thread, sampler, ecc.
TestPlan testPlan = new TestPlan("MCP Server Stress Test");
testPlanTree.add(testPlan);
ThreadGroup threadGroup = new ThreadGroup();
threadGroup.setNumThreads(maxUsers);
threadGroup.setRampUp(rampUpTimeSeconds);
threadGroup.setScheduler(true);
threadGroup.setDuration(testDurationSeconds);
testPlanTree.add(threadGroup);
// Aggiungi sampler HTTP per l'esecuzione dello strumento
HTTPSampler toolExecutionSampler = new HTTPSampler();
toolExecutionSampler.setDomain("localhost");
toolExecutionSampler.setPort(5000);
toolExecutionSampler.setPath("/mcp/execute");
toolExecutionSampler.setMethod("POST");
toolExecutionSampler.addArgument("toolName", "calculator");
toolExecutionSampler.addArgument("parameters", "{\"operation\":\"add\",\"a\":5,\"b\":7}");
threadGroup.add(toolExecutionSampler);
// Aggiungi listener
SummaryReport summaryReport = new SummaryReport();
threadGroup.add(summaryReport);
// Esegui il test
jmeter.configure(testPlanTree);
jmeter.run();
// Valida i risultati
assertEquals(0, summaryReport.getErrorCount());
assertTrue(summaryReport.getAverage() < 200); // Tempo di risposta medio < 200ms
assertTrue(summaryReport.getPercentile(90.0) < 500); // 90° percentile < 500ms
}Configura il monitoraggio per analisi delle prestazioni a lungo termine:
# Configura il monitoraggio per un server MCP
def configure_monitoring(server):
# Configura metriche Prometheus
prometheus_metrics = {
"request_count": Counter("mcp_requests_total", "Total MCP requests"),
"request_latency": Histogram(
"mcp_request_duration_seconds",
"Request duration in seconds",
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
),
"tool_execution_count": Counter(
"mcp_tool_executions_total",
"Tool execution count",
labelnames=["tool_name"]
),
"tool_execution_latency": Histogram(
"mcp_tool_duration_seconds",
"Tool execution duration in seconds",
labelnames=["tool_name"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
),
"tool_errors": Counter(
"mcp_tool_errors_total",
"Tool execution errors",
labelnames=["tool_name", "error_type"]
)
}
# Aggiungi middleware per temporizzazione e registrazione delle metriche
server.add_middleware(PrometheusMiddleware(prometheus_metrics))
# Esporre l'endpoint delle metriche
@server.router.get("/metrics")
async def metrics():
return generate_latest()
return serverWorkflow MCP ben progettati migliorano efficienza, affidabilità e manutenibilità. Ecco i pattern chiave da seguire:
Collega più strumenti in sequenza dove l’output di ciascuno diventa input per il prossimo:
# Implementazione della catena di strumenti in Python
class ChainWorkflow:
def __init__(self, tools_chain):
self.tools_chain = tools_chain # Elenco dei nomi degli strumenti da eseguire in sequenza
async def execute(self, mcp_client, initial_input):
current_result = initial_input
all_results = {"input": initial_input}
for tool_name in self.tools_chain:
# Esegui ogni strumento nella catena, passando il risultato precedente
response = await mcp_client.execute_tool(tool_name, current_result)
# Memorizza il risultato e usalo come input per il prossimo strumento
all_results[tool_name] = response.result
current_result = response.result
return {
"final_result": current_result,
"all_results": all_results
}
# Esempio di utilizzo
data_processing_chain = ChainWorkflow([
"dataFetch",
"dataCleaner",
"dataAnalyzer",
"dataVisualizer"
])
result = await data_processing_chain.execute(
mcp_client,
{"source": "sales_database", "table": "transactions"}
)Usa uno strumento centrale che distribuisce a strumenti specializzati basandosi sull’input:
public class ContentDispatcherTool : IMcpTool
{
private readonly IMcpClient _mcpClient;
public ContentDispatcherTool(IMcpClient mcpClient)
{
_mcpClient = mcpClient;
}
public string Name => "contentProcessor";
public string Description => "Processes content of various types";
public object GetSchema()
{
return new {
type = "object",
properties = new {
content = new { type = "string" },
contentType = new {
type = "string",
enum = new[] { "text", "html", "markdown", "csv", "code" }
},
operation = new {
type = "string",
enum = new[] { "summarize", "analyze", "extract", "convert" }
}
},
required = new[] { "content", "contentType", "operation" }
};
}
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
var content = request.Parameters.GetProperty("content").GetString();
var contentType = request.Parameters.GetProperty("contentType").GetString();
var operation = request.Parameters.GetProperty("operation").GetString();
// Determine which specialized tool to use
string targetTool = DetermineTargetTool(contentType, operation);
// Forward to the specialized tool
var specializedResponse = await _mcpClient.ExecuteToolAsync(
targetTool,
new { content, options = GetOptionsForTool(targetTool, operation) }
);
return new ToolResponse { Result = specializedResponse.Result };
}
private string DetermineTargetTool(string contentType, string operation)
{
return (contentType, operation) switch
{
("text", "summarize") => "textSummarizer",
("text", "analyze") => "textAnalyzer",
("html", _) => "htmlProcessor",
("markdown", _) => "markdownProcessor",
("csv", _) => "csvProcessor",
("code", _) => "codeAnalyzer",
_ => throw new ToolExecutionException($"No tool available for {contentType}/{operation}")
};
}
private object GetOptionsForTool(string toolName, string operation)
{
// Return appropriate options for each specialized tool
return toolName switch
{
"textSummarizer" => new { length = "medium" },
"htmlProcessor" => new { cleanUp = true, operation },
// Options for other tools...
_ => new { }
};
}
}Esegui più strumenti simultaneamente per efficienza:
public class ParallelDataProcessingWorkflow {
private final McpClient mcpClient;
public ParallelDataProcessingWorkflow(McpClient mcpClient) {
this.mcpClient = mcpClient;
}
public WorkflowResult execute(String datasetId) {
// Passo 1: Recupera i metadati del dataset (sincrono)
ToolResponse metadataResponse = mcpClient.executeTool("datasetMetadata",
Map.of("datasetId", datasetId));
// Passo 2: Avvia più analisi in parallelo
CompletableFuture<ToolResponse> statisticalAnalysis = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("statisticalAnalysis", Map.of(
"datasetId", datasetId,
"type", "comprehensive"
))
);
CompletableFuture<ToolResponse> correlationAnalysis = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("correlationAnalysis", Map.of(
"datasetId", datasetId,
"method", "pearson"
))
);
CompletableFuture<ToolResponse> outlierDetection = CompletableFuture.supplyAsync(() ->
mcpClient.executeTool("outlierDetection", Map.of(
"datasetId", datasetId,
"sensitivity", "medium"
))
);
// Attendi il completamento di tutti i task paralleli
CompletableFuture<Void> allAnalyses = CompletableFuture.allOf(
statisticalAnalysis, correlationAnalysis, outlierDetection
);
allAnalyses.join(); // Attendi il completamento
// Passo 3: Combina i risultati
Map<String, Object> combinedResults = new HashMap<>();
combinedResults.put("metadata", metadataResponse.getResult());
combinedResults.put("statistics", statisticalAnalysis.join().getResult());
combinedResults.put("correlations", correlationAnalysis.join().getResult());
combinedResults.put("outliers", outlierDetection.join().getResult());
// Passo 4: Genera il rapporto riepilogativo
ToolResponse summaryResponse = mcpClient.executeTool("reportGenerator",
Map.of("analysisResults", combinedResults));
// Restituisci il risultato completo del flusso di lavoro
WorkflowResult result = new WorkflowResult();
result.setDatasetId(datasetId);
result.setAnalysisResults(combinedResults);
result.setSummaryReport(summaryResponse.getResult());
return result;
}
}Implementa fallback graziosi per errori degli strumenti:
class ResilientWorkflow:
def __init__(self, mcp_client):
self.client = mcp_client
async def execute_with_fallback(self, primary_tool, fallback_tool, parameters):
try:
# Prova prima lo strumento principale
response = await self.client.execute_tool(primary_tool, parameters)
return {
"result": response.result,
"source": "primary",
"tool": primary_tool
}
except ToolExecutionException as e:
# Registra il fallimento
logging.warning(f"Primary tool '{primary_tool}' failed: {str(e)}")
# Passa allo strumento secondario
try:
# Potrebbe essere necessario trasformare i parametri per lo strumento di riserva
fallback_params = self._adapt_parameters(parameters, primary_tool, fallback_tool)
response = await self.client.execute_tool(fallback_tool, fallback_params)
return {
"result": response.result,
"source": "fallback",
"tool": fallback_tool,
"primaryError": str(e)
}
except ToolExecutionException as fallback_error:
# Entrambi gli strumenti hanno fallito
logging.error(f"Both primary and fallback tools failed. Fallback error: {str(fallback_error)}")
raise WorkflowExecutionException(
f"Workflow failed: primary error: {str(e)}; fallback error: {str(fallback_error)}"
)
def _adapt_parameters(self, params, from_tool, to_tool):
"""Adapt parameters between different tools if needed"""
# Questa implementazione dipenderebbe dagli strumenti specifici
# Per questo esempio, restituiremo solo i parametri originali
return params
# Esempio di utilizzo
async def get_weather(workflow, location):
return await workflow.execute_with_fallback(
"premiumWeatherService", # API meteo primaria (a pagamento)
"basicWeatherService", # API meteo di riserva (gratuita)
{"location": location}
)Costruisci workflow complessi componendo quelli più semplici:
public class CompositeWorkflow : IWorkflow
{
private readonly List<IWorkflow> _workflows;
public CompositeWorkflow(IEnumerable<IWorkflow> workflows)
{
_workflows = new List<IWorkflow>(workflows);
}
public async Task<WorkflowResult> ExecuteAsync(WorkflowContext context)
{
var results = new Dictionary<string, object>();
foreach (var workflow in _workflows)
{
var workflowResult = await workflow.ExecuteAsync(context);
// Store each workflow's result
results[workflow.Name] = workflowResult;
// Update context with the result for the next workflow
context = context.WithResult(workflow.Name, workflowResult);
}
return new WorkflowResult(results);
}
public string Name => "CompositeWorkflow";
public string Description => "Executes multiple workflows in sequence";
}
// Example usage
var documentWorkflow = new CompositeWorkflow(new IWorkflow[] {
new DocumentFetchWorkflow(),
new DocumentProcessingWorkflow(),
new InsightGenerationWorkflow(),
new ReportGenerationWorkflow()
});
var result = await documentWorkflow.ExecuteAsync(new WorkflowContext {
Parameters = new { documentId = "12345" }
});Il testing è un aspetto critico dello sviluppo di server MCP affidabili e di alta qualità. Questa guida fornisce best practice e consigli completi per testare i tuoi server MCP durante tutto il ciclo di sviluppo, dai test unitari ai test di integrazione e convalida end-to-end.
I server MCP fungono da middleware cruciale tra modelli AI e applicazioni client. Un testing approfondito garantisce:
- Affidabilità in ambienti di produzione
- Gestione accurata di richieste e risposte
- Implementazione corretta delle specifiche MCP
- Resilienza contro errori e casi limite
- Prestazioni costanti sotto carichi vari
I test unitari verificano singoli componenti del server MCP in isolamento.
- Gestori di Risorse: Testare la logica di ogni gestore di risorsa indipendentemente
- Implementazioni degli Strumenti: Verificare il comportamento degli strumenti con diversi input
- Template di Prompt: Assicurarsi che i template di prompt siano renderizzati correttamente
- Validazione dello Schema: Testare la logica di validazione dei parametri
- Gestione degli Errori: Verificare risposte di errore per input non validi
// Example unit test for a calculator tool in C#
[Fact]
public async Task CalculatorTool_Add_ReturnsCorrectSum()
{
// Arrange
var calculator = new CalculatorTool();
var parameters = new Dictionary<string, object>
{
["operation"] = "add",
["a"] = 5,
["b"] = 7
};
// Act
var response = await calculator.ExecuteAsync(parameters);
var result = JsonSerializer.Deserialize<CalculationResult>(response.Content[0].ToString());
// Assert
Assert.Equal(12, result.Value);
}# Esempio di test unitario per uno strumento calcolatrice in Python
def test_calculator_tool_add():
# Prepara
calculator = CalculatorTool()
parameters = {
"operation": "add",
"a": 5,
"b": 7
}
# Agisci
response = calculator.execute(parameters)
result = json.loads(response.content[0].text)
# Verifica
assert result["value"] == 12I test di integrazione verificano le interazioni tra i componenti del server MCP.
- Inizializzazione del Server: Testare l’avvio del server con varie configurazioni
- Registrazione delle Rotte: Verificare che tutti gli endpoint siano correttamente registrati
- Elaborazione delle Richieste: Testare il ciclo completo richiesta-risposta
- Propagazione degli Errori: Assicurare che gli errori siano gestiti correttamente tra i componenti
- Autenticazione e Autorizzazione: Testare i meccanismi di sicurezza
// Example integration test for MCP server in C#
[Fact]
public async Task Server_ProcessToolRequest_ReturnsValidResponse()
{
// Arrange
var server = new McpServer();
server.RegisterTool(new CalculatorTool());
await server.StartAsync();
var request = new McpRequest
{
Tool = "calculator",
Parameters = new Dictionary<string, object>
{
["operation"] = "multiply",
["a"] = 6,
["b"] = 7
}
};
// Act
var response = await server.ProcessRequestAsync(request);
// Assert
Assert.NotNull(response);
Assert.Equal(McpStatusCodes.Success, response.StatusCode);
// Additional assertions for response content
// Cleanup
await server.StopAsync();
}I test end-to-end verificano il comportamento completo del sistema da client a server.
- Comunicazione Client-Server: Testare cicli completi di richiesta-risposta
- SDK Client Reali: Testare con implementazioni client reali
- Prestazioni sotto Carico: Verificare il comportamento con più richieste concorrenti
- Recupero dagli Errori: Testare il recupero del sistema da errori
- Operazioni a Lunga Durata: Verificare la gestione di streaming e operazioni prolungate
// Esempio di test E2E con un client in TypeScript
describe('MCP Server E2E Tests', () => {
let client: McpClient;
beforeAll(async () => {
// Avvia il server in ambiente di test
await startTestServer();
client = new McpClient('http://localhost:5000');
});
afterAll(async () => {
await stopTestServer();
});
test('Client can invoke calculator tool and get correct result', async () => {
// Agisci
const response = await client.invokeToolAsync('calculator', {
operation: 'divide',
a: 20,
b: 4
});
// Asserisci
expect(response.statusCode).toBe(200);
expect(response.content[0].text).toContain('5');
});
});Il mocking è essenziale per isolare i componenti durante il testing.
- Modelli AI Esterni: Mock delle risposte dei modelli per un testing prevedibile
- Servizi Esterni: Mock delle dipendenze API (database, servizi di terze parti)
- Servizi di Autenticazione: Mock dei provider di identità
- Provider di Risorse: Mock dei gestori di risorse costose
// C# example with Moq
var mockModel = new Mock<ILanguageModel>();
mockModel
.Setup(m => m.GenerateResponseAsync(
It.IsAny<string>(),
It.IsAny<McpRequestContext>()))
.ReturnsAsync(new ModelResponse {
Text = "Mocked model response",
FinishReason = FinishReason.Completed
});
var server = new McpServer(modelClient: mockModel.Object);# Esempio Python con unittest.mock
@patch('mcp_server.models.OpenAIModel')
def test_with_mock_model(mock_model):
# Configura il mock
mock_model.return_value.generate_response.return_value = {
"text": "Mocked model response",
"finish_reason": "completed"
}
# Usa il mock nel test
server = McpServer(model_client=mock_model)
# Continua con il testIl testing delle prestazioni è cruciale per server MCP in produzione.
- Latenza: Tempo di risposta alle richieste
- Throughput: Richieste gestite al secondo
- Utilizzo delle Risorse: Uso di CPU, memoria, network
- Gestione della Concorrenza: Comportamento sotto richieste parallele
- Caratteristiche di Scalabilità: Prestazioni con l’aumento del carico
- k6: Strumento open-source per load testing
- JMeter: Testing completo delle prestazioni
- Locust: Load testing basato su Python
- Azure Load Testing: Testing delle prestazioni basato su cloud
// script k6 per il test di carico del server MCP
import http from 'k6/http';
import { check, sleep } from 'k6';
export const options = {
vus: 10, // 10 utenti virtuali
duration: '30s',
};
export default function () {
const payload = JSON.stringify({
tool: 'calculator',
parameters: {
operation: 'add',
a: Math.floor(Math.random() * 100),
b: Math.floor(Math.random() * 100)
}
});
const params = {
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer test-token'
},
};
const res = http.post('http://localhost:5000/api/tools/invoke', payload, params);
check(res, {
'status is 200': (r) => r.status === 200,
'response time < 500ms': (r) => r.timings.duration < 500,
});
sleep(1);
}Automatizzare i test assicura qualità costante e cicli di feedback più rapidi.
- Eseguire test unitari sulle pull request: Assicurarsi che le modifiche al codice non compromettano le funzionalità esistenti
- Test di integrazione in staging: Eseguire test di integrazione negli ambienti pre-produzione
- Riferimenti di prestazioni: Mantenere benchmark di prestazioni per individuare regressioni
- Scansioni di sicurezza: Automatizzare i test di sicurezza come parte della pipeline
name: MCP Server Tests
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Runtime
uses: actions/setup-dotnet@v1
with:
dotnet-version: '8.0.x'
- name: Restore dependencies
run: dotnet restore
- name: Build
run: dotnet build --no-restore
- name: Unit Tests
run: dotnet test --no-build --filter Category=Unit
- name: Integration Tests
run: dotnet test --no-build --filter Category=Integration
- name: Performance Tests
run: dotnet run --project tests/PerformanceTests/PerformanceTests.csprojVerifica che il tuo server implementi correttamente la specifica MCP.
- Endpoint API: Testare gli endpoint richiesti (/resources, /tools, ecc.)
- Formato richiesta/risposta: Validare la conformità dello schema
- Codici di errore: Verificare i codici di stato corretti per vari scenari
- Tipi di contenuto: Testare la gestione di diversi tipi di contenuto
- Flusso di autenticazione: Verificare i meccanismi di autenticazione conformi alla specifica
[Fact]
public async Task Server_ResourceEndpoint_ReturnsCorrectSchema()
{
// Arrange
var client = new HttpClient();
client.DefaultRequestHeaders.Add("Authorization", "Bearer test-token");
// Act
var response = await client.GetAsync("http://localhost:5000/api/resources");
var content = await response.Content.ReadAsStringAsync();
var resources = JsonSerializer.Deserialize<ResourceList>(content);
// Assert
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
Assert.NotNull(resources);
Assert.All(resources.Resources, resource =>
{
Assert.NotNull(resource.Id);
Assert.NotNull(resource.Type);
// Additional schema validation
});
}- Testare le definizioni degli strumenti separatamente: Verificare le definizioni dello schema indipendentemente dalla logica degli strumenti
- Usare test parametrizzati: Testare gli strumenti con una varietà di input, inclusi casi limite
- Verificare le risposte di errore: Assicurarsi che la gestione degli errori sia corretta in tutte le condizioni di errore possibili
- Testare la logica di autorizzazione: Garantire il controllo accessi corretto per diversi ruoli utente
- Monitorare la copertura dei test: Puntare a una copertura elevata del codice del percorso critico
- Testare le risposte in streaming: Verificare la corretta gestione dei contenuti in streaming
- Simulare problemi di rete: Testare il comportamento in condizioni di rete sfavorevoli
- Testare i limiti delle risorse: Verificare il comportamento a raggiungimento di quote o limiti di velocità
- Automatizzare i test di regressione: Costruire una suite che venga eseguita a ogni modifica del codice
- Documentare i casi di test: Mantenere una documentazione chiara degli scenari di test
- Affidarsi troppo ai test del cammino positivo: Assicurarsi di testare approfonditamente i casi di errore
- Ignorare i test di prestazioni: Identificare i colli di bottiglia prima che impattino la produzione
- Testare solo in isolamento: Combinare test unitari, di integrazione e end-to-end
- Copertura API incompleta: Assicurarsi che tutti gli endpoint e le funzionalità siano testati
- Ambientazioni di test non coerenti: Usare container per garantire ambienti di test coerenti
Una strategia di testing completa è essenziale per sviluppare server MCP affidabili e di alta qualità. Implementando le migliori pratiche e i suggerimenti illustrati in questa guida, puoi garantire che le tue implementazioni MCP soddisfino i più alti standard di qualità, affidabilità e prestazioni.
- Progettazione degli strumenti: Seguire il principio di responsabilità singola, usare l'iniezione delle dipendenze e progettare per la composabilità
- Progettazione degli schemi: Creare schemi chiari, ben documentati con opportune restrizioni di validazione
- Gestione degli errori: Implementare una gestione degli errori elegante, risposte strutturate e logica di ritentativo
- Prestazioni: Usare caching, elaborazione asincrona e limitazione delle risorse
- Sicurezza: Applicare una validazione approfondita degli input, controlli di autorizzazione e gestione dei dati sensibili
- Testing: Creare test unitari, di integrazione e end-to-end completi
- Pattern di workflow: Applicare pattern consolidati come catene, dispatcher e elaborazione in parallelo
Progetta uno strumento MCP e un workflow per un sistema di elaborazione documenti che:
- Accetti documenti in più formati (PDF, DOCX, TXT)
- Estragga testo e informazioni chiave dai documenti
- Classifichi i documenti per tipo e contenuto
- Generi un riepilogo di ogni documento
Implementa gli schemi dello strumento, la gestione degli errori e un pattern di workflow che meglio si adatti a questo scenario. Considera come testeresti questa implementazione.
- Unisciti alla comunità MCP su Azure AI Foundry Discord Community per rimanere aggiornato sugli ultimi sviluppi
- Contribuisci a progetti open-source MCP
- Applica i principi MCP nelle iniziative di AI della tua organizzazione
- Esplora implementazioni MCP specializzate per il tuo settore
- Considera di seguire corsi avanzati su argomenti MCP specifici, come integrazione multimodale o integrazione di applicazioni enterprise
- Sperimenta costruendo i tuoi strumenti e workflow MCP utilizzando i principi appresi attraverso il Hands on Lab
Prossimo: Case Studies
Disclaimer: Questo documento è stato tradotto utilizzando il servizio di traduzione automatica Co-op Translator. Sebbene ci impegniamo per garantire accuratezza, si prega di notare che le traduzioni automatiche possono contenere errori o inesattezze. Il documento originale nella sua lingua natale deve essere considerato la fonte autorevole. Per informazioni critiche si raccomanda la traduzione professionale umana. Non siamo responsabili per eventuali malintesi o interpretazioni errate derivanti dall'uso di questa traduzione.
