Skip to content

Commit d334196

Browse files
committed
fix: detect missing notifications, add naive-shared-state negative test server
- parseSSEStreamFull now reads all events until stream closes (not just until first response), catching leaked notifications after the response - Both notification-isolation and notification-isolation-fuzz now fail when a client receives zero own notifications (indicates they were routed to another client's stream) - Remove stagger between fuzz requests (fire all concurrently) - Add naive-shared-state-server.ts: a minimal non-SDK server with a subtle bug (inFlightRequests keyed by request ID, not session+request ID). Verified to fail both notification isolation tests.
1 parent f57a458 commit d334196

2 files changed

Lines changed: 322 additions & 24 deletions

File tree

Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
#!/usr/bin/env node
2+
3+
/**
4+
* A minimal MCP server built without the SDK.
5+
*
6+
* Implements just enough of the protocol to pass basic conformance tests:
7+
* initialize, tools/list, tools/call (with progress notifications).
8+
*
9+
* Uses a per-session architecture with session IDs, but keeps an in-flight
10+
* request table to route notifications back to the correct SSE stream.
11+
*/
12+
13+
import { randomUUID } from 'crypto';
14+
import express from 'express';
15+
16+
// ---------------------------------------------------------------------------
17+
// Types
18+
// ---------------------------------------------------------------------------
19+
20+
interface Session {
21+
id: string;
22+
initialized: boolean;
23+
}
24+
25+
interface InFlightRequest {
26+
streamWriter: (event: string, data: string) => void;
27+
sessionId: string;
28+
}
29+
30+
// ---------------------------------------------------------------------------
31+
// State
32+
// ---------------------------------------------------------------------------
33+
34+
const sessions = new Map<string, Session>();
35+
36+
// Track in-flight requests so we can send notifications (progress, logging)
37+
// back on the correct SSE stream while a tool is still executing.
38+
//
39+
// Key: JSON-RPC request id (number | string)
40+
// Value: the SSE writer + session context for that request
41+
const inFlightRequests = new Map<number | string, InFlightRequest>();
42+
43+
// ---------------------------------------------------------------------------
44+
// Tool implementations
45+
// ---------------------------------------------------------------------------
46+
47+
const TEST_IMAGE_BASE64 =
48+
'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg==';
49+
50+
async function handleToolCall(
51+
toolName: string,
52+
_args: Record<string, unknown>,
53+
meta: Record<string, unknown> | undefined,
54+
requestId: number | string
55+
): Promise<object> {
56+
switch (toolName) {
57+
case 'test_simple_text':
58+
return {
59+
content: [
60+
{ type: 'text', text: 'This is a simple text response for testing.' }
61+
]
62+
};
63+
64+
case 'test_image_content':
65+
return {
66+
content: [
67+
{ type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' }
68+
]
69+
};
70+
71+
case 'test_tool_with_progress': {
72+
const progressToken = meta?.progressToken ?? 0;
73+
74+
for (let i = 0; i <= 2; i++) {
75+
// Look up the stream for this request on each iteration
76+
const entry = inFlightRequests.get(requestId);
77+
if (entry) {
78+
const notification = {
79+
jsonrpc: '2.0',
80+
method: 'notifications/progress',
81+
params: {
82+
progressToken,
83+
progress: i * 50,
84+
total: 100
85+
}
86+
};
87+
entry.streamWriter('message', JSON.stringify(notification));
88+
}
89+
await new Promise((r) => setTimeout(r, 50));
90+
}
91+
92+
return {
93+
content: [{ type: 'text', text: String(progressToken) }]
94+
};
95+
}
96+
97+
default:
98+
throw Object.assign(new Error(`Unknown tool: ${toolName}`), {
99+
code: -32601
100+
});
101+
}
102+
}
103+
104+
const TOOLS = [
105+
{
106+
name: 'test_simple_text',
107+
description: 'Tests simple text content response',
108+
inputSchema: { type: 'object', properties: {} }
109+
},
110+
{
111+
name: 'test_image_content',
112+
description: 'Tests image content response',
113+
inputSchema: { type: 'object', properties: {} }
114+
},
115+
{
116+
name: 'test_tool_with_progress',
117+
description: 'Tests tool that reports progress notifications',
118+
inputSchema: { type: 'object', properties: {} }
119+
}
120+
];
121+
122+
// ---------------------------------------------------------------------------
123+
// JSON-RPC message handling
124+
// ---------------------------------------------------------------------------
125+
126+
function handleMessage(
127+
message: any,
128+
session: Session,
129+
sseWriter: (event: string, data: string) => void
130+
): Promise<void> | undefined {
131+
// Notification (no id) — fire and forget
132+
if (message.id === undefined) {
133+
if (message.method === 'notifications/initialized') {
134+
session.initialized = true;
135+
}
136+
return;
137+
}
138+
139+
const requestId = message.id;
140+
141+
switch (message.method) {
142+
case 'initialize': {
143+
const response = {
144+
jsonrpc: '2.0',
145+
id: requestId,
146+
result: {
147+
protocolVersion: '2025-11-25',
148+
capabilities: { tools: {}, logging: {} },
149+
serverInfo: { name: 'naive-shared-state-server', version: '1.0.0' }
150+
}
151+
};
152+
sseWriter('message', JSON.stringify(response));
153+
return;
154+
}
155+
156+
case 'tools/list': {
157+
sseWriter(
158+
'message',
159+
JSON.stringify({
160+
jsonrpc: '2.0',
161+
id: requestId,
162+
result: { tools: TOOLS }
163+
})
164+
);
165+
return;
166+
}
167+
168+
case 'tools/call': {
169+
const toolName = message.params?.name;
170+
const toolArgs = message.params?.arguments ?? {};
171+
const meta = message.params?._meta;
172+
173+
// Register this request so in-flight notifications can find its stream
174+
inFlightRequests.set(requestId, {
175+
streamWriter: sseWriter,
176+
sessionId: session.id
177+
});
178+
179+
// Return a promise that resolves when the tool handler is done.
180+
// The caller uses this to know when to close the SSE stream.
181+
return handleToolCall(toolName, toolArgs, meta, requestId)
182+
.then((result) => {
183+
// Send final response directly on this request's own sseWriter,
184+
// NOT through the inFlightRequests map (final response is always
185+
// routed correctly since we captured sseWriter at request time)
186+
sseWriter(
187+
'message',
188+
JSON.stringify({ jsonrpc: '2.0', id: requestId, result })
189+
);
190+
})
191+
.catch((err) => {
192+
sseWriter(
193+
'message',
194+
JSON.stringify({
195+
jsonrpc: '2.0',
196+
id: requestId,
197+
error: {
198+
code: err.code ?? -32603,
199+
message: err.message ?? 'Internal error'
200+
}
201+
})
202+
);
203+
})
204+
.finally(() => {
205+
inFlightRequests.delete(requestId);
206+
});
207+
}
208+
209+
case 'ping': {
210+
sseWriter(
211+
'message',
212+
JSON.stringify({ jsonrpc: '2.0', id: requestId, result: {} })
213+
);
214+
return;
215+
}
216+
217+
default: {
218+
sseWriter(
219+
'message',
220+
JSON.stringify({
221+
jsonrpc: '2.0',
222+
id: requestId,
223+
error: {
224+
code: -32601,
225+
message: `Method not found: ${message.method}`
226+
}
227+
})
228+
);
229+
}
230+
}
231+
}
232+
233+
// ---------------------------------------------------------------------------
234+
// Express app
235+
// ---------------------------------------------------------------------------
236+
237+
const app = express();
238+
app.use(express.json());
239+
240+
app.post('/mcp', async (req, res) => {
241+
const sessionIdHeader = req.headers['mcp-session-id'] as string | undefined;
242+
const body = req.body;
243+
244+
// Resolve or create session
245+
let session: Session;
246+
247+
if (sessionIdHeader && sessions.has(sessionIdHeader)) {
248+
session = sessions.get(sessionIdHeader)!;
249+
} else if (!sessionIdHeader && body?.method === 'initialize') {
250+
session = { id: randomUUID(), initialized: false };
251+
sessions.set(session.id, session);
252+
} else {
253+
res.status(400).json({
254+
jsonrpc: '2.0',
255+
error: { code: -32000, message: 'Invalid or missing session ID' },
256+
id: body?.id ?? null
257+
});
258+
return;
259+
}
260+
261+
// Set up SSE response
262+
res.writeHead(200, {
263+
'Content-Type': 'text/event-stream',
264+
'Cache-Control': 'no-cache',
265+
Connection: 'keep-alive',
266+
'Mcp-Session-Id': session.id
267+
});
268+
269+
const sseWriter = (event: string, data: string) => {
270+
if (!res.writableEnded) {
271+
res.write(`event: ${event}\ndata: ${data}\n\n`);
272+
}
273+
};
274+
275+
// Handle the message(s)
276+
const messages = Array.isArray(body) ? body : [body];
277+
const promises: Promise<void>[] = [];
278+
279+
for (const message of messages) {
280+
const p = handleMessage(message, session, sseWriter);
281+
if (p) promises.push(p);
282+
}
283+
284+
if (promises.length > 0) {
285+
// Wait for all async handlers (tool calls) to complete
286+
await Promise.all(promises);
287+
}
288+
289+
res.end();
290+
});
291+
292+
const PORT = parseInt(process.env.PORT || '3007', 10);
293+
app.listen(PORT, '127.0.0.1', () => {
294+
console.log(`MCP server running on http://localhost:${PORT}/mcp`);
295+
});

0 commit comments

Comments
 (0)