Skip to content

Commit 3597345

Browse files
committed
fix: fail fast after SSE reconnect exhaustion
1 parent 22595b9 commit 3597345

3 files changed

Lines changed: 82 additions & 17 deletions

File tree

.changeset/quiet-cups-retry.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@modelcontextprotocol/client": patch
3+
---
4+
5+
Make StreamableHTTPClientTransport retry standalone SSE streams longer and fail fast after reconnection exhaustion instead of allowing later request responses to disappear behind a dead SSE channel. SSE open failures now include a fallback HTTP status when statusText is empty.

packages/client/src/client/streamableHttp.ts

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,34 @@ const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOp
2222
initialReconnectionDelay: 1000,
2323
maxReconnectionDelay: 30_000,
2424
reconnectionDelayGrowFactor: 1.5,
25-
maxRetries: 2
25+
maxRetries: 10
2626
};
2727

28+
function errorMessage(error: unknown): string {
29+
if (error instanceof Error && error.message) {
30+
return error.message;
31+
}
32+
33+
if (typeof error === 'object' && error) {
34+
const maybeError = error as { name?: string; status?: number; statusText?: string };
35+
if (maybeError.statusText) {
36+
return maybeError.statusText;
37+
}
38+
if (maybeError.status !== undefined) {
39+
return `HTTP ${maybeError.status}`;
40+
}
41+
if (maybeError.name) {
42+
return maybeError.name;
43+
}
44+
}
45+
46+
if (typeof error === 'string' && error) {
47+
return error;
48+
}
49+
50+
return 'unknown';
51+
}
52+
2853
/**
2954
* Options for starting or authenticating an SSE connection
3055
*/
@@ -74,7 +99,7 @@ export interface StreamableHTTPReconnectionOptions {
7499

75100
/**
76101
* Maximum number of reconnection attempts before giving up.
77-
* Default is 2.
102+
* Default is 10.
78103
*/
79104
maxRetries: number;
80105
}
@@ -184,6 +209,7 @@ export class StreamableHTTPClientTransport implements Transport {
184209
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
185210
private readonly _reconnectionScheduler?: ReconnectionScheduler;
186211
private _cancelReconnection?: () => void;
212+
private _standaloneSseReconnectError?: Error;
187213

188214
onclose?: () => void;
189215
onerror?: (error: Error) => void;
@@ -288,12 +314,14 @@ export class StreamableHTTPClientTransport implements Transport {
288314
return;
289315
}
290316

291-
throw new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${response.statusText}`, {
317+
const statusText = response.statusText || `HTTP ${response.status}`;
318+
throw new SdkError(SdkErrorCode.ClientHttpFailedToOpenStream, `Failed to open SSE stream: ${statusText}`, {
292319
status: response.status,
293320
statusText: response.statusText
294321
});
295322
}
296323

324+
this._standaloneSseReconnectError = undefined;
297325
this._handleSseStream(response.body, options, true);
298326
} catch (error) {
299327
this.onerror?.(error as Error);
@@ -328,13 +356,17 @@ export class StreamableHTTPClientTransport implements Transport {
328356
* @param lastEventId The ID of the last received event for resumability
329357
* @param attemptCount Current reconnection attempt count for this specific stream
330358
*/
331-
private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0): void {
359+
private _scheduleReconnection(options: StartSSEOptions, attemptCount = 0, failFutureRequests = false): void {
332360
// Use provided options or default options
333361
const maxRetries = this._reconnectionOptions.maxRetries;
334362

335363
// Check if we've exceeded maximum retry attempts
336364
if (attemptCount >= maxRetries) {
337-
this.onerror?.(new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`));
365+
const error = new Error(`Maximum reconnection attempts (${maxRetries}) exceeded.`);
366+
if (failFutureRequests) {
367+
this._standaloneSseReconnectError = error;
368+
}
369+
this.onerror?.(error);
338370
return;
339371
}
340372

@@ -345,9 +377,9 @@ export class StreamableHTTPClientTransport implements Transport {
345377
this._cancelReconnection = undefined;
346378
if (this._abortController?.signal.aborted) return;
347379
this._startOrAuthSse(options).catch(error => {
348-
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`));
380+
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${errorMessage(error)}`));
349381
try {
350-
this._scheduleReconnection(options, attemptCount + 1);
382+
this._scheduleReconnection(options, attemptCount + 1, failFutureRequests);
351383
} catch (scheduleError) {
352384
this.onerror?.(scheduleError instanceof Error ? scheduleError : new Error(String(scheduleError)));
353385
}
@@ -443,12 +475,13 @@ export class StreamableHTTPClientTransport implements Transport {
443475
onresumptiontoken,
444476
replayMessageId
445477
},
446-
0
478+
0,
479+
isReconnectable
447480
);
448481
}
449482
} catch (error) {
450483
// Handle stream errors - likely a network disconnect
451-
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
484+
this.onerror?.(new Error(`SSE stream disconnected: ${errorMessage(error)}`));
452485

453486
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
454487
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
@@ -464,10 +497,11 @@ export class StreamableHTTPClientTransport implements Transport {
464497
onresumptiontoken,
465498
replayMessageId
466499
},
467-
0
500+
0,
501+
isReconnectable
468502
);
469503
} catch (error) {
470-
this.onerror?.(new Error(`Failed to reconnect: ${error instanceof Error ? error.message : String(error)}`));
504+
this.onerror?.(new Error(`Failed to reconnect: ${errorMessage(error)}`));
471505
}
472506
}
473507
}
@@ -538,6 +572,12 @@ export class StreamableHTTPClientTransport implements Transport {
538572
return;
539573
}
540574

575+
const messages = Array.isArray(message) ? message : [message];
576+
const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined);
577+
if (hasRequests && this._standaloneSseReconnectError) {
578+
throw new Error(`SSE stream reconnection failed: ${this._standaloneSseReconnectError.message}`);
579+
}
580+
541581
const headers = await this._commonHeaders();
542582
headers.set('content-type', 'application/json');
543583
const userAccept = headers.get('accept');
@@ -649,11 +689,6 @@ export class StreamableHTTPClientTransport implements Transport {
649689
return;
650690
}
651691

652-
// Get original message(s) for detecting request IDs
653-
const messages = Array.isArray(message) ? message : [message];
654-
655-
const hasRequests = messages.some(msg => 'method' in msg && 'id' in msg && msg.id !== undefined);
656-
657692
// Check the response type
658693
const contentType = response.headers.get('content-type');
659694

packages/client/test/client/streamableHttp.test.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -927,7 +927,7 @@ describe('StreamableHTTPClientTransport', () => {
927927
// ASSERT
928928
expect(errorSpy).toHaveBeenCalledWith(
929929
expect.objectContaining({
930-
message: expect.stringContaining('SSE stream disconnected: Error: Network failure')
930+
message: expect.stringContaining('SSE stream disconnected: Network failure')
931931
})
932932
);
933933
// THE KEY ASSERTION: A second fetch call proves reconnection was attempted.
@@ -1810,6 +1810,31 @@ describe('StreamableHTTPClientTransport', () => {
18101810
// Clean up the pending reconnection to avoid test pollution
18111811
transport['_cancelReconnection']?.();
18121812
});
1813+
1814+
it('should fail future requests after standalone SSE reconnect attempts are exhausted', async () => {
1815+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1816+
reconnectionOptions: {
1817+
initialReconnectionDelay: 10,
1818+
maxRetries: 0,
1819+
maxReconnectionDelay: 1000,
1820+
reconnectionDelayGrowFactor: 1
1821+
}
1822+
});
1823+
1824+
transport['_scheduleReconnection']({}, 0, true);
1825+
1826+
const message: JSONRPCRequest = {
1827+
jsonrpc: '2.0',
1828+
method: 'tools/call',
1829+
params: {},
1830+
id: 'request-after-dead-sse'
1831+
};
1832+
1833+
await expect(transport.send(message)).rejects.toThrow(
1834+
'SSE stream reconnection failed: Maximum reconnection attempts (0) exceeded.'
1835+
);
1836+
expect(globalThis.fetch).not.toHaveBeenCalled();
1837+
});
18131838
});
18141839

18151840
describe('prevent infinite recursion when server returns 401 after successful auth', () => {

0 commit comments

Comments
 (0)