The Model Context Protocol (MCP) is flexible about transport mechanisms and allows custom implementations for specialized enterprise environments. This advanced guide explores custom transport implementations, using Azure Event Grid and Azure Event Hubs as practical examples for building scalable, cloud-native MCP solutions.
While MCP's standard transports (stdio and Streamable HTTP) serve most use cases, enterprise environments often require specialized transport mechanisms for better scalability, reliability, and integration with existing cloud infrastructure. Custom transports let MCP use cloud-native messaging services for asynchronous communication, event-driven architectures, and distributed processing.
This lesson explores advanced transport implementations based on the latest MCP specification (2025-11-25), Azure messaging services, and established enterprise integration patterns.
From the MCP specification (2025-11-25):
- Standard transports: stdio (recommended), Streamable HTTP (for remote scenarios)
- Custom transports: any transport that implements the MCP message exchange protocol
- Message format: JSON-RPC 2.0 with MCP-specific extensions
- Bidirectional communication: full-duplex communication is required for notifications and responses
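Because every transport carries JSON-RPC 2.0 messages, a custom transport usually needs to tell requests, notifications, and responses apart, for example to decide whether a reply is expected. The following is a minimal sketch of that classification using only the standard library; the function name `classify_message` is hypothetical and not part of any MCP SDK.

```python
import json
from typing import Any, Dict


def classify_message(raw: str) -> str:
    """Classify a JSON-RPC 2.0 payload as 'request', 'notification', or 'response'."""
    msg: Dict[str, Any] = json.loads(raw)
    if msg.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    if "method" in msg:
        # A request carries an id and expects a reply; a notification has no id.
        return "request" if "id" in msg else "notification"
    if "id" in msg and ("result" in msg) != ("error" in msg):
        # A response carries exactly one of 'result' or 'error'.
        return "response"
    raise ValueError("malformed JSON-RPC 2.0 message")


request = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/list"})
notification = json.dumps({"jsonrpc": "2.0", "method": "notifications/initialized"})
response = json.dumps({"jsonrpc": "2.0", "id": 1, "result": {"tools": []}})

for payload in (request, notification, response):
    print(classify_message(payload))
```

A transport can use this distinction to track only requests as in-flight, since notifications never receive a response.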
By the end of this advanced lesson, you will be able to:
- Understand custom transport requirements: implement the MCP protocol over any transport layer while remaining compliant
- Build an Azure Event Grid transport: create event-driven MCP servers that use Azure Event Grid for serverless scalability
- Implement an Azure Event Hubs transport: design high-throughput MCP solutions that use Azure Event Hubs for real-time streaming
- Apply enterprise patterns: integrate custom transports with existing Azure infrastructure and security models
- Handle transport reliability: implement message durability, ordering, and error handling for enterprise scenarios
- Optimize performance: design transport solutions for scalability, latency, and throughput requirements
Message Protocol:
  format: "JSON-RPC 2.0 with MCP extensions"
  bidirectional: "Full duplex communication required"
  ordering: "Message ordering must be preserved per session"
Transport Layer:
  reliability: "Transport MUST handle connection failures gracefully"
  security: "Transport MUST support secure communication"
  identification: "Each session MUST have unique identifier"
Custom Transport:
  compliance: "MUST implement complete MCP message exchange"
  extensibility: "MAY add transport-specific features"
  interoperability: "MUST maintain protocol compatibility"

Azure Event Grid provides a serverless event-routing service well suited to event-driven MCP architectures. This implementation demonstrates how to build scalable, loosely coupled MCP systems.
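The Event Grid transports in this guide wrap each MCP message in an event envelope before publishing it. As a rough illustration, the sketch below builds and unpacks such an envelope with plain dictionaries and no Azure SDK. The field names follow the Event Grid event schema and the `subject` and `eventType` values mirror the ones used in this guide; the helper names `wrap_mcp_message` and `unwrap_mcp_message` are hypothetical.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict


def wrap_mcp_message(message: Dict[str, Any], client_id: str) -> Dict[str, Any]:
    """Wrap an MCP (JSON-RPC) message in an Event Grid-style envelope."""
    return {
        "id": str(uuid.uuid4()),               # unique per event, useful for de-duplication
        "subject": f"mcp/{client_id}",         # lets subscriptions filter by client
        "eventType": "MCP.MessageReceived",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "dataVersion": "1.0",
        "data": message,                       # the MCP message travels as structured JSON
    }


def unwrap_mcp_message(event: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the MCP message from a received envelope."""
    return event["data"]


mcp_request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
envelope = wrap_mcp_message(mcp_request, client_id="client-42")

# Simulate the wire: serialize on the sender, parse on the receiver.
received = json.loads(json.dumps(envelope))
print(unwrap_mcp_message(received))
```

Keeping the MCP message as a structured `data` object, rather than as a pre-serialized string, avoids double encoding on the receiving side.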
graph TB
    Client[MCP Client] --> EG[Azure Event Grid]
    EG --> Server[MCP Server Function]
    Server --> EG
    EG --> Client
    subgraph "Azure Services"
        EG
        Server
        KV[Key Vault]
        Monitor[Application Insights]
    end
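In this architecture, a response travels back to the client as a separate event, so the client has to match it to the request that caused it. One common approach is to correlate by the JSON-RPC `id`. The sketch below shows that idea with `asyncio` futures; the `PendingRequests` class is hypothetical, and the response is delivered by a simulated callback rather than by Event Grid.

```python
import asyncio
from typing import Any, Dict


class PendingRequests:
    """Tracks in-flight requests so that later responses can be matched by id."""

    def __init__(self) -> None:
        self._pending: Dict[Any, asyncio.Future] = {}

    def register(self, request_id: Any) -> asyncio.Future:
        """Create a future that completes when the matching response arrives."""
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return future

    def resolve(self, response: Dict[str, Any]) -> bool:
        """Complete the matching future; return False for unknown or duplicate ids."""
        future = self._pending.pop(response.get("id"), None)
        if future is None or future.done():
            return False
        future.set_result(response)
        return True


async def demo() -> Dict[str, Any]:
    pending = PendingRequests()
    future = pending.register(1)
    # Simulate the response event arriving asynchronously from the messaging service.
    asyncio.get_running_loop().call_soon(
        pending.resolve, {"jsonrpc": "2.0", "id": 1, "result": {"tools": []}}
    )
    return await asyncio.wait_for(future, timeout=1.0)


result = asyncio.run(demo())
print(result)
```

Returning `False` for unknown ids also gives a natural place to drop redelivered responses.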
using Azure; // AzureKeyCredential
using Azure.Messaging.EventGrid;
using System.Text.Json;
public class EventGridMcpTransport : IMcpTransport
{
private readonly EventGridPublisherClient _publisher;
private readonly string _topicEndpoint;
private readonly string _clientId;
public EventGridMcpTransport(string topicEndpoint, string accessKey, string clientId)
{
_publisher = new EventGridPublisherClient(
new Uri(topicEndpoint),
new AzureKeyCredential(accessKey));
_topicEndpoint = topicEndpoint;
_clientId = clientId;
}
public async Task SendMessageAsync(McpMessage message)
{
// Pass the message object itself: EventGridEvent serializes `data` to JSON,
// so pre-serializing it would double-encode the payload as a JSON string.
var eventGridEvent = new EventGridEvent(
subject: $"mcp/{_clientId}",
eventType: "MCP.MessageReceived",
dataVersion: "1.0",
data: message)
{
Id = Guid.NewGuid().ToString(),
EventTime = DateTimeOffset.UtcNow
};
await _publisher.SendEventAsync(eventGridEvent);
}
public async Task<McpMessage> ReceiveMessageAsync(CancellationToken cancellationToken)
{
// Event Grid is push-based, so implement webhook receiver
// This would typically be handled by Azure Functions trigger
throw new NotImplementedException("Use EventGridTrigger in Azure Functions");
}
}
// Azure Function for receiving Event Grid events
[FunctionName("McpEventGridReceiver")]
public async Task HandleEventGridMessage(
    [EventGridTrigger] EventGridEvent eventGridEvent,
    ILogger log)
{
    try
    {
        // Data is BinaryData; deserialize the JSON payload directly
        var mcpMessage = eventGridEvent.Data.ToObjectFromJson<McpMessage>();
        // Process MCP message
        var response = await _mcpServer.ProcessMessageAsync(mcpMessage);
        // Send response back via Event Grid
        await _transport.SendMessageAsync(response);
    }
    catch (Exception ex)
    {
        log.LogError(ex, "Error processing Event Grid MCP message");
        // Rethrow so the failure is reported and Event Grid can retry delivery
        throw;
    }
}

import { EventGridPublisherClient, AzureKeyCredential } from "@azure/eventgrid";
import { McpTransport, McpMessage } from "./mcp-types";
export class EventGridMcpTransport implements McpTransport {
private publisher: EventGridPublisherClient<"CloudEvent">;
private clientId: string;
constructor(
private topicEndpoint: string,
private accessKey: string,
clientId: string
) {
// The events below use CloudEvents fields (source, type, time), so the client
// is created with the "CloudEvent" input schema; the topic must accept that schema.
this.publisher = new EventGridPublisherClient(
topicEndpoint,
"CloudEvent",
new AzureKeyCredential(accessKey)
);
this.clientId = clientId;
}
async sendMessage(message: McpMessage): Promise<void> {
const event = {
id: crypto.randomUUID(),
source: `mcp-client-${this.clientId}`,
type: "MCP.MessageReceived",
time: new Date(),
data: message
};
await this.publisher.send([event]);
}
// Event-driven receiving via Azure Functions
onMessage(handler: (message: McpMessage) => Promise<void>): void {
// An implementation would use the Event Grid trigger in Azure Functions
// This is a conceptual interface for the webhook receiver
}
}
// Azure Functions implementation
import { app, InvocationContext, EventGridEvent } from "@azure/functions";
app.eventGrid("mcpEventGridHandler", {
handler: async (event: EventGridEvent, context: InvocationContext) => {
try {
const mcpMessage = event.data as McpMessage;
// Process the MCP message
const response = await mcpServer.processMessage(mcpMessage);
// Send the response via Event Grid
await transport.sendMessage(response);
} catch (error) {
context.error("Error processing MCP message:", error);
throw error;
}
}
});

from azure.eventgrid import EventGridEvent
from azure.eventgrid.aio import EventGridPublisherClient  # async client, so send() can be awaited
from azure.core.credentials import AzureKeyCredential
from typing import Callable, Optional


class EventGridMcpTransport:
    def __init__(self, topic_endpoint: str, access_key: str, client_id: str):
        self.client = EventGridPublisherClient(
            topic_endpoint,
            AzureKeyCredential(access_key)
        )
        self.client_id = client_id
        self.message_handler: Optional[Callable] = None

    async def send_message(self, message: dict) -> None:
        """Send MCP message via Event Grid"""
        event = EventGridEvent(
            data=message,
            subject=f"mcp/{self.client_id}",
            event_type="MCP.MessageReceived",
            data_version="1.0"
        )
        await self.client.send(event)

    def on_message(self, handler: Callable[[dict], None]) -> None:
        """Register message handler for incoming events"""
        self.message_handler = handler


# Azure Functions implementation
import azure.functions as func
import logging


def main(event: func.EventGridEvent) -> None:
    """Azure Functions Event Grid trigger for MCP messages"""
    try:
        # Read the MCP message from the Event Grid event's data payload
        mcp_message = event.get_json()
        # Process the MCP message (process_mcp_message is assumed to be defined elsewhere)
        response = process_mcp_message(mcp_message)
        # Send the response back via Event Grid
        # (An implementation would create a new Event Grid client here)
    except Exception as e:
        logging.error(f"Error processing MCP Event Grid message: {e}")
        raise

Azure Event Hubs provides high-throughput, real-time streaming for MCP scenarios that require low latency and high message volume.
graph TB
    Client[MCP Client] --> EH[Azure Event Hubs]
    EH --> Server[MCP Server]
    Server --> EH
    EH --> Client
    subgraph "Event Hubs Features"
        Partition[Partitioning]
        Retention[Message Retention]
        Scaling[Auto Scaling]
    end
    EH --> Partition
    EH --> Retention
    EH --> Scaling
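Partitioning matters for MCP because message order must be preserved per session, and Event Hubs only guarantees ordering within a single partition. Sending every message of a session with the same partition key keeps that session on one partition. The sketch below illustrates the principle with a simple hash. It is not the hashing algorithm Event Hubs uses internally, and `assign_partition` and `route` are hypothetical helper names.

```python
import hashlib
from collections import defaultdict
from typing import Dict, List, Tuple

Message = Tuple[str, int]  # (session_id, sequence_number)


def assign_partition(session_id: str, partition_count: int) -> int:
    """Map a session id to a partition deterministically (illustrative hash only)."""
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partition_count


def route(messages: List[Message], partition_count: int) -> Dict[int, List[Message]]:
    """Append each message to the partition chosen by its session id."""
    partitions: Dict[int, List[Message]] = defaultdict(list)
    for session_id, seq in messages:
        partitions[assign_partition(session_id, partition_count)].append((session_id, seq))
    return dict(partitions)


messages = [
    ("session-a", 1), ("session-b", 1), ("session-a", 2),
    ("session-b", 2), ("session-a", 3),
]
routed = route(messages, partition_count=4)

for partition_id, items in sorted(routed.items()):
    print(partition_id, items)
```

Messages from different sessions may interleave within a partition, but each session's own sequence stays in send order.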
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs.Consumer;
using System.Text;
using System.Text.Json; // JsonSerializer
public class EventHubsMcpTransport : IMcpTransport, IDisposable
{
private readonly EventHubProducerClient _producer;
private readonly EventHubConsumerClient _consumer;
private readonly string _consumerGroup;
private readonly CancellationTokenSource _cancellationTokenSource;
public EventHubsMcpTransport(
string connectionString,
string eventHubName,
string consumerGroup = "$Default")
{
_producer = new EventHubProducerClient(connectionString, eventHubName);
_consumer = new EventHubConsumerClient(
consumerGroup,
connectionString,
eventHubName);
_consumerGroup = consumerGroup;
_cancellationTokenSource = new CancellationTokenSource();
}
public async Task SendMessageAsync(McpMessage message)
{
var messageBody = JsonSerializer.Serialize(message);
var eventData = new EventData(Encoding.UTF8.GetBytes(messageBody));
// Add MCP-specific properties
eventData.Properties.Add("MessageType", message.Method ?? "response");
eventData.Properties.Add("MessageId", message.Id);
eventData.Properties.Add("Timestamp", DateTimeOffset.UtcNow);
await _producer.SendAsync(new[] { eventData });
}
public async Task StartReceivingAsync(
Func<McpMessage, Task> messageHandler)
{
await foreach (PartitionEvent partitionEvent in _consumer.ReadEventsAsync(
_cancellationTokenSource.Token))
{
try
{
var messageBody = Encoding.UTF8.GetString(
partitionEvent.Data.EventBody.ToArray());
var mcpMessage = JsonSerializer.Deserialize<McpMessage>(messageBody);
await messageHandler(mcpMessage);
}
catch (Exception ex)
{
// Handle deserialization or processing errors
Console.WriteLine($"Error processing message: {ex.Message}");
}
}
}
public void Dispose()
{
_cancellationTokenSource?.Cancel();
_producer?.DisposeAsync().AsTask().Wait();
_consumer?.DisposeAsync().AsTask().Wait();
_cancellationTokenSource?.Dispose();
}
}

import {
EventHubProducerClient,
EventHubConsumerClient,
EventData
} from "@azure/event-hubs";
export class EventHubsMcpTransport implements McpTransport {
private producer: EventHubProducerClient;
private consumer: EventHubConsumerClient;
private isReceiving = false;
constructor(
private connectionString: string,
private eventHubName: string,
private consumerGroup: string = "$Default"
) {
this.producer = new EventHubProducerClient(
connectionString,
eventHubName
);
this.consumer = new EventHubConsumerClient(
consumerGroup,
connectionString,
eventHubName
);
}
async sendMessage(message: McpMessage): Promise<void> {
const eventData: EventData = {
body: JSON.stringify(message),
properties: {
messageType: message.method || "response",
messageId: message.id,
timestamp: new Date().toISOString()
}
};
await this.producer.sendBatch([eventData]);
}
async startReceiving(
messageHandler: (message: McpMessage) => Promise<void>
): Promise<void> {
if (this.isReceiving) return;
this.isReceiving = true;
const subscription = this.consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
try {
const messageBody = event.body as string;
const mcpMessage: McpMessage = JSON.parse(messageBody);
await messageHandler(mcpMessage);
// Update the checkpoint for at-least-once delivery
await context.updateCheckpoint(event);
} catch (error) {
console.error("Error processing Event Hubs message:", error);
}
}
},
processError: async (err, context) => {
console.error("Event Hubs error:", err);
}
});
}
async close(): Promise<void> {
this.isReceiving = false;
await this.producer.close();
await this.consumer.close();
}
}

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient  # async clients
import json
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict
import logging


class EventHubsMcpTransport:
    def __init__(
        self,
        connection_string: str,
        eventhub_name: str,
        consumer_group: str = "$Default"
    ):
        self.producer = EventHubProducerClient.from_connection_string(
            connection_string,
            eventhub_name=eventhub_name
        )
        self.consumer = EventHubConsumerClient.from_connection_string(
            connection_string,
            consumer_group=consumer_group,
            eventhub_name=eventhub_name
        )
        self.is_receiving = False

    async def send_message(self, message: Dict[str, Any]) -> None:
        """Send MCP message via Event Hubs"""
        event_data = EventData(json.dumps(message))
        # Add MCP-specific properties
        event_data.properties = {
            "messageType": message.get("method", "response"),
            "messageId": message.get("id"),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        # Keep the producer open across sends; close() releases it
        event_data_batch = await self.producer.create_batch()
        event_data_batch.add(event_data)
        await self.producer.send_batch(event_data_batch)

    async def start_receiving(
        self,
        message_handler: Callable[[Dict[str, Any]], Awaitable[None]]
    ) -> None:
        """Start receiving MCP messages from Event Hubs"""
        if self.is_receiving:
            return
        self.is_receiving = True
        async with self.consumer:
            await self.consumer.receive(
                on_event=self._on_event_received(message_handler),
                starting_position="-1"  # Start from the beginning of the partition
            )

    def _on_event_received(self, handler: Callable):
        """Internal event handler wrapper"""
        async def handle_event(partition_context, event):
            try:
                # Parse the MCP message from the Event Hubs event
                message_body = event.body_as_str(encoding='UTF-8')
                mcp_message = json.loads(message_body)
                # Process the MCP message
                await handler(mcp_message)
                # Update the checkpoint for at-least-once delivery
                await partition_context.update_checkpoint(event)
            except Exception as e:
                logging.error(f"Error processing Event Hubs message: {e}")
        return handle_event

    async def close(self) -> None:
        """Clean up transport resources"""
        self.is_receiving = False
        await self.producer.close()
        await self.consumer.close()

// Implementing message durability with retry logic
public class ReliableTransportWrapper : IMcpTransport
{
private readonly IMcpTransport _innerTransport;
private readonly RetryPolicy _retryPolicy;
public async Task SendMessageAsync(McpMessage message)
{
await _retryPolicy.ExecuteAsync(async () =>
{
try
{
await _innerTransport.SendMessageAsync(message);
}
catch (TransportException ex) when (ex.IsRetryable)
{
// Log and retry
throw;
}
});
}
}

// Integrating Azure Key Vault for transport security
public class SecureTransportFactory
{
private readonly SecretClient _keyVaultClient;
public async Task<IMcpTransport> CreateEventGridTransportAsync()
{
var accessKey = await _keyVaultClient.GetSecretAsync("EventGridAccessKey");
var topicEndpoint = await _keyVaultClient.GetSecretAsync("EventGridTopic");
return new EventGridMcpTransport(
topicEndpoint.Value.Value,
accessKey.Value.Value,
Environment.MachineName
);
}
}

// Adding telemetry to custom transports
public class ObservableTransport : IMcpTransport
{
private readonly IMcpTransport _transport;
private readonly ILogger _logger;
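private readonly TelemetryClient _telemetryClient;
// Activities are created from an ActivitySource; Activity has no static StartActivity method
private static readonly ActivitySource TransportActivitySource = new("MCP.Transport");
public async Task SendMessageAsync(McpMessage message)
{
using var activity = TransportActivitySource.StartActivity("MCP.Transport.Send");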
activity?.SetTag("transport.type", "EventGrid");
activity?.SetTag("message.method", message.Method);
var stopwatch = Stopwatch.StartNew();
try
{
await _transport.SendMessageAsync(message);
_telemetryClient.TrackDependency(
"EventGrid",
"SendMessage",
DateTime.UtcNow.Subtract(stopwatch.Elapsed),
stopwatch.Elapsed,
true
);
}
catch (Exception ex)
{
_telemetryClient.TrackException(ex);
throw;
}
}
}

Using Azure Event Grid to distribute MCP requests across multiple processing nodes:
Architecture:
- MCP Client sends requests to Event Grid topic
- Multiple Azure Functions subscribe to process different tool types
- Results aggregated and returned via separate response topic
Benefits:
- Horizontal scaling based on message volume
- Fault tolerance through redundant processors
- Cost optimization with serverless compute

Using Azure Event Hubs for high-frequency MCP interactions:
Architecture:
- MCP Client streams continuous requests via Event Hubs
- Stream Analytics processes and routes messages
- Multiple consumers handle different aspects of processing
Benefits:
- Low latency for real-time scenarios
- High throughput for batch processing
- Built-in partitioning for parallel processing

Combining multiple transports for different use cases:
public class HybridMcpTransport : IMcpTransport
{
private readonly IMcpTransport _realtimeTransport; // Event Hubs
private readonly IMcpTransport _batchTransport; // Event Grid
private readonly IMcpTransport _fallbackTransport; // HTTP Streaming
public async Task SendMessageAsync(McpMessage message)
{
// Route based on message characteristics
var transport = message.Method switch
{
"tools/call" when IsRealtime(message) => _realtimeTransport,
"resources/read" when IsBatch(message) => _batchTransport,
_ => _fallbackTransport
};
await transport.SendMessageAsync(message);
}
}

public class BatchingEventGridTransport : IMcpTransport
{
private readonly List<McpMessage> _messageBuffer = new();
private readonly Timer _flushTimer;
private const int MaxBatchSize = 100;
public async Task SendMessageAsync(McpMessage message)
{
lock (_messageBuffer)
{
_messageBuffer.Add(message);
if (_messageBuffer.Count >= MaxBatchSize)
{
_ = Task.Run(FlushMessages);
}
}
}
private async Task FlushMessages()
{
List<McpMessage> toSend;
lock (_messageBuffer)
{
toSend = new List<McpMessage>(_messageBuffer);
_messageBuffer.Clear();
}
if (toSend.Any())
{
var events = toSend.Select(CreateEventGridEvent);
await _publisher.SendEventsAsync(events);
}
}
}

public class PartitionedEventHubsTransport : IMcpTransport
{
    public async Task SendMessageAsync(McpMessage message)
    {
        // Partition by client ID for session affinity
        var partitionKey = ExtractClientId(message);
        var eventData = new EventData(JsonSerializer.SerializeToUtf8Bytes(message));
        // The partition key is supplied through SendEventOptions;
        // EventData.PartitionKey is read-only and only populated on received events
        var options = new SendEventOptions { PartitionKey = partitionKey };
        await _producer.SendAsync(new[] { eventData }, options);
    }
}

[Test]
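public async Task EventGridTransport_SendMessage_PublishesCorrectEvent()
{
    // Arrange
    // Assumes a test-only constructor overload that accepts an injected publisher
    // and client ID; the transport shown earlier only takes endpoint, key, and client ID.
    var mockPublisher = new Mock<EventGridPublisherClient>();
    var transport = new EventGridMcpTransport(mockPublisher.Object, "test-client");
    var message = new McpMessage { Method = "tools/list", Id = "test-123" };
    // Act
    await transport.SendMessageAsync(message);
    // Assert
    mockPublisher.Verify(
        x => x.SendEventAsync(
            It.Is<EventGridEvent>(e =>
                e.EventType == "MCP.MessageReceived" &&
                e.Subject == "mcp/test-client"),
            // Optional arguments must be passed explicitly inside Moq expressions
            It.IsAny<CancellationToken>()),
        Times.Once
    );
}

[Test]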
public async Task EventHubsTransport_IntegrationTest()
{
// Using Testcontainers for integration testing
var eventHubsContainer = new EventHubsContainer()
.WithEventHub("test-hub");
await eventHubsContainer.StartAsync();
var transport = new EventHubsMcpTransport(
eventHubsContainer.GetConnectionString(),
"test-hub"
);
// Test message round-trip
var sentMessage = new McpMessage { Method = "test", Id = "123" };
McpMessage receivedMessage = null;
// Start the receive loop without awaiting it: it runs until the transport is disposed
var receiveTask = transport.StartReceivingAsync(msg => {
receivedMessage = msg;
return Task.CompletedTask;
});
await transport.SendMessageAsync(sentMessage);
await Task.Delay(1000); // Allow for message processing
Assert.That(receivedMessage?.Id, Is.EqualTo("123"));
}

- Idempotency: Ensure message processing is idempotent so that duplicates are handled safely
- Error handling: Implement comprehensive error handling and dead-letter queues
- Monitoring: Add detailed telemetry and health checks
- Security: Use managed identities and least-privilege access
- Performance: Design for your specific latency and throughput requirements
- Use managed identity: Avoid connection strings in production
- Implement circuit breakers: Protect against Azure service outages
- Monitor costs: Track message volume and processing costs
- Plan for scale: Design partitioning and scaling strategies early
- Test thoroughly: Use Azure DevTest Labs for comprehensive testing
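Idempotency deserves particular care because at-least-once delivery means the same event can arrive more than once. A minimal sketch of one way to handle this is to remember recently processed event ids and skip repeats. The `IdempotentHandler` class is hypothetical and keeps its state in memory; a production system would more likely use a shared store so that all processing nodes see the same history.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict, List


class IdempotentHandler:
    """Wraps a handler so that each event id is processed at most once."""

    def __init__(self, handler: Callable[[Dict[str, Any]], None], max_tracked: int = 10_000) -> None:
        self._handler = handler
        self._seen: "OrderedDict[str, None]" = OrderedDict()
        self._max_tracked = max_tracked

    def handle(self, event_id: str, message: Dict[str, Any]) -> bool:
        """Process the message unless its event id was already seen; return True if processed."""
        if event_id in self._seen:
            return False  # duplicate delivery, skip it
        self._handler(message)
        # Record the id only after the handler succeeds, so a failed attempt can be retried.
        self._seen[event_id] = None
        if len(self._seen) > self._max_tracked:
            self._seen.popitem(last=False)  # evict the oldest id to bound memory
        return True


processed: List[Dict[str, Any]] = []
dedup = IdempotentHandler(processed.append)

first = dedup.handle("evt-1", {"jsonrpc": "2.0", "id": 1, "method": "tools/list"})
duplicate = dedup.handle("evt-1", {"jsonrpc": "2.0", "id": 1, "method": "tools/list"})
second = dedup.handle("evt-2", {"jsonrpc": "2.0", "id": 2, "method": "tools/list"})

print(first, duplicate, second, len(processed))
```

The bounded history trades a small risk of reprocessing very old duplicates for predictable memory use.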
Custom MCP transports enable powerful enterprise scenarios by building on Azure messaging services. By implementing Event Grid or Event Hubs transports, you can create scalable, reliable MCP solutions that integrate with existing Azure infrastructure.
The examples provided demonstrate production-oriented patterns for implementing custom transports while remaining compliant with the MCP protocol and Azure best practices.
- MCP Specification 2025-06-18
- Azure Event Grid Documentation
- Azure Event Hubs Documentation
- Azure Functions Event Grid Trigger
- Azure SDK for .NET
- Azure SDK for TypeScript
- Azure SDK for Python
This guide focuses on practical implementation patterns for production MCP systems. Always validate transport implementations against your specific requirements and Azure service limits. Current standard: this guide reflects the MCP Specification 2025-06-18 transport requirements and advanced transport patterns for enterprise environments.