Skip to content

Commit 367f265

Browse files
authored
Avoid race starting SseResponseStreamTransport (#91)
1 parent 60ea96a commit 367f265

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

src/ModelContextProtocol/Protocol/Transport/SseResponseStreamTransport.cs

+6-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public sealed class SseResponseStreamTransport(Stream sseResponseStream) : ITran
2020
private Utf8JsonWriter? _jsonWriter;
2121

2222
/// <inheritdoc/>
23-
public bool IsConnected => _sseWriteTask?.IsCompleted == false;
23+
public bool IsConnected { get; private set; }
2424

2525
/// <summary>
2626
/// Starts the transport and writes the JSON-RPC messages sent via <see cref="SendMessageAsync(IJsonRpcMessage, CancellationToken)"/>
@@ -41,6 +41,8 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
4141
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.DefaultOptions.GetTypeInfo<IJsonRpcMessage?>());
4242
}
4343

44+
IsConnected = true;
45+
4446
// The very first SSE event isn't really an IJsonRpcMessage, but there's no API to write a single item of a different type,
4547
// so we fib and special-case the "endpoint" event type in the formatter.
4648
_outgoingSseChannel.Writer.TryWrite(new SseItem<IJsonRpcMessage?>(null, "endpoint"));
@@ -55,6 +57,7 @@ void WriteJsonRpcMessageToBuffer(SseItem<IJsonRpcMessage?> item, IBufferWriter<b
5557
/// <inheritdoc/>
5658
public ValueTask DisposeAsync()
5759
{
60+
IsConnected = false;
5861
_incomingChannel.Writer.TryComplete();
5962
_outgoingSseChannel.Writer.TryComplete();
6063
return new ValueTask(_sseWriteTask ?? Task.CompletedTask);
@@ -63,7 +66,7 @@ public ValueTask DisposeAsync()
6366
/// <inheritdoc/>
6467
public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancellationToken = default)
6568
{
66-
if (_sseWriteTask is null)
69+
if (!IsConnected)
6770
{
6871
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
6972
}
@@ -80,7 +83,7 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca
8083
/// <exception cref="InvalidOperationException">Thrown when there is an attempt to process a message before calling <see cref="RunAsync(CancellationToken)"/>.</exception>
8184
public async Task OnMessageReceivedAsync(IJsonRpcMessage message, CancellationToken cancellationToken)
8285
{
83-
if (_sseWriteTask is null)
86+
if (!IsConnected)
8487
{
8588
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
8689
}

0 commit comments

Comments
 (0)