Skip to content

Commit 0b6f5e2

Browse files
committed
fix: replay request SSE responses after disconnect
1 parent 78fbe27 commit 0b6f5e2

2 files changed

Lines changed: 105 additions & 7 deletions

File tree

packages/middleware/node/test/streamableHttp.test.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,6 +1954,99 @@ describe('Zod v4', () => {
19541954
toolResolve!();
19551955
});
19561956

1957+
it('should replay terminal response after request-scoped SSE stream is closed', async () => {
1958+
const result = await createTestServer({
1959+
sessionIdGenerator: () => randomUUID(),
1960+
eventStore: createEventStore(),
1961+
retryInterval: 1000
1962+
});
1963+
server = result.server;
1964+
transport = result.transport;
1965+
baseUrl = result.baseUrl;
1966+
mcpServer = result.mcpServer;
1967+
1968+
let streamCloseCalled = false;
1969+
let toolResolve: () => void;
1970+
const toolCompletePromise = new Promise<void>(resolve => {
1971+
toolResolve = resolve;
1972+
});
1973+
1974+
mcpServer.registerTool('close-and-finish-tool', { description: 'Closes and then finishes' }, async ctx => {
1975+
ctx.http?.closeSSE?.();
1976+
streamCloseCalled = true;
1977+
await toolCompletePromise;
1978+
return { content: [{ type: 'text', text: 'finished after reconnect' }] };
1979+
});
1980+
1981+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1982+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1983+
expect(sessionId).toBeDefined();
1984+
1985+
const toolCallRequest: JSONRPCMessage = {
1986+
jsonrpc: '2.0',
1987+
id: 101,
1988+
method: 'tools/call',
1989+
params: { name: 'close-and-finish-tool', arguments: {} }
1990+
};
1991+
1992+
const postResponse = await fetch(baseUrl, {
1993+
method: 'POST',
1994+
headers: {
1995+
'Content-Type': 'application/json',
1996+
Accept: 'text/event-stream, application/json',
1997+
'mcp-session-id': sessionId,
1998+
'mcp-protocol-version': '2025-11-25'
1999+
},
2000+
body: JSON.stringify(toolCallRequest)
2001+
});
2002+
2003+
expect(postResponse.status).toBe(200);
2004+
2005+
const reader = postResponse.body?.getReader();
2006+
const priming = await reader!.read();
2007+
const primingText = new TextDecoder().decode(priming.value);
2008+
const idMatch = primingText.match(/id: ([^\n]+)/);
2009+
expect(idMatch).toBeTruthy();
2010+
const lastEventId = idMatch![1]!;
2011+
2012+
await new Promise(resolve => setTimeout(resolve, 100));
2013+
expect(streamCloseCalled).toBe(true);
2014+
2015+
const { done } = await reader!.read();
2016+
expect(done).toBe(true);
2017+
2018+
toolResolve!();
2019+
await new Promise(resolve => setTimeout(resolve, 100));
2020+
2021+
const reconnectResponse = await fetch(baseUrl, {
2022+
method: 'GET',
2023+
headers: {
2024+
Accept: 'text/event-stream',
2025+
'mcp-session-id': sessionId,
2026+
'mcp-protocol-version': '2025-11-25',
2027+
'last-event-id': lastEventId
2028+
}
2029+
});
2030+
2031+
expect(reconnectResponse.status).toBe(200);
2032+
2033+
const reconnectReader = reconnectResponse.body?.getReader();
2034+
let replayedText = '';
2035+
const timeout = setTimeout(() => reconnectReader!.cancel(), 2000);
2036+
try {
2037+
while (!replayedText.includes('finished after reconnect')) {
2038+
const { value, done } = await reconnectReader!.read();
2039+
if (done) break;
2040+
replayedText += new TextDecoder().decode(value);
2041+
}
2042+
} finally {
2043+
clearTimeout(timeout);
2044+
}
2045+
2046+
expect(replayedText).toContain('finished after reconnect');
2047+
expect(replayedText).toContain('"id":101');
2048+
});
2049+
19572050
it('should provide closeSSEStream callback in ctx when eventStore is configured', async () => {
19582051
const result = await createTestServer({
19592052
sessionIdGenerator: () => randomUUID(),

packages/server/src/server/streamableHttp.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,14 +1002,12 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
10021002

10031003
const stream = this._streamMapping.get(streamId);
10041004

1005-
if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
1006-
// For SSE responses, generate event ID if event store is provided
1007-
let eventId: string | undefined;
1005+
let eventId: string | undefined;
1006+
if (!this._enableJsonResponse && this._eventStore) {
1007+
eventId = await this._eventStore.storeEvent(streamId, message);
1008+
}
10081009

1009-
if (this._eventStore) {
1010-
eventId = await this._eventStore.storeEvent(streamId, message);
1011-
}
1012-
// Write the event to the response stream
1010+
if (!this._enableJsonResponse && stream?.controller && stream?.encoder) {
10131011
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
10141012
}
10151013

@@ -1022,6 +1020,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport {
10221020

10231021
if (allResponsesReady) {
10241022
if (!stream) {
1023+
if (!this._enableJsonResponse && this._eventStore) {
1024+
for (const id of relatedIds) {
1025+
this._requestResponseMap.delete(id);
1026+
this._requestToStreamMapping.delete(id);
1027+
}
1028+
return;
1029+
}
10251030
throw new Error(`No connection established for request ID: ${String(requestId)}`);
10261031
}
10271032
if (this._enableJsonResponse && stream.resolveJson) {

0 commit comments

Comments
 (0)