Skip to content

Commit 36c980a

Browse files
authored
fix: mcp memory leak error (#453)
<!-- Thank you for your pull request. Please review below requirements. Bug fixes and new features should include tests and possibly benchmarks. Contributors guide: https://github.com/eggjs/egg/blob/master/CONTRIBUTING.md 感谢您贡献代码。请确认下列 checklist 的完成情况。 Bug 修复和新功能必须包含测试,必要时请附上性能测试。 Contributors guide: https://github.com/eggjs/egg/blob/master/CONTRIBUTING.md --> ##### Checklist <!-- Remove items that do not apply. For completed items, change [ ] to [x]. --> - [ ] `npm test` passes - [ ] tests and/or benchmarks are included - [ ] documentation is changed or added - [ ] commit message follows commit guidelines ##### Affected core subsystem(s) <!-- Provide affected core subsystem(s). --> ##### Description of change <!-- Provide a description of the change below this comment. --> <!-- - any feature? - close https://github.com/eggjs/egg/ISSUE_URL --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Bug Fixes** * Prevented duplicate hook registrations in controller management * Enhanced stream and SSE session cleanup with improved resource deallocation during application shutdown * **Tests** * Added test coverage for SSE middleware execution flow <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent f474495 commit 36c980a

5 files changed

Lines changed: 95 additions & 32 deletions

File tree

plugin/controller/lib/impl/mcp/MCPControllerRegister.ts

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,16 @@ export class MCPControllerRegister implements ControllerRegister {
134134
}
135135

136136
static addHook(hook: MCPControllerHook) {
137-
MCPControllerRegister.hooks.push(hook);
137+
if (!MCPControllerRegister.hooks.includes(hook)) {
138+
MCPControllerRegister.hooks.push(hook);
139+
}
140+
}
141+
142+
static deleteHook(hook: MCPControllerHook) {
143+
const index = MCPControllerRegister.hooks.indexOf(hook);
144+
if (index >= 0) {
145+
MCPControllerRegister.hooks.splice(index, 1);
146+
}
138147
}
139148

140149
// eslint-disable-next-line @typescript-eslint/no-unused-vars
@@ -148,6 +157,7 @@ export class MCPControllerRegister implements ControllerRegister {
148157
this.instance.controllerProtos = [];
149158
}
150159
this.instance = undefined;
160+
this.hooks = [];
151161
}
152162

153163
mcpStatelessStreamServerInit(name?: string) {
@@ -319,6 +329,7 @@ export class MCPControllerRegister implements ControllerRegister {
319329
prompt.meta,
320330
);
321331
}
332+
let streamSessionId: string | undefined;
322333
const transport = new StreamableHTTPServerTransport({
323334
sessionIdGenerator: () =>
324335
this.mcpConfig.getSessionIdGenerator(name)(ctx),
@@ -327,6 +338,7 @@ export class MCPControllerRegister implements ControllerRegister {
327338
self.clearStreamMcpServer(sessionId);
328339
},
329340
onsessioninitialized: async sessionId => {
341+
streamSessionId = sessionId;
330342
self.streamTransports[sessionId] = transport;
331343
if (MCPControllerRegister.hooks.length > 0) {
332344
for (const hook of MCPControllerRegister.hooks) {
@@ -350,9 +362,15 @@ export class MCPControllerRegister implements ControllerRegister {
350362

351363
await mcpServerHelper.server.connect(transport);
352364

365+
const closeFunc = transport.onclose;
353366
transport.onclose = async () => {
354-
if (transport.sessionId) {
355-
self.clearStreamMcpServer(transport.sessionId);
367+
try {
368+
await closeFunc?.();
369+
} finally {
370+
const sessionId = transport.sessionId ?? streamSessionId;
371+
if (sessionId) {
372+
self.clearStreamMcpServer(sessionId);
373+
}
356374
}
357375
};
358376

@@ -564,25 +582,48 @@ export class MCPControllerRegister implements ControllerRegister {
564582
'content-type': 'application/json',
565583
};
566584
const newCtx = self.app.createContext(req, res) as unknown as Context;
567-
await ctx.app.ctxStorage.run(newCtx, async () => {
568-
await mw(newCtx, async () => {
569-
if (MCPControllerRegister.hooks.length > 0) {
570-
for (const hook of MCPControllerRegister.hooks) {
571-
await hook.preHandle?.(newCtx);
585+
try {
586+
await ctx.app.ctxStorage.run(newCtx, async () => {
587+
await mw(newCtx, async () => {
588+
if (MCPControllerRegister.hooks.length > 0) {
589+
for (const hook of MCPControllerRegister.hooks) {
590+
await hook.preHandle?.(newCtx);
591+
}
572592
}
573-
}
574-
messageFunc!(message, extra);
575-
if (isJSONRPCRequest(args[0])) {
576-
const map = self.sseTransportsRequestMap.get(transport)!;
577-
const wait = new Promise<null>((resolve, reject) => {
578-
if (extra && 'id' in extra) {
579-
map[extra.id as string] = { resolve, reject };
593+
if (!messageFunc) {
594+
return;
595+
}
596+
597+
const map = self.sseTransportsRequestMap.get(transport);
598+
const requestId = isJSONRPCRequest(args[0]) ? String(args[0].id) : undefined;
599+
let wait: Promise<null> | undefined;
600+
if (map && requestId !== undefined) {
601+
wait = new Promise<null>((resolve, reject) => {
602+
map[requestId] = { resolve, reject };
603+
});
604+
}
605+
606+
try {
607+
await messageFunc(message, extra);
608+
if (wait) {
609+
await wait;
580610
}
581-
});
582-
await wait;
583-
}
611+
} finally {
612+
if (map && requestId !== undefined) {
613+
delete map[requestId];
614+
}
615+
}
616+
});
584617
});
585-
});
618+
} finally {
619+
if (!res.destroyed) {
620+
res.destroy();
621+
}
622+
if (!req.destroyed) {
623+
req.destroy();
624+
}
625+
socket.destroy();
626+
}
586627
};
587628
}
588629

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
'use strict';
22

3-
module.exports = () => {
3+
module.exports = (_options, app) => {
44
return async function tracelog(ctx, next) {
55
ctx.req.headers.trace = 'middleware';
66
ctx.req.rawHeaders.push('trace');
77
ctx.req.rawHeaders.push('middleware');
8-
await next();
8+
try {
9+
await next();
10+
} finally {
11+
if (ctx.path === '/mcp/sse' || ctx.path === '/mcp/test/sse') {
12+
app.mcpSseSyntheticMiddlewareEndCount = (app.mcpSseSyntheticMiddlewareEndCount || 0) + 1;
13+
}
14+
}
915
};
1016
};

plugin/controller/test/mcp/mcp.test.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,17 @@ async function startNotificationTool(client: Client, name?: string) {
5454
return notifications;
5555
}
5656

57+
async function waitForSseSyntheticMiddlewareEnd(app: { mcpSseSyntheticMiddlewareEndCount?: number }, previousCount: number) {
58+
const start = Date.now();
59+
while (Date.now() - start < 2000) {
60+
if ((app.mcpSseSyntheticMiddlewareEndCount ?? 0) > previousCount) {
61+
return;
62+
}
63+
await new Promise(resolve => setTimeout(resolve, 50));
64+
}
65+
assert.fail('SSE synthetic context middleware should finish after request response is sent');
66+
}
67+
5768
describe('plugin/controller/test/mcp/mcp.test.ts', () => {
5869

5970

@@ -154,12 +165,14 @@ describe('plugin/controller/test/mcp/mcp.test.ts', () => {
154165
},
155166
]);
156167

168+
const syntheticMiddlewareEndCount = app.mcpSseSyntheticMiddlewareEndCount ?? 0;
157169
const toolRes = await sseClient.callTool({
158170
name: 'bar',
159171
arguments: {
160172
name: 'aaa',
161173
},
162174
});
175+
await waitForSseSyntheticMiddlewareEnd(app, syntheticMiddlewareEndCount);
163176
assert.deepEqual(toolRes, {
164177
content: [{ type: 'text', text: 'npm package: aaa not found' }],
165178
});

plugin/mcp-proxy/app.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export default class AppHook {
2020
}
2121

2222
async beforeClose() {
23+
MCPControllerRegister.deleteHook(MCPProxyHook);
2324
if (this.agent.mcpProxy) {
2425
await (this.agent.mcpProxy as any).close();
2526
}

plugin/mcp-proxy/index.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,10 @@ export const MCPProxyHook: MCPControllerHook = {
123123
}
124124
});
125125
ctx.res.once('close', () => {
126-
delete self.transports[id];
127-
const connection = self.sseConnections.get(id);
128-
if (connection) {
129-
clearInterval(connection.intervalId);
130-
self.sseConnections.delete(id);
131-
}
132-
self.app.mcpProxy.unregisterClient(id);
126+
self.clearSseMcpServer(transport)
127+
.catch(error => self.app.logger.error('[mcp-proxy] clear SSE MCP server failed: %s', error.message));
128+
self.app.mcpProxy.unregisterClient(id)
129+
.catch(error => self.app.logger.error('[mcp-proxy] unregister SSE client failed: %s', error.message));
133130
});
134131
},
135132
async onStreamSessionInitialized(_ctx, transport, _server, self) {
@@ -195,12 +192,17 @@ export const MCPProxyHook: MCPControllerHook = {
195192
}
196193
});
197194
await self.app.mcpProxy.registerClient(sessionId, process.pid);
195+
const closeFunc = transport.onclose;
198196
transport.onclose = async () => {
199-
const sid = transport.sessionId;
200-
if (sid && self.streamTransports[sid]) {
201-
delete self.streamTransports[sid];
197+
const sid = transport.sessionId ?? sessionId;
198+
try {
199+
await closeFunc?.();
200+
} finally {
201+
if (sid) {
202+
self.clearStreamMcpServer(sid);
203+
await self.app.mcpProxy.unregisterClient(sid);
204+
}
202205
}
203-
await self.app.mcpProxy.unregisterClient(sid!);
204206
};
205207
},
206208
async checkAndRunProxy(ctx, type, sessionId) {

0 commit comments

Comments
 (0)