Ez az útmutató az MCP protokoll fejlettebb jellemzőit tárgyalja, amelyek túlmutatnak az alapvető eszköz- és erőforrás-kezelésen. Ezeknek a jellemzőknek a megértése segít robosztusabb, felhasználóbarátabb és éles környezetre kész MCP szervereket építeni.
- Folyamatjelzések - Hosszú ideig futó műveletek előrehaladásának jelentése
- Kérés Megszakítása - Ügyfeleknek lehetőség a folyamatban lévő kérések törlésére
- Erőforrás Sablonok - Dinamikus erőforrás URI-k paraméterekkel
- Szerver Életciklus Események - Megfelelő inicializálás és leállítás
- Naplózás Szabályozás - Szerveroldali naplózási beállítások
- Hibakezelési Minták - Következetes hibaválaszok
Az időigényes műveletek esetén (adatfeldolgozás, fájlletöltés, API hívások) a folyamatjelzések tájékoztatják a felhasználókat.
sequenceDiagram
participant Client
participant Server
Client->>Server: tools/call (hosszú művelet)
Server-->>Client: értesítés: előrehaladás 10%
Server-->>Client: értesítés: előrehaladás 50%
Server-->>Client: értesítés: előrehaladás 90%
Server->>Client: eredmény (kész)
from mcp.server import Server, NotificationOptions
from mcp.types import ProgressNotification
import asyncio
app = Server("progress-server")
@app.tool()
async def process_large_file(file_path: str, ctx) -> str:
"""Process a large file with progress updates."""
# Fájlméret lekérése a folyamatjelző számításhoz
file_size = os.path.getsize(file_path)
processed = 0
with open(file_path, 'rb') as f:
while chunk := f.read(8192):
# Feldolgozás darabonként
await process_chunk(chunk)
processed += len(chunk)
# Folyamatjelző értesítés küldése
progress = (processed / file_size) * 100
await ctx.send_notification(
ProgressNotification(
progressToken=ctx.request_id,
progress=progress,
total=100,
message=f"Processing: {progress:.1f}%"
)
)
return f"Processed {file_size} bytes"
@app.tool()
async def batch_operation(items: list[str], ctx) -> str:
"""Process multiple items with progress."""
results = []
total = len(items)
for i, item in enumerate(items):
result = await process_item(item)
results.append(result)
# Folyamatjelző jelentése minden elem után
await ctx.send_notification(
ProgressNotification(
progressToken=ctx.request_id,
progress=i + 1,
total=total,
message=f"Processed {i + 1}/{total}: {item}"
)
)
return f"Completed {total} items"import { Server } from "@modelcontextprotocol/sdk/server/index.js";
server.setRequestHandler(CallToolSchema, async (request, extra) => {
const { name, arguments: args } = request.params;
if (name === "process_data") {
const items = args.items as string[];
const results = [];
for (let i = 0; i < items.length; i++) {
const result = await processItem(items[i]);
results.push(result);
// Küldjön haladási értesítést
await extra.sendNotification({
method: "notifications/progress",
params: {
progressToken: request.id,
progress: i + 1,
total: items.length,
message: `Processing item ${i + 1}/${items.length}`
}
});
}
return { content: [{ type: "text", text: JSON.stringify(results) }] };
}
});async def handle_progress(notification):
"""Handle progress notifications from server."""
params = notification.params
print(f"Progress: {params.progress}/{params.total} - {params.message}")
# Kezelő regisztrálása
session.on_notification("notifications/progress", handle_progress)
# Eszköz hívása (a folyamat előrehaladásáról szóló frissítések a kezelőn keresztül érkeznek)
result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})Lehetővé teszi, hogy az ügyfelek megszakítsák azokat a kéréseket, amelyekre már nincs szükség vagy amelyek túl sokáig tartanak.
from mcp.server import Server
from mcp.types import CancelledError
import asyncio
app = Server("cancellable-server")
@app.tool()
async def long_running_search(query: str, ctx) -> str:
"""Search that can be cancelled."""
results = []
try:
for page in range(100): # Több oldalon keresés
# Ellenőrizze, hogy a törlés kérése megtörtént-e
if ctx.is_cancelled:
raise CancelledError("Search cancelled by user")
# Oldalkeresés szimulálása
page_results = await search_page(query, page)
results.extend(page_results)
# Kis késleltetés lehetővé teszi a törlés ellenőrzését
await asyncio.sleep(0.1)
except CancelledError:
# Részleges eredmények visszaadása
return f"Cancelled. Found {len(results)} results before cancellation."
return f"Found {len(results)} total results"
@app.tool()
async def download_file(url: str, ctx) -> str:
"""Download with cancellation support."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
chunks = []
async for chunk in response.content.iter_chunked(8192):
if ctx.is_cancelled:
return f"Download cancelled at {downloaded}/{total_size} bytes"
chunks.append(chunk)
downloaded += len(chunk)
return f"Downloaded {downloaded} bytes"class CancellableContext:
"""Context object that tracks cancellation state."""
def __init__(self, request_id: str):
self.request_id = request_id
self._cancelled = asyncio.Event()
self._cancel_reason = None
@property
def is_cancelled(self) -> bool:
return self._cancelled.is_set()
def cancel(self, reason: str = "Cancelled"):
self._cancel_reason = reason
self._cancelled.set()
async def check_cancelled(self):
"""Raise if cancelled, otherwise continue."""
if self.is_cancelled:
raise CancelledError(self._cancel_reason)
async def sleep_or_cancel(self, seconds: float):
"""Sleep that can be interrupted by cancellation."""
try:
await asyncio.wait_for(
self._cancelled.wait(),
timeout=seconds
)
raise CancelledError(self._cancel_reason)
except asyncio.TimeoutError:
pass # Normál időtúllépés, folytatásimport asyncio
async def search_with_timeout(session, query, timeout=30):
"""Search with automatic cancellation on timeout."""
task = asyncio.create_task(
session.call_tool("long_running_search", {"query": query})
)
try:
result = await asyncio.wait_for(task, timeout=timeout)
return result
except asyncio.TimeoutError:
# Lemondás kérése
await session.send_notification({
"method": "notifications/cancelled",
"params": {"requestId": task.request_id, "reason": "Timeout"}
})
return "Search timed out"Az erőforrás sablonok lehetővé teszik dinamikus URI felépítését paraméterekkel, ami hasznos API-k és adatbázisok esetén.
from mcp.server import Server
from mcp.types import ResourceTemplate
app = Server("template-server")
@app.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
"""Return available resource templates."""
return [
ResourceTemplate(
uriTemplate="db://users/{user_id}",
name="User Profile",
description="Fetch user profile by ID",
mimeType="application/json"
),
ResourceTemplate(
uriTemplate="api://weather/{city}/{date}",
name="Weather Data",
description="Historical weather for city and date",
mimeType="application/json"
),
ResourceTemplate(
uriTemplate="file://{path}",
name="File Content",
description="Read file at given path",
mimeType="text/plain"
)
]
@app.read_resource()
async def read_resource(uri: str) -> str:
"""Read resource, expanding template parameters."""
# Elemezze az URI-t a paraméterek kinyeréséhez
if uri.startswith("db://users/"):
user_id = uri.split("/")[-1]
return await fetch_user(user_id)
elif uri.startswith("api://weather/"):
parts = uri.replace("api://weather/", "").split("/")
city, date = parts[0], parts[1]
return await fetch_weather(city, date)
elif uri.startswith("file://"):
path = uri.replace("file://", "")
return await read_file(path)
raise ValueError(f"Unknown resource URI: {uri}")server.setRequestHandler(ListResourceTemplatesSchema, async () => {
return {
resourceTemplates: [
{
uriTemplate: "github://repos/{owner}/{repo}/issues/{issue_number}",
name: "GitHub Issue",
description: "Fetch a specific GitHub issue",
mimeType: "application/json"
},
{
uriTemplate: "db://tables/{table}/rows/{id}",
name: "Database Row",
description: "Fetch a row from a database table",
mimeType: "application/json"
}
]
};
});
server.setRequestHandler(ReadResourceSchema, async (request) => {
const uri = request.params.uri;
// GitHub probléma URI-ának elemzése
const githubMatch = uri.match(/^github:\/\/repos\/([^/]+)\/([^/]+)\/issues\/(\d+)$/);
if (githubMatch) {
const [_, owner, repo, issueNumber] = githubMatch;
const issue = await fetchGitHubIssue(owner, repo, parseInt(issueNumber));
return {
contents: [{
uri,
mimeType: "application/json",
text: JSON.stringify(issue, null, 2)
}]
};
}
throw new Error(`Unknown resource URI: ${uri}`);
});A megfelelő inicializálás és leállítás biztosítja az erőforrások tiszta kezelését.
from mcp.server import Server
from contextlib import asynccontextmanager
app = Server("lifecycle-server")
# Megosztott állapot
db_connection = None
cache = None
@asynccontextmanager
async def lifespan(server: Server):
"""Manage server lifecycle."""
global db_connection, cache
# Indítás
print("🚀 Server starting...")
db_connection = await create_database_connection()
cache = await create_cache_client()
print("✅ Resources initialized")
yield # A szerver itt fut
# Leállítás
print("🛑 Server shutting down...")
await db_connection.close()
await cache.close()
print("✅ Resources cleaned up")
app = Server("lifecycle-server", lifespan=lifespan)
@app.tool()
async def query_database(sql: str) -> str:
"""Use the shared database connection."""
result = await db_connection.execute(sql)
return str(result)import { Server } from "@modelcontextprotocol/sdk/server/index.js";
class ManagedServer {
private server: Server;
private dbConnection: DatabaseConnection | null = null;
constructor() {
this.server = new Server({
name: "lifecycle-server",
version: "1.0.0"
});
this.setupHandlers();
}
async start() {
// Erőforrások inicializálása
console.log("🚀 Server starting...");
this.dbConnection = await createDatabaseConnection();
console.log("✅ Database connected");
// Szerver indítása
await this.server.connect(transport);
}
async stop() {
// Erőforrások tisztítása
console.log("🛑 Server shutting down...");
if (this.dbConnection) {
await this.dbConnection.close();
}
await this.server.close();
console.log("✅ Cleanup complete");
}
private setupHandlers() {
this.server.setRequestHandler(CallToolSchema, async (request) => {
// A this.dbConnection biztonságos használata
// ...
});
}
}
// Használat szép leállítással
const server = new ManagedServer();
process.on('SIGINT', async () => {
await server.stop();
process.exit(0);
});
await server.start();Az MCP támogatja a szerveroldali naplózási szinteket, amelyeket az ügyfelek vezérelhetnek.
from mcp.server import Server
from mcp.types import LoggingLevel
import logging
app = Server("logging-server")
# MCP szintek leképezése a Python naplózási szintekre
LEVEL_MAP = {
LoggingLevel.DEBUG: logging.DEBUG,
LoggingLevel.INFO: logging.INFO,
LoggingLevel.WARNING: logging.WARNING,
LoggingLevel.ERROR: logging.ERROR,
}
logger = logging.getLogger("mcp-server")
@app.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> None:
"""Handle client request to change logging level."""
python_level = LEVEL_MAP.get(level, logging.INFO)
logger.setLevel(python_level)
logger.info(f"Logging level set to {level}")
@app.tool()
async def debug_operation(data: str) -> str:
"""Tool with various logging levels."""
logger.debug(f"Processing data: {data}")
try:
result = process(data)
logger.info(f"Successfully processed: {result}")
return result
except Exception as e:
logger.error(f"Processing failed: {e}")
raise@app.tool()
async def complex_operation(input: str, ctx) -> str:
"""Operation that logs to client."""
# Küldjön napló értesítést az ügyfélnek
await ctx.send_log(
level="info",
message=f"Starting complex operation with input: {input}"
)
# Végezze el a munkát...
result = await do_work(input)
await ctx.send_log(
level="debug",
message=f"Operation complete, result size: {len(result)}"
)
return resultA következetes hibakezelés javítja a hibakeresést és a felhasználói élményt.
from mcp.types import McpError, ErrorCode
class ToolError(McpError):
"""Base class for tool errors."""
pass
class ValidationError(ToolError):
"""Invalid input parameters."""
def __init__(self, message: str):
super().__init__(ErrorCode.INVALID_PARAMS, message)
class NotFoundError(ToolError):
"""Requested resource not found."""
def __init__(self, resource: str):
super().__init__(ErrorCode.INVALID_REQUEST, f"Not found: {resource}")
class PermissionError(ToolError):
"""Access denied."""
def __init__(self, action: str):
super().__init__(ErrorCode.INVALID_REQUEST, f"Permission denied: {action}")
class InternalError(ToolError):
"""Internal server error."""
def __init__(self, message: str):
super().__init__(ErrorCode.INTERNAL_ERROR, message)@app.tool()
async def safe_operation(input: str) -> str:
"""Tool with comprehensive error handling."""
# Érvényesítsd a bemenetet
if not input:
raise ValidationError("Input cannot be empty")
if len(input) > 10000:
raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")
try:
# Ellenőrizd az engedélyeket
if not await check_permission(input):
raise PermissionError(f"read {input}")
# Hajtsd végre a műveletet
result = await perform_operation(input)
if result is None:
raise NotFoundError(input)
return result
except ConnectionError as e:
raise InternalError(f"Database connection failed: {e}")
except TimeoutError as e:
raise InternalError(f"Operation timed out: {e}")
except Exception as e:
# Naplózd a váratlan hibákat
logger.exception(f"Unexpected error in safe_operation")
raise InternalError(f"Unexpected error: {type(e).__name__}")import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
function validateInput(data: unknown): asserts data is ValidInput {
if (typeof data !== "object" || data === null) {
throw new McpError(
ErrorCode.InvalidParams,
"Input must be an object"
);
}
// További érvényesítés...
}
server.setRequestHandler(CallToolSchema, async (request) => {
try {
validateInput(request.params.arguments);
const result = await performOperation(request.params.arguments);
return {
content: [{ type: "text", text: JSON.stringify(result) }]
};
} catch (error) {
if (error instanceof McpError) {
throw error; // Már MCP hiba
}
// Egyéb hibák átalakítása
if (error instanceof NotFoundError) {
throw new McpError(ErrorCode.InvalidRequest, error.message);
}
// Ismeretlen hiba
console.error("Unexpected error:", error);
throw new McpError(
ErrorCode.InternalError,
"An unexpected error occurred"
);
}
});Ezek a jellemzők a specifikáció szerint kísérleti státuszban vannak:
# A feladatok lehetővé teszik a hosszú futású műveletek állapotának nyomon követését
@app.task()
async def training_task(model_id: str, data_path: str, ctx) -> str:
"""Long-running ML training task."""
# Feladat elindítva jelzés
await ctx.report_status("running", "Initializing training...")
# Tanulási ciklus
for epoch in range(100):
await train_epoch(model_id, data_path, epoch)
await ctx.report_status(
"running",
f"Training epoch {epoch + 1}/100",
progress=epoch + 1,
total=100
)
await ctx.report_status("completed", "Training finished")
return f"Model {model_id} trained successfully"# Az annotációk metaadatokat szolgáltatnak az eszköz viselkedéséről
@app.tool(
annotations={
"destructive": False, # Nem módosít adatokat
"idempotent": True, # Biztonságos ismételni
"timeout_seconds": 30, # Várt maximális időtartam
"requires_approval": False # Nem szükséges felhasználói jóváhagyás
}
)
async def safe_query(query: str) -> str:
"""A read-only database query tool."""
return await execute_read_query(query)Felelősség kizárása:
Ezt a dokumentumot az AI fordító szolgáltatás, a Co-op Translator segítségével fordítottuk le. Bár törekszünk a pontosságra, kérjük, vegye figyelembe, hogy az automatikus fordítások hibákat vagy pontatlanságokat tartalmazhatnak. Az eredeti, anyanyelvi dokumentum tekintendő hiteles forrásnak. Kritikus információk esetén szakmai, emberi fordítást javaslunk. Nem vállalunk felelősséget az e fordítás használatából eredő félreértésekért vagy félreértelmezésekért.