Skip to content

Commit 01f39fd

Browse files
committed
fix: make STDIO transport sendMessage thread-safe (#304)
Concurrent calls to `sendMessage` on the STDIO transports raced on the unicast sink and sporadically failed with `FAIL_NON_SERIALIZED`, dropping the message with `RuntimeException("Failed to enqueue message")`. Switch `tryEmitNext` to `emitNext` with a `busyLooping` EmitFailureHandler so concurrent emissions are serialized as recommended by the Reactor maintainers. Applied on both `StdioServerTransportProvider` and `StdioClientTransport`. Also adds a regression test that fans `sendMessage` out across 16 parallel rails and verifies all 500 emissions complete successfully and every message is written to the output stream.
1 parent 5895b2e commit 01f39fd

3 files changed

Lines changed: 58 additions & 17 deletions

File tree

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,16 +227,13 @@ private void handleIncomingErrors() {
227227

228228
@Override
229229
public Mono<Void> sendMessage(JSONRPCMessage message) {
230-
if (this.outboundSink.tryEmitNext(message).isSuccess()) {
231-
// TODO: essentially we could reschedule ourselves in some time and make
232-
// another attempt with the already read data but pause reading until
233-
// success
234-
// In this approach we delegate the retry and the backpressure onto the
235-
// caller. This might be enough for most cases.
230+
try {
231+
// busyLooping retries FAIL_NON_SERIALIZED under concurrent senders
232+
this.outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
236233
return Mono.empty();
237234
}
238-
else {
239-
return Mono.error(new RuntimeException("Failed to enqueue message"));
235+
catch (Sinks.EmissionException e) {
236+
return Mono.error(new RuntimeException("Failed to enqueue message", e));
240237
}
241238
}
242239

mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
import java.io.InputStreamReader;
1111
import java.io.OutputStream;
1212
import java.nio.charset.StandardCharsets;
13+
import java.time.Duration;
1314
import java.util.List;
1415
import java.util.concurrent.Executors;
1516
import java.util.concurrent.atomic.AtomicBoolean;
1617
import java.util.function.Function;
1718

1819
import io.modelcontextprotocol.json.TypeRef;
19-
import io.modelcontextprotocol.spec.McpError;
2020
import io.modelcontextprotocol.spec.McpSchema;
2121
import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
2222
import io.modelcontextprotocol.spec.McpServerSession;
@@ -161,11 +161,12 @@ public StdioMcpSessionTransport() {
161161
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
162162

163163
return Mono.zip(inboundReady.asMono(), outboundReady.asMono()).then(Mono.defer(() -> {
164-
if (outboundSink.tryEmitNext(message).isSuccess()) {
164+
try {
165+
outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
165166
return Mono.empty();
166167
}
167-
else {
168-
return Mono.error(new RuntimeException("Failed to enqueue message"));
168+
catch (Sinks.EmissionException e) {
169+
return Mono.error(new RuntimeException("Failed to enqueue message", e));
169170
}
170171
}));
171172
}

mcp-test/src/test/java/io/modelcontextprotocol/server/transport/StdioServerTransportProviderTests.java

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717
import java.util.concurrent.atomic.AtomicReference;
1818

1919
import io.modelcontextprotocol.json.McpJsonDefaults;
20-
import io.modelcontextprotocol.spec.McpError;
2120
import io.modelcontextprotocol.spec.McpSchema;
2221
import io.modelcontextprotocol.spec.McpServerSession;
2322
import io.modelcontextprotocol.spec.McpServerTransport;
2423
import org.junit.jupiter.api.AfterEach;
2524
import org.junit.jupiter.api.BeforeEach;
26-
import org.junit.jupiter.api.Disabled;
2725
import org.junit.jupiter.api.Test;
26+
import reactor.core.publisher.Flux;
2827
import reactor.core.publisher.Mono;
28+
import reactor.core.scheduler.Schedulers;
2929
import reactor.test.StepVerifier;
3030

3131
import static org.assertj.core.api.Assertions.assertThat;
@@ -98,7 +98,7 @@ void shouldCreateSessionWhenSessionFactoryIsSet() {
9898
}
9999

100100
@Test
101-
void shouldHandleIncomingMessages() throws Exception {
101+
void shouldHandleIncomingMessages() {
102102

103103
String jsonMessage = "{\"jsonrpc\":\"2.0\",\"method\":\"test\",\"params\":{},\"id\":1}\n";
104104
InputStream stream = new ByteArrayInputStream(jsonMessage.getBytes(StandardCharsets.UTF_8));
@@ -228,7 +228,7 @@ void shouldHandleNotificationBeforeSessionFactoryIsSet() {
228228
}
229229

230230
@Test
231-
void shouldHandleInvalidJsonMessage() throws Exception {
231+
void shouldHandleInvalidJsonMessage() {
232232

233233
// Write an invalid JSON message to the input stream
234234
String jsonMessage = "{invalid json}\n";
@@ -247,7 +247,7 @@ void shouldHandleInvalidJsonMessage() throws Exception {
247247
}
248248

249249
@Test
250-
void shouldHandleSessionClose() throws Exception {
250+
void shouldHandleSessionClose() {
251251
// Set session factory
252252
transportProvider.setSessionFactory(sessionFactory);
253253

@@ -258,4 +258,47 @@ void shouldHandleSessionClose() throws Exception {
258258
verify(mockSession).closeGracefully();
259259
}
260260

261+
@Test
262+
void shouldHandleConcurrentSendMessage() {
263+
// Redirect the transport output to a buffer so we can verify every message lands
264+
ByteArrayOutputStream output = new ByteArrayOutputStream();
265+
PrintStream outputPrintStream = new PrintStream(output, true, StandardCharsets.UTF_8);
266+
transportProvider = new StdioServerTransportProvider(McpJsonDefaults.getMapper(), System.in, outputPrintStream);
267+
268+
// Capture the inner McpServerTransport handed to the session factory
269+
AtomicReference<McpServerTransport> transportRef = new AtomicReference<>();
270+
McpServerSession.Factory capturingFactory = transport -> {
271+
transportRef.set(transport);
272+
return mockSession;
273+
};
274+
275+
// Set session factory
276+
transportProvider.setSessionFactory(capturingFactory);
277+
278+
McpServerTransport transport = transportRef.get();
279+
assertThat(transport).isNotNull();
280+
281+
// Fan sendMessage out across 16 parallel rails to race against the unicast sink
282+
int messageCount = 500;
283+
Flux<Integer> concurrentSends = Flux.range(0, messageCount)
284+
.parallel(16)
285+
.runOn(Schedulers.parallel())
286+
.flatMap(i -> transport
287+
.sendMessage(
288+
new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, "test/notification", Map.of()))
289+
.thenReturn(i))
290+
.sequential();
291+
292+
// Every send should complete successfully (no FAIL_NON_SERIALIZED errors)
293+
StepVerifier.create(concurrentSends).expectNextCount(messageCount).verifyComplete();
294+
295+
// Writes happen asynchronously on the outbound scheduler; wait briefly for drain
296+
// and verify every message was written as its own newline-delimited JSON line
297+
StepVerifier
298+
.create(Mono.delay(java.time.Duration.ofMillis(500))
299+
.then(Mono.fromCallable(() -> output.toString(StandardCharsets.UTF_8).lines().count())))
300+
.assertNext(lineCount -> assertThat(lineCount).isEqualTo(messageCount))
301+
.verifyComplete();
302+
}
303+
261304
}

0 commit comments

Comments
 (0)