Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e913d80

Browse files
authoredMay 1, 2025
Add client-side Streamable HTTP transport support (#356)
1 parent 7c507bc commit e913d80

19 files changed

+908
-464
lines changed
 

‎src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ internal sealed class StreamableHttpHandler(
2323
ILoggerFactory loggerFactory,
2424
IServiceProvider applicationServices)
2525
{
26-
private static JsonTypeInfo<JsonRpcError> s_errorTypeInfo = GetRequiredJsonTypeInfo<JsonRpcError>();
27-
private static MediaTypeHeaderValue ApplicationJsonMediaType = new("application/json");
28-
private static MediaTypeHeaderValue TextEventStreamMediaType = new("text/event-stream");
26+
private static readonly JsonTypeInfo<JsonRpcError> s_errorTypeInfo = GetRequiredJsonTypeInfo<JsonRpcError>();
27+
private static readonly MediaTypeHeaderValue s_applicationJsonMediaType = new("application/json");
28+
private static readonly MediaTypeHeaderValue s_textEventStreamMediaType = new("text/event-stream");
2929

3030
public ConcurrentDictionary<string, HttpMcpSession<StreamableHttpServerTransport>> Sessions { get; } = new(StringComparer.Ordinal);
3131

@@ -36,7 +36,7 @@ public async Task HandlePostRequestAsync(HttpContext context)
3636
// so we have to do this manually. The spec doesn't mandate that servers MUST reject these requests,
3737
// but it's probably good to at least start out trying to be strict.
3838
var acceptHeaders = context.Request.GetTypedHeaders().Accept;
39-
if (!acceptHeaders.Contains(ApplicationJsonMediaType) || !acceptHeaders.Contains(TextEventStreamMediaType))
39+
if (!acceptHeaders.Contains(s_applicationJsonMediaType) || !acceptHeaders.Contains(s_textEventStreamMediaType))
4040
{
4141
await WriteJsonRpcErrorAsync(context,
4242
"Not Acceptable: Client must accept both application/json and text/event-stream",
@@ -64,7 +64,7 @@ await WriteJsonRpcErrorAsync(context,
6464
public async Task HandleGetRequestAsync(HttpContext context)
6565
{
6666
var acceptHeaders = context.Request.GetTypedHeaders().Accept;
67-
if (!acceptHeaders.Contains(TextEventStreamMediaType))
67+
if (!acceptHeaders.Contains(s_textEventStreamMediaType))
6868
{
6969
await WriteJsonRpcErrorAsync(context,
7070
"Not Acceptable: Client must accept text/event-stream",

‎src/ModelContextProtocol/Protocol/Transport/SseClientSessionTransport.cs

Lines changed: 3 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -87,40 +87,17 @@ public override async Task SendMessageAsync(
8787
messageId = messageWithId.Id.ToString();
8888
}
8989

90-
var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, _messageEndpoint)
90+
using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, _messageEndpoint)
9191
{
9292
Content = content,
9393
};
94-
CopyAdditionalHeaders(httpRequestMessage.Headers);
94+
StreamableHttpClientSessionTransport.CopyAdditionalHeaders(httpRequestMessage.Headers, _options.AdditionalHeaders);
9595
var response = await _httpClient.SendAsync(httpRequestMessage, cancellationToken).ConfigureAwait(false);
9696

9797
response.EnsureSuccessStatusCode();
9898

9999
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
100100

101-
// Check if the message was an initialize request
102-
if (message is JsonRpcRequest request && request.Method == RequestMethods.Initialize)
103-
{
104-
// If the response is not a JSON-RPC response, it is an SSE message
105-
if (string.IsNullOrEmpty(responseContent) || responseContent.Equals("accepted", StringComparison.OrdinalIgnoreCase))
106-
{
107-
LogAcceptedPost(Name, messageId);
108-
// The response will arrive as an SSE message
109-
}
110-
else
111-
{
112-
JsonRpcResponse initializeResponse = JsonSerializer.Deserialize(responseContent, McpJsonUtilities.JsonContext.Default.JsonRpcResponse) ??
113-
throw new InvalidOperationException("Failed to initialize client");
114-
115-
LogTransportReceivedMessage(Name, messageId);
116-
await WriteMessageAsync(initializeResponse, cancellationToken).ConfigureAwait(false);
117-
LogTransportMessageWritten(Name, messageId);
118-
}
119-
120-
return;
121-
}
122-
123-
// Otherwise, check if the response was accepted (the response will come as an SSE message)
124101
if (string.IsNullOrEmpty(responseContent) || responseContent.Equals("accepted", StringComparison.OrdinalIgnoreCase))
125102
{
126103
LogAcceptedPost(Name, messageId);
@@ -177,17 +154,13 @@ public override async ValueTask DisposeAsync()
177154
}
178155
}
179156

180-
internal Uri? MessageEndpoint => _messageEndpoint;
181-
182-
internal SseClientTransportOptions Options => _options;
183-
184157
private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
185158
{
186159
try
187160
{
188161
using var request = new HttpRequestMessage(HttpMethod.Get, _sseEndpoint);
189162
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));
190-
CopyAdditionalHeaders(request.Headers);
163+
StreamableHttpClientSessionTransport.CopyAdditionalHeaders(request.Headers, _options.AdditionalHeaders);
191164

192165
using var response = await _httpClient.SendAsync(
193166
request,
@@ -251,15 +224,7 @@ private async Task ProcessSseMessage(string data, CancellationToken cancellation
251224
return;
252225
}
253226

254-
string messageId = "(no id)";
255-
if (message is JsonRpcMessageWithId messageWithId)
256-
{
257-
messageId = messageWithId.Id.ToString();
258-
}
259-
260-
LogTransportReceivedMessage(Name, messageId);
261227
await WriteMessageAsync(message, cancellationToken).ConfigureAwait(false);
262-
LogTransportMessageWritten(Name, messageId);
263228
}
264229
catch (JsonException ex)
265230
{
@@ -290,20 +255,6 @@ private void HandleEndpointEvent(string data)
290255
_connectionEstablished.TrySetResult(true);
291256
}
292257

293-
private void CopyAdditionalHeaders(HttpRequestHeaders headers)
294-
{
295-
if (_options.AdditionalHeaders is not null)
296-
{
297-
foreach (var header in _options.AdditionalHeaders)
298-
{
299-
if (!headers.TryAddWithoutValidation(header.Key, header.Value))
300-
{
301-
throw new InvalidOperationException($"Failed to add header '{header.Key}' with value '{header.Value}' from {nameof(SseClientTransportOptions.AdditionalHeaders)}.");
302-
}
303-
}
304-
}
305-
}
306-
307258
[LoggerMessage(Level = LogLevel.Information, Message = "{EndpointName} accepted SSE transport POST for message ID '{MessageId}'.")]
308259
private partial void LogAcceptedPost(string endpointName, string messageId);
309260

‎src/ModelContextProtocol/Protocol/Transport/SseClientTransport.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public SseClientTransport(SseClientTransportOptions transportOptions, HttpClient
5757
/// <inheritdoc />
5858
public async Task<ITransport> ConnectAsync(CancellationToken cancellationToken = default)
5959
{
60+
if (_options.UseStreamableHttp)
61+
{
62+
return new StreamableHttpClientSessionTransport(_options, _httpClient, _loggerFactory, Name);
63+
}
64+
6065
var sessionTransport = new SseClientSessionTransport(_options, _httpClient, _loggerFactory, Name);
6166

6267
try

‎src/ModelContextProtocol/Protocol/Transport/SseClientTransportOptions.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,20 @@ public required Uri Endpoint
3030
}
3131
}
3232

33+
/// <summary>
34+
/// Gets or sets a value indicating whether to use "Streamable HTTP" for the transport rather than "HTTP with SSE". Defaults to false.
35+
/// <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable HTTP transport specification</see>.
36+
/// <see href="https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">HTTP with SSE transport specification</see>.
37+
/// </summary>
38+
public bool UseStreamableHttp { get; init; }
39+
3340
/// <summary>
3441
/// Gets a transport identifier used for logging purposes.
3542
/// </summary>
3643
public string? Name { get; init; }
3744

3845
/// <summary>
39-
/// Gets or sets a timeout used to establish the initial connection to the SSE server.
46+
/// Gets or sets a timeout used to establish the initial connection to the SSE server. Defaults to 30 seconds.
4047
/// </summary>
4148
/// <remarks>
4249
/// This timeout controls how long the client waits for:

‎src/ModelContextProtocol/Protocol/Transport/StreamClientSessionTransport.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,15 +146,7 @@ private async Task ProcessMessageAsync(string line, CancellationToken cancellati
146146
var message = (JsonRpcMessage?)JsonSerializer.Deserialize(line.AsSpan().Trim(), McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage)));
147147
if (message != null)
148148
{
149-
string messageId = "(no id)";
150-
if (message is JsonRpcMessageWithId messageWithId)
151-
{
152-
messageId = messageWithId.Id.ToString();
153-
}
154-
155-
LogTransportReceivedMessage(Name, messageId);
156149
await WriteMessageAsync(message, cancellationToken).ConfigureAwait(false);
157-
LogTransportMessageWritten(Name, messageId);
158150
}
159151
else
160152
{

‎src/ModelContextProtocol/Protocol/Transport/StreamServerTransport.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,7 @@ private async Task ReadMessagesAsync()
111111
{
112112
if (JsonSerializer.Deserialize(line, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage))) is JsonRpcMessage message)
113113
{
114-
string messageId = "(no id)";
115-
if (message is JsonRpcMessageWithId messageWithId)
116-
{
117-
messageId = messageWithId.Id.ToString();
118-
}
119-
120-
LogTransportReceivedMessage(Name, messageId);
121114
await WriteMessageAsync(message, shutdownToken).ConfigureAwait(false);
122-
LogTransportMessageWritten(Name, messageId);
123115
}
124116
else
125117
{

0 commit comments

Comments
 (0)