В этой главе представлен подробный гид по реализации безопасной, масштабируемой и работающей в реальном времени потоковой передачи с помощью Model Context Protocol (MCP) через HTTPS. Рассматриваются причины использования потоковой передачи, доступные транспортные механизмы, как реализовать потоковый HTTP в MCP, лучшие практики безопасности, миграция с SSE и практические рекомендации по созданию собственных потоковых приложений MCP.
В этом разделе рассматриваются различные транспортные механизмы, доступные в MCP, и их роль в обеспечении потоковой передачи для обмена данными в реальном времени между клиентами и серверами.
Транспортный механизм определяет, как данные передаются между клиентом и сервером. MCP поддерживает несколько типов транспорта, чтобы соответствовать разным условиям и требованиям:
- stdio: стандартный ввод/вывод, подходит для локальных и CLI-инструментов. Простой, но не подходит для веба или облака.
- SSE (Server-Sent Events): позволяет серверам отправлять клиентам обновления в реальном времени по HTTP. Хорошо подходит для веб-интерфейсов, но ограничен в масштабируемости и гибкости.
- Streamable HTTP: современный HTTP-транспорт для потоковой передачи, поддерживающий уведомления и лучшую масштабируемость. Рекомендуется для большинства производственных и облачных сценариев.
Посмотрите таблицу ниже, чтобы понять различия между этими транспортными механизмами:
| Транспорт | Обновления в реальном времени | Потоковая передача | Масштабируемость | Сценарий использования |
|---|---|---|---|---|
| stdio | Нет | Нет | Низкая | Локальные CLI-инструменты |
| SSE | Да | Да | Средняя | Веб, обновления в реальном времени |
| Streamable HTTP | Да | Да | Высокая | Облако, многоклиентские приложения |
Совет: Выбор правильного транспорта влияет на производительность, масштабируемость и пользовательский опыт. Streamable HTTP рекомендуется для современных, масштабируемых и облачных приложений.
Обратите внимание на транспорты stdio и SSE, которые были показаны в предыдущих главах, и на то, что в этой главе рассматривается Streamable HTTP.
Понимание основных концепций и причин использования потоковой передачи важно для создания эффективных систем коммуникации в реальном времени.
Потоковая передача — это техника в сетевом программировании, позволяющая отправлять и получать данные небольшими, управляемыми порциями или в виде последовательности событий, а не ждать готовности всего ответа целиком. Это особенно полезно для:
- Больших файлов или наборов данных.
- Обновлений в реальном времени (например, чат, индикаторы прогресса).
- Долгих вычислений, когда нужно информировать пользователя о ходе выполнения.
Вот что важно знать о потоковой передаче в общих чертах:
- Данные доставляются постепенно, а не сразу целиком.
- Клиент может обрабатывать данные по мере их поступления.
- Снижает воспринимаемую задержку и улучшает пользовательский опыт.
Причины использования потоковой передачи следующие:
- Пользователи получают обратную связь сразу, а не только в конце.
- Позволяет создавать приложения в реальном времени и отзывчивые интерфейсы.
- Более эффективное использование сетевых и вычислительных ресурсов.
Вот простой пример реализации потоковой передачи:
Сервер (Python, с использованием FastAPI и StreamingResponse):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time
app = FastAPI()
async def event_stream():
for i in range(1, 6):
yield f"data: Message {i}\n\n"
time.sleep(1)
@app.get("/stream")
def stream():
return StreamingResponse(event_stream(), media_type="text/event-stream")Клиент (Python, с использованием requests):
import requests
with requests.get("http://localhost:8000/stream", stream=True) as r:
for line in r.iter_lines():
if line:
print(line.decode())Этот пример демонстрирует, как сервер отправляет клиенту серию сообщений по мере их готовности, а не ждет, пока все сообщения будут готовы.
Как это работает:
- Сервер отдает каждое сообщение по мере его готовности.
- Клиент получает и выводит каждую часть по мере поступления.
Требования:
- Сервер должен использовать потоковый ответ (например,
StreamingResponseв FastAPI). - Клиент должен обрабатывать ответ как поток (
stream=Trueв requests). - Content-Type обычно
text/event-streamилиapplication/octet-stream.
Сервер (Java, с использованием Spring Boot и Server-Sent Events):
@RestController
public class CalculatorController {
@GetMapping(value = "/calculate", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> calculate(@RequestParam double a,
@RequestParam double b,
@RequestParam String op) {
double result;
switch (op) {
case "add": result = a + b; break;
case "sub": result = a - b; break;
case "mul": result = a * b; break;
case "div": result = b != 0 ? a / b : Double.NaN; break;
default: result = Double.NaN;
}
return Flux.<ServerSentEvent<String>>just(
ServerSentEvent.<String>builder()
.event("info")
.data("Calculating: " + a + " " + op + " " + b)
.build(),
ServerSentEvent.<String>builder()
.event("result")
.data(String.valueOf(result))
.build()
)
.delayElements(Duration.ofSeconds(1));
}
}Клиент (Java, с использованием Spring WebFlux WebClient):
@SpringBootApplication
public class CalculatorClientApplication implements CommandLineRunner {
private final WebClient client = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
@Override
public void run(String... args) {
client.get()
.uri(uriBuilder -> uriBuilder
.path("/calculate")
.queryParam("a", 7)
.queryParam("b", 5)
.queryParam("op", "mul")
.build())
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.doOnNext(System.out::println)
.blockLast();
}
}Особенности реализации на Java:
- Используется реактивный стек Spring Boot с
Fluxдля потоковой передачи ServerSentEventобеспечивает структурированную потоковую передачу с типами событийWebClientсbodyToFlux()позволяет реактивно потреблять потокdelayElements()имитирует время обработки между событиями- События могут иметь типы (
info,result) для удобства обработки на клиенте
Различия между классическим способом потоковой передачи и тем, как это реализовано в MCP, можно представить так:
| Особенность | Классический HTTP стриминг | MCP стриминг (уведомления) |
|---|---|---|
| Основной ответ | Передается частями (chunked) | Один ответ в конце |
| Обновления прогресса | Отправляются как части данных | Отправляются в виде уведомлений |
| Требования к клиенту | Должен обрабатывать поток | Должен реализовать обработчик сообщений |
| Сценарий использования | Большие файлы, потоки токенов AI | Прогресс, логи, обратная связь в реальном времени |
Также обратите внимание на следующие отличия:
-
Паттерн коммуникации:
- Классический HTTP стриминг: использует простую chunked передачу данных
- MCP стриминг: использует структурированную систему уведомлений с протоколом JSON-RPC
-
Формат сообщений:
- Классический HTTP: простые текстовые части с переносами строк
- MCP: структурированные объекты LoggingMessageNotification с метаданными
-
Реализация клиента:
- Классический HTTP: простой клиент, обрабатывающий потоковые ответы
- MCP: более сложный клиент с обработчиком сообщений для разных типов сообщений
-
Обновления прогресса:
- Классический HTTP: прогресс — часть основного потока ответа
- MCP: прогресс отправляется отдельными уведомлениями, а основной ответ приходит в конце
Вот что мы рекомендуем при выборе между классической потоковой передачей (например, через эндпоинт /stream) и потоковой передачей через MCP:
- Для простых задач потоковой передачи: классический HTTP стриминг проще в реализации и подходит для базовых нужд.
- Для сложных, интерактивных приложений: MCP стриминг обеспечивает более структурированный подход с расширенными метаданными и разделением уведомлений и итоговых результатов.
- Для AI-приложений: система уведомлений MCP особенно полезна для долгих AI-задач, когда важно информировать пользователя о ходе выполнения.
Итак, вы уже видели рекомендации и сравнения классической потоковой передачи и MCP. Теперь рассмотрим подробнее, как именно использовать потоковую передачу в MCP.
Понимание работы потоковой передачи в рамках MCP важно для создания отзывчивых приложений, которые предоставляют пользователям обратную связь в реальном времени во время долгих операций.
В MCP потоковая передача — это не отправка основного ответа частями, а отправка уведомлений клиенту во время обработки запроса инструментом. Эти уведомления могут содержать обновления прогресса, логи или другие события.
Основной результат по-прежнему отправляется одним ответом. Однако уведомления могут отправляться отдельными сообщениями во время обработки, обновляя клиента в реальном времени. Клиент должен уметь обрабатывать и отображать эти уведомления.
Мы упомянули "уведомление", что это значит в контексте MCP?
Уведомление — это сообщение от сервера клиенту, информирующее о прогрессе, статусе или других событиях во время долгой операции. Уведомления повышают прозрачность и улучшают пользовательский опыт.
Например, клиент должен отправить уведомление после установления начального соединения с сервером.
Уведомление выглядит как JSON-сообщение:
{
jsonrpc: "2.0";
method: string;
params?: {
[key: string]: unknown;
};
}Уведомления относятся к теме в MCP, называемой "Logging".
Чтобы включить логирование, сервер должен активировать эту функцию/возможность следующим образом:
{
"capabilities": {
"logging": {}
}
}Note
В зависимости от используемого SDK, логирование может быть включено по умолчанию или его нужно явно активировать в конфигурации сервера.
Существуют разные уровни уведомлений:
| Уровень | Описание | Пример использования |
|---|---|---|
| debug | Подробная отладочная информация | Точки входа/выхода функций |
| info | Общая информационная информация | Обновления прогресса |
| notice | Обычные, но важные события | Изменения конфигурации |
| warning | Предупреждения | Использование устаревших функций |
| error | Ошибки | Сбои операций |
| critical | Критические ошибки | Отказы компонентов системы |
| alert | Требуется немедленное действие | Обнаружена порча данных |
| emergency | Система неработоспособна | Полный сбой системы |
Для реализации уведомлений в MCP необходимо настроить как серверную, так и клиентскую части для обработки обновлений в реальном времени. Это позволяет приложению предоставлять пользователям мгновенную обратную связь во время долгих операций.
Начнем с сервера. В MCP вы определяете инструменты, которые могут отправлять уведомления во время обработки запросов. Сервер использует объект контекста (обычно ctx) для отправки сообщений клиенту.
@mcp.tool(description="A tool that sends progress notifications")
async def process_files(message: str, ctx: Context) -> TextContent:
await ctx.info("Processing file 1/3...")
await ctx.info("Processing file 2/3...")
await ctx.info("Processing file 3/3...")
return TextContent(type="text", text=f"Done: {message}")В приведенном примере инструмент process_files отправляет три уведомления клиенту по мере обработки каждого файла. Метод ctx.info() используется для отправки информационных сообщений.
Кроме того, чтобы включить уведомления, убедитесь, что сервер использует потоковый транспорт (например, streamable-http), а клиент реализует обработчик сообщений для обработки уведомлений. Вот как настроить сервер для использования транспорта streamable-http:
mcp.run(transport="streamable-http")[Tool("A tool that sends progress notifications")]
public async Task<TextContent> ProcessFiles(string message, ToolContext ctx)
{
await ctx.Info("Processing file 1/3...");
await ctx.Info("Processing file 2/3...");
await ctx.Info("Processing file 3/3...");
return new TextContent
{
Type = "text",
Text = $"Done: {message}"
};
}В этом примере на .NET инструмент ProcessFiles, помеченный атрибутом Tool, отправляет три уведомления клиенту по мере обработки каждого файла. Метод ctx.Info() используется для отправки информационных сообщений.
Чтобы включить уведомления в вашем .NET MCP сервере, убедитесь, что используется потоковый транспорт:
var builder = McpBuilder.Create();
await builder
.UseStreamableHttp() // Enable streamable HTTP transport
.Build()
.RunAsync();Клиент должен реализовать обработчик сообщений, который будет обрабатывать и отображать уведомления по мере их поступления.
async def message_handler(message):
if isinstance(message, types.ServerNotification):
print("NOTIFICATION:", message)
else:
print("SERVER MESSAGE:", message)
async with ClientSession(
read_stream,
write_stream,
logging_callback=logging_collector,
message_handler=message_handler,
) as session:В приведенном коде функция message_handler проверяет, является ли входящее сообщение уведомлением. Если да, оно выводится; иначе обрабатывается как обычное серверное сообщение. Обратите внимание, что ClientSession инициализируется с message_handler для обработки входящих уведомлений.
// Define a message handler
void MessageHandler(IJsonRpcMessage message)
{
if (message is ServerNotification notification)
{
Console.WriteLine($"NOTIFICATION: {notification}");
}
else
{
Console.WriteLine($"SERVER MESSAGE: {message}");
}
}
// Create and use a client session with the message handler
var clientOptions = new ClientSessionOptions
{
MessageHandler = MessageHandler,
LoggingCallback = (level, message) => Console.WriteLine($"[{level}] {message}")
};
using var client = new ClientSession(readStream, writeStream, clientOptions);
await client.InitializeAsync();
// Now the client will process notifications through the MessageHandlerВ этом примере на .NET функция MessageHandler проверяет, является ли входящее сообщение уведомлением. Если да, оно выводится; иначе обрабатывается как обычное серверное сообщение. ClientSession инициализируется с обработчиком сообщений через ClientSessionOptions.
Чтобы включить уведомления, убедитесь, что сервер использует потоковый транспорт (например, streamable-http), а клиент реализует обработчик сообщений для обработки уведомлений.
В этом разделе объясняется концепция уведомлений о прогрессе в MCP, почему они важны и как реализовать их с помощью Streamable HTTP. Также приведено практическое задание для закрепления знаний.
Уведомления о прогрессе — это сообщения в реальном времени, отправляемые сервером клиенту во время долгих операций. Вместо того чтобы ждать завершения всего процесса, сервер информирует клиента о текущем состоянии. Это повышает прозрачность, улучшает пользовательский опыт и облегчает отладку.
Пример:
"Processing document 1/10"
"Processing document 2/10"
...
"Processing complete!"
Уведомления о прогрессе важны по нескольким причинам:
- Лучший пользовательский опыт: пользователи видят обновления по мере выполнения работы, а не только в конце.
- Обратная связь в реальном времени: клиенты могут отображать индикаторы прогресса или логи, делая приложение более отзывчивым.
- Упрощение отладки и мониторинга: разработчики и пользователи видят, где процесс может замедляться или зависать.
Вот как можно реализовать уведомления о прогрессе в MCP:
- На сервере: используйте
ctx.info()илиctx.log()для отправки уведомлений по мере обработки каждого элемента. Это отправляет сообщение клиенту до готовности основного результата. - На клиенте: реализуйте обработчик сообщений, который слушает и отображает уведомления по мере их поступления. Этот обработчик различает уведомления и итоговый результат.
Пример сервера:
@mcp.tool(description="A tool that sends progress notifications")
async def process_files(message: str, ctx: Context) -> TextContent:
for i in range(1, 11):
await ctx.info(f"Processing document {i}/10")
await ctx.info("Processing complete!")
return TextContent(type="text", text=f"Done: {message}")Пример клиента:
async def message_handler(message):
if isinstance(message, types.ServerNotification):
print("NOTIFICATION:", message)
else:
print("SERVER MESSAGE:", message)При реализации MCP серверов с HTTP-транспортами безопасность становится первоочередной задачей, требующей внимания к множеству векторов атак и механизмов защиты.
Безопасность критична при открытии MCP серверов через HTTP. Streamable HTTP вводит новые уязвимости и требует тщательной настройки.
- Проверка заголовка Origin: всегда проверяйте заголовок
Origin, чтобы предотвратить атаки DNS rebinding. - Привязка к localhost: для локальной разработки привязывайте серверы к
localhost, чтобы не открывать их в публичный интернет. - Аутентификация: реализуйте аутентификацию (например, API ключи, OAuth) для продакшн-развертываний.
- CORS: настройте политики Cross-Origin Resource Sharing для ограничения доступа.
- HTTPS: используйте HTTPS в продакшне для шифрования трафика.
- Никогда не доверяйте входящим запросам без проверки.
- Логируйте и мониторьте все обращения и ошибки.
- Регулярно обновляйте зависимости для устранения уязвимостей.
- Баланс между безопасностью и удобством разработки
- Обеспечение совместимости с разными клиентскими средами
Для приложений, которые сейчас используют Server-Sent Events (SSE), переход на Streamable HTTP обеспечивает расширенные возможности и лучшую долгосрочную поддержку ваших MCP реализаций.
Существует две веские причины перейти с SSE на Streamable HTTP:
- Streamable HTTP обеспечивает лучшую масштабируемость, совместимость и более расширенную поддержку уведомлений по сравнению с SSE.
- Это рекомендуемый транспорт для новых приложений MCP.
Вот как можно перейти с SSE на Streamable HTTP в ваших приложениях MCP:
- Обновите серверный код, используя
transport="streamable-http"вmcp.run(). - Обновите клиентский код, заменив SSE клиент на
streamablehttp_client. - Реализуйте обработчик сообщений на клиенте для обработки уведомлений.
- Проверьте совместимость с существующими инструментами и рабочими процессами.
Рекомендуется сохранять совместимость с существующими SSE клиентами в процессе миграции. Вот несколько подходов:
- Можно поддерживать оба транспорта — SSE и Streamable HTTP — запуская их на разных эндпоинтах.
- Постепенно переводить клиентов на новый транспорт.
Обратите внимание на следующие сложности при миграции:
- Обновление всех клиентов
- Обработка различий в доставке уведомлений
Безопасность должна быть приоритетом при реализации любого сервера, особенно при использовании HTTP-транспорта, такого как Streamable HTTP в MCP.
При реализации MCP серверов с HTTP-транспортом безопасность становится критически важной и требует тщательного внимания к различным вектором атак и механизмам защиты.
Безопасность крайне важна при открытии MCP серверов через HTTP. Streamable HTTP добавляет новые уязвимости и требует аккуратной настройки.
Основные моменты по безопасности:
- Проверка заголовка Origin: всегда проверяйте заголовок
Origin, чтобы предотвратить атаки DNS rebinding. - Привязка к localhost: для локальной разработки привязывайте серверы к
localhost, чтобы не открывать их в публичный интернет. - Аутентификация: реализуйте аутентификацию (например, API ключи, OAuth) для продакшн-развертываний.
- CORS: настройте политики Cross-Origin Resource Sharing (CORS) для ограничения доступа.
- HTTPS: используйте HTTPS в продакшне для шифрования трафика.
Кроме того, следуйте этим рекомендациям при обеспечении безопасности MCP стримингового сервера:
- Никогда не доверяйте входящим запросам без проверки.
- Логируйте и контролируйте все обращения и ошибки.
- Регулярно обновляйте зависимости для устранения уязвимостей.
При обеспечении безопасности MCP стриминговых серверов вы столкнетесь с такими задачами:
- Балансировка безопасности и удобства разработки
- Обеспечение совместимости с разными клиентскими средами
Сценарий:
Создайте MCP сервер и клиент, где сервер обрабатывает список элементов (например, файлов или документов) и отправляет уведомление для каждого обработанного элемента. Клиент должен отображать каждое уведомление по мере его поступления.
Шаги:
- Реализуйте серверный инструмент, который обрабатывает список и отправляет уведомления для каждого элемента.
- Реализуйте клиент с обработчиком сообщений для отображения уведомлений в реальном времени.
- Проверьте работу, запустив сервер и клиент, и наблюдайте за уведомлениями.
Чтобы продолжить изучение MCP стриминга и расширить свои знания, в этом разделе представлены дополнительные ресурсы и рекомендации по созданию более сложных приложений.
- Microsoft: Введение в HTTP Streaming
- Microsoft: Server-Sent Events (SSE)
- Microsoft: CORS в ASP.NET Core
- Python requests: Streaming Requests
- Попробуйте создать более продвинутые MCP инструменты, использующие стриминг для аналитики в реальном времени, чатов или совместного редактирования.
- Изучите интеграцию MCP стриминга с фронтенд-фреймворками (React, Vue и др.) для живого обновления интерфейса.
- Далее: Использование AI Toolkit для VSCode
Отказ от ответственности:
Этот документ был переведен с помощью сервиса автоматического перевода Co-op Translator. Несмотря на наши усилия по обеспечению точности, просим учитывать, что автоматический перевод может содержать ошибки или неточности. Оригинальный документ на его исходном языке следует считать авторитетным источником. Для получения критически важной информации рекомендуется обращаться к профессиональному переводу, выполненному человеком. Мы не несем ответственности за любые недоразумения или неправильные толкования, возникшие в результате использования данного перевода.