Hướng dẫn này khám phá các tính năng nâng cao của giao thức MCP vượt ra ngoài việc xử lý công cụ và tài nguyên cơ bản. Hiểu các tính năng này giúp bạn xây dựng các máy chủ MCP mạnh mẽ hơn, thân thiện với người dùng và sẵn sàng cho môi trường sản xuất.
- Thông báo tiến độ - Báo cáo tiến độ cho các thao tác chạy lâu
- Hủy yêu cầu - Cho phép khách hàng hủy các yêu cầu đang xử lý
- Mẫu tài nguyên - URI tài nguyên động với các tham số
- Sự kiện vòng đời máy chủ - Khởi tạo và tắt máy đúng cách
- Kiểm soát ghi log - Cấu hình ghi log phía máy chủ
- Mẫu xử lý lỗi - Phản hồi lỗi nhất quán
Đối với các thao tác mất thời gian (xử lý dữ liệu, tải file, gọi API), thông báo tiến độ giúp người dùng luôn được cập nhật.
sequenceDiagram
participant Client
participant Server
Client->>Server: tools/call (thao tác dài)
Server-->>Client: thông báo: tiến độ 10%
Server-->>Client: thông báo: tiến độ 50%
Server-->>Client: thông báo: tiến độ 90%
Server->>Client: kết quả (hoàn thành)
from mcp.server import Server, NotificationOptions
from mcp.types import ProgressNotification
import asyncio
app = Server("progress-server")
@app.tool()
async def process_large_file(file_path: str, ctx) -> str:
"""Process a large file with progress updates."""
# Lấy kích thước tệp để tính tiến trình
file_size = os.path.getsize(file_path)
processed = 0
with open(file_path, 'rb') as f:
while chunk := f.read(8192):
# Xử lý phần dữ liệu
await process_chunk(chunk)
processed += len(chunk)
# Gửi thông báo tiến trình
progress = (processed / file_size) * 100
await ctx.send_notification(
ProgressNotification(
progressToken=ctx.request_id,
progress=progress,
total=100,
message=f"Processing: {progress:.1f}%"
)
)
return f"Processed {file_size} bytes"
@app.tool()
async def batch_operation(items: list[str], ctx) -> str:
"""Process multiple items with progress."""
results = []
total = len(items)
for i, item in enumerate(items):
result = await process_item(item)
results.append(result)
# Báo cáo tiến trình sau mỗi mục
await ctx.send_notification(
ProgressNotification(
progressToken=ctx.request_id,
progress=i + 1,
total=total,
message=f"Processed {i + 1}/{total}: {item}"
)
)
return f"Completed {total} items"import { Server } from "@modelcontextprotocol/sdk/server/index.js";
server.setRequestHandler(CallToolSchema, async (request, extra) => {
const { name, arguments: args } = request.params;
if (name === "process_data") {
const items = args.items as string[];
const results = [];
for (let i = 0; i < items.length; i++) {
const result = await processItem(items[i]);
results.push(result);
// Gửi thông báo tiến trình
await extra.sendNotification({
method: "notifications/progress",
params: {
progressToken: request.id,
progress: i + 1,
total: items.length,
message: `Processing item ${i + 1}/${items.length}`
}
});
}
return { content: [{ type: "text", text: JSON.stringify(results) }] };
}
});async def handle_progress(notification):
"""Handle progress notifications from server."""
params = notification.params
print(f"Progress: {params.progress}/{params.total} - {params.message}")
# Đăng ký trình xử lý
session.on_notification("notifications/progress", handle_progress)
# Gọi công cụ (các cập nhật tiến trình sẽ được gửi qua trình xử lý)
result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})Cho phép khách hàng hủy các yêu cầu không còn cần thiết hoặc đang mất quá nhiều thời gian.
from mcp.server import Server
from mcp.types import CancelledError
import asyncio
app = Server("cancellable-server")
@app.tool()
async def long_running_search(query: str, ctx) -> str:
"""Search that can be cancelled."""
results = []
try:
for page in range(100): # Tìm kiếm qua nhiều trang
# Kiểm tra xem có yêu cầu hủy bỏ không
if ctx.is_cancelled:
raise CancelledError("Search cancelled by user")
# Mô phỏng việc tìm kiếm trang
page_results = await search_page(query, page)
results.extend(page_results)
# Trì hoãn nhỏ cho phép kiểm tra hủy bỏ
await asyncio.sleep(0.1)
except CancelledError:
# Trả về kết quả một phần
return f"Cancelled. Found {len(results)} results before cancellation."
return f"Found {len(results)} total results"
@app.tool()
async def download_file(url: str, ctx) -> str:
"""Download with cancellation support."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
chunks = []
async for chunk in response.content.iter_chunked(8192):
if ctx.is_cancelled:
return f"Download cancelled at {downloaded}/{total_size} bytes"
chunks.append(chunk)
downloaded += len(chunk)
return f"Downloaded {downloaded} bytes"class CancellableContext:
"""Context object that tracks cancellation state."""
def __init__(self, request_id: str):
self.request_id = request_id
self._cancelled = asyncio.Event()
self._cancel_reason = None
@property
def is_cancelled(self) -> bool:
return self._cancelled.is_set()
def cancel(self, reason: str = "Cancelled"):
self._cancel_reason = reason
self._cancelled.set()
async def check_cancelled(self):
"""Raise if cancelled, otherwise continue."""
if self.is_cancelled:
raise CancelledError(self._cancel_reason)
async def sleep_or_cancel(self, seconds: float):
"""Sleep that can be interrupted by cancellation."""
try:
await asyncio.wait_for(
self._cancelled.wait(),
timeout=seconds
)
raise CancelledError(self._cancel_reason)
except asyncio.TimeoutError:
pass # Hết thời gian bình thường, tiếp tụcimport asyncio
async def search_with_timeout(session, query, timeout=30):
"""Search with automatic cancellation on timeout."""
task = asyncio.create_task(
session.call_tool("long_running_search", {"query": query})
)
try:
result = await asyncio.wait_for(task, timeout=timeout)
return result
except asyncio.TimeoutError:
# Yêu cầu hủy bỏ
await session.send_notification({
"method": "notifications/cancelled",
"params": {"requestId": task.request_id, "reason": "Timeout"}
})
return "Search timed out"Mẫu tài nguyên cho phép xây dựng URI động với các tham số, hữu ích cho API và cơ sở dữ liệu.
from mcp.server import Server
from mcp.types import ResourceTemplate
app = Server("template-server")
@app.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
"""Return available resource templates."""
return [
ResourceTemplate(
uriTemplate="db://users/{user_id}",
name="User Profile",
description="Fetch user profile by ID",
mimeType="application/json"
),
ResourceTemplate(
uriTemplate="api://weather/{city}/{date}",
name="Weather Data",
description="Historical weather for city and date",
mimeType="application/json"
),
ResourceTemplate(
uriTemplate="file://{path}",
name="File Content",
description="Read file at given path",
mimeType="text/plain"
)
]
@app.read_resource()
async def read_resource(uri: str) -> str:
"""Read resource, expanding template parameters."""
# Phân tích URI để trích xuất các tham số
if uri.startswith("db://users/"):
user_id = uri.split("/")[-1]
return await fetch_user(user_id)
elif uri.startswith("api://weather/"):
parts = uri.replace("api://weather/", "").split("/")
city, date = parts[0], parts[1]
return await fetch_weather(city, date)
elif uri.startswith("file://"):
path = uri.replace("file://", "")
return await read_file(path)
raise ValueError(f"Unknown resource URI: {uri}")server.setRequestHandler(ListResourceTemplatesSchema, async () => {
return {
resourceTemplates: [
{
uriTemplate: "github://repos/{owner}/{repo}/issues/{issue_number}",
name: "GitHub Issue",
description: "Fetch a specific GitHub issue",
mimeType: "application/json"
},
{
uriTemplate: "db://tables/{table}/rows/{id}",
name: "Database Row",
description: "Fetch a row from a database table",
mimeType: "application/json"
}
]
};
});
server.setRequestHandler(ReadResourceSchema, async (request) => {
const uri = request.params.uri;
// Phân tích URI của vấn đề GitHub
const githubMatch = uri.match(/^github:\/\/repos\/([^/]+)\/([^/]+)\/issues\/(\d+)$/);
if (githubMatch) {
const [_, owner, repo, issueNumber] = githubMatch;
const issue = await fetchGitHubIssue(owner, repo, parseInt(issueNumber));
return {
contents: [{
uri,
mimeType: "application/json",
text: JSON.stringify(issue, null, 2)
}]
};
}
throw new Error(`Unknown resource URI: ${uri}`);
});Quản lý khởi tạo và tắt máy đúng cách đảm bảo quản lý tài nguyên sạch sẽ.
from mcp.server import Server
from contextlib import asynccontextmanager
app = Server("lifecycle-server")
# Trạng thái chia sẻ
db_connection = None
cache = None
@asynccontextmanager
async def lifespan(server: Server):
"""Manage server lifecycle."""
global db_connection, cache
# Khởi động
print("🚀 Server starting...")
db_connection = await create_database_connection()
cache = await create_cache_client()
print("✅ Resources initialized")
yield # Máy chủ chạy ở đây
# Tắt máy
print("🛑 Server shutting down...")
await db_connection.close()
await cache.close()
print("✅ Resources cleaned up")
app = Server("lifecycle-server", lifespan=lifespan)
@app.tool()
async def query_database(sql: str) -> str:
"""Use the shared database connection."""
result = await db_connection.execute(sql)
return str(result)import { Server } from "@modelcontextprotocol/sdk/server/index.js";
class ManagedServer {
private server: Server;
private dbConnection: DatabaseConnection | null = null;
constructor() {
this.server = new Server({
name: "lifecycle-server",
version: "1.0.0"
});
this.setupHandlers();
}
async start() {
// Khởi tạo tài nguyên
console.log("🚀 Server starting...");
this.dbConnection = await createDatabaseConnection();
console.log("✅ Database connected");
// Khởi động máy chủ
await this.server.connect(transport);
}
async stop() {
// Dọn dẹp tài nguyên
console.log("🛑 Server shutting down...");
if (this.dbConnection) {
await this.dbConnection.close();
}
await this.server.close();
console.log("✅ Cleanup complete");
}
private setupHandlers() {
this.server.setRequestHandler(CallToolSchema, async (request) => {
// Sử dụng this.dbConnection một cách an toàn
// ...
});
}
}
// Sử dụng với việc tắt máy an toàn
const server = new ManagedServer();
process.on('SIGINT', async () => {
await server.stop();
process.exit(0);
});
await server.start();MCP hỗ trợ các cấp độ ghi log phía máy chủ mà khách hàng có thể điều khiển.
from mcp.server import Server
from mcp.types import LoggingLevel
import logging
app = Server("logging-server")
# Ánh xạ các mức MCP sang các mức ghi nhật ký của Python
LEVEL_MAP = {
LoggingLevel.DEBUG: logging.DEBUG,
LoggingLevel.INFO: logging.INFO,
LoggingLevel.WARNING: logging.WARNING,
LoggingLevel.ERROR: logging.ERROR,
}
logger = logging.getLogger("mcp-server")
@app.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> None:
"""Handle client request to change logging level."""
python_level = LEVEL_MAP.get(level, logging.INFO)
logger.setLevel(python_level)
logger.info(f"Logging level set to {level}")
@app.tool()
async def debug_operation(data: str) -> str:
"""Tool with various logging levels."""
logger.debug(f"Processing data: {data}")
try:
result = process(data)
logger.info(f"Successfully processed: {result}")
return result
except Exception as e:
logger.error(f"Processing failed: {e}")
raise@app.tool()
async def complex_operation(input: str, ctx) -> str:
"""Operation that logs to client."""
# Gửi thông báo nhật ký đến khách hàng
await ctx.send_log(
level="info",
message=f"Starting complex operation with input: {input}"
)
# Thực hiện công việc...
result = await do_work(input)
await ctx.send_log(
level="debug",
message=f"Operation complete, result size: {len(result)}"
)
return resultXử lý lỗi nhất quán cải thiện việc gỡ lỗi và trải nghiệm người dùng.
from mcp.types import McpError, ErrorCode
class ToolError(McpError):
"""Base class for tool errors."""
pass
class ValidationError(ToolError):
"""Invalid input parameters."""
def __init__(self, message: str):
super().__init__(ErrorCode.INVALID_PARAMS, message)
class NotFoundError(ToolError):
"""Requested resource not found."""
def __init__(self, resource: str):
super().__init__(ErrorCode.INVALID_REQUEST, f"Not found: {resource}")
class PermissionError(ToolError):
"""Access denied."""
def __init__(self, action: str):
super().__init__(ErrorCode.INVALID_REQUEST, f"Permission denied: {action}")
class InternalError(ToolError):
"""Internal server error."""
def __init__(self, message: str):
super().__init__(ErrorCode.INTERNAL_ERROR, message)@app.tool()
async def safe_operation(input: str) -> str:
"""Tool with comprehensive error handling."""
# Xác thực đầu vào
if not input:
raise ValidationError("Input cannot be empty")
if len(input) > 10000:
raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")
try:
# Kiểm tra quyền
if not await check_permission(input):
raise PermissionError(f"read {input}")
# Thực hiện thao tác
result = await perform_operation(input)
if result is None:
raise NotFoundError(input)
return result
except ConnectionError as e:
raise InternalError(f"Database connection failed: {e}")
except TimeoutError as e:
raise InternalError(f"Operation timed out: {e}")
except Exception as e:
# Ghi lại lỗi không mong đợi
logger.exception(f"Unexpected error in safe_operation")
raise InternalError(f"Unexpected error: {type(e).__name__}")import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
function validateInput(data: unknown): asserts data is ValidInput {
if (typeof data !== "object" || data === null) {
throw new McpError(
ErrorCode.InvalidParams,
"Input must be an object"
);
}
// Thêm xác thực...
}
server.setRequestHandler(CallToolSchema, async (request) => {
try {
validateInput(request.params.arguments);
const result = await performOperation(request.params.arguments);
return {
content: [{ type: "text", text: JSON.stringify(result) }]
};
} catch (error) {
if (error instanceof McpError) {
throw error; // Đã là lỗi MCP
}
// Chuyển đổi các lỗi khác
if (error instanceof NotFoundError) {
throw new McpError(ErrorCode.InvalidRequest, error.message);
}
// Lỗi không xác định
console.error("Unexpected error:", error);
throw new McpError(
ErrorCode.InternalError,
"An unexpected error occurred"
);
}
});Các tính năng này được đánh dấu là thử nghiệm trong đặc tả:
# Tác vụ cho phép theo dõi các hoạt động chạy dài với trạng thái
@app.task()
async def training_task(model_id: str, data_path: str, ctx) -> str:
"""Long-running ML training task."""
# Báo cáo tác vụ đã bắt đầu
await ctx.report_status("running", "Initializing training...")
# Vòng lặp đào tạo
for epoch in range(100):
await train_epoch(model_id, data_path, epoch)
await ctx.report_status(
"running",
f"Training epoch {epoch + 1}/100",
progress=epoch + 1,
total=100
)
await ctx.report_status("completed", "Training finished")
return f"Model {model_id} trained successfully"# Các chú thích cung cấp siêu dữ liệu về hành vi của công cụ
@app.tool(
annotations={
"destructive": False, # Không sửa đổi dữ liệu
"idempotent": True, # An toàn để thử lại
"timeout_seconds": 30, # Thời lượng tối đa dự kiến
"requires_approval": False # Không cần sự chấp thuận của người dùng
}
)
async def safe_query(query: str) -> str:
"""A read-only database query tool."""
return await execute_read_query(query)Tuyên bố miễn trừ trách nhiệm:
Tài liệu này đã được dịch bằng dịch vụ dịch thuật AI Co-op Translator. Mặc dù chúng tôi cố gắng đảm bảo độ chính xác, xin lưu ý rằng bản dịch tự động có thể chứa lỗi hoặc không chính xác. Tài liệu gốc bằng ngôn ngữ mẹ đẻ của nó nên được xem là nguồn chính xác và đáng tin cậy. Đối với thông tin quan trọng, luôn khuyến nghị sử dụng dịch vụ dịch thuật chuyên nghiệp bởi con người. Chúng tôi không chịu trách nhiệm về bất kỳ sự hiểu lầm hoặc giải thích sai nào phát sinh từ việc sử dụng bản dịch này.