Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ tmp/
temp/

# pnpm (lock file is tracked for reproducible Docker builds)


# mcp-publisher login info
.mcpregistry*
158 changes: 41 additions & 117 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import express from 'express';
import cors from 'cors';
import dotenv from 'dotenv';
import { randomUUID } from 'crypto';
import { StorageFactory, StorageType, StorageOptions } from './storage/index.js';
import { ContextService } from './context/service.js';
import { createRoutes } from './api/routes.js';
Expand Down Expand Up @@ -120,7 +119,6 @@ async function startPublicServer() {
const { StreamableHTTPServerTransport } = await import(
'@modelcontextprotocol/sdk/server/streamableHttp.js'
);
const { isInitializeRequest } = await import('@modelcontextprotocol/sdk/types.js');
const rateLimit = (await import('express-rate-limit')).default;

// Always use in-memory storage for public mode (read-only, no Redis needed)
Expand All @@ -146,7 +144,7 @@ async function startPublicServer() {
}

// Security headers
app.use((req, res, next) => {
app.use((_req, res, next) => {
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
res.setHeader('X-XSS-Protection', '0');
Expand Down Expand Up @@ -200,125 +198,66 @@ async function startPublicServer() {
message: { error: 'Too many requests, please try again later' },
});

// Session tracking: sessionId -> transport
const sessions = new Map<string, InstanceType<typeof StreamableHTTPServerTransport>>();

// Session cleanup interval (30 min TTL)
const SESSION_TTL_MS = 30 * 60 * 1000;
const sessionTimestamps = new Map<string, number>();

const cleanupInterval = setInterval(() => {
const now = Date.now();
for (const [sessionId, lastSeen] of sessionTimestamps.entries()) {
if (now - lastSeen > SESSION_TTL_MS) {
const transport = sessions.get(sessionId);
if (transport) {
transport.close().catch(() => {});
sessions.delete(sessionId);
}
sessionTimestamps.delete(sessionId);
console.error(`[Public] Cleaned up stale session: ${sessionId}`);
}
}
}, 60 * 1000);

// MCP endpoint — Streamable HTTP
app.all('/mcp', mcpLimiter, async (req, res) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;

// MCP endpoint — Stateless Streamable HTTP
// Each request creates its own transport+server and is fully self-contained.
// No session tracking needed — eliminates "Session not found" errors after
// container restarts, Watchtower redeployments, or TTL expiry.
app.post('/mcp', mcpLimiter, async (req, res) => {
try {
// Handle new session (initialization)
if (req.method === 'POST') {
const body = req.body;

if (!sessionId) {
// Check if this is an initialize request
if (isInitializeRequest(body)) {
// Create a new transport for this session
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sid: string) => {
sessions.set(sid, transport);
sessionTimestamps.set(sid, Date.now());
console.error(`[Public] New MCP session: ${sid}`);
},
});

// Create a public-profile MCP server and connect the transport
const chainClient = createChainClient();
const mcpServer = new KleverMCPServer(contextService, 'public', chainClient);
await mcpServer.connectTransport(transport);

// Handle the request
await transport.handleRequest(req, res, body);
return;
}

// Non-init request without session ID
res.status(400).json({ error: 'Missing mcp-session-id header' });
return;
}

// Existing session
const transport = sessions.get(sessionId);
if (!transport) {
res.status(404).json({ error: 'Session not found' });
return;
}
sessionTimestamps.set(sessionId, Date.now());
await transport.handleRequest(req, res, body);
return;
}
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});

// Handle GET (SSE stream) and DELETE (session termination)
if (req.method === 'GET' || req.method === 'DELETE') {
if (!sessionId) {
res.status(400).json({ error: 'Missing mcp-session-id header' });
return;
}

const transport = sessions.get(sessionId);
if (!transport) {
res.status(404).json({ error: 'Session not found' });
return;
}
sessionTimestamps.set(sessionId, Date.now());

if (req.method === 'DELETE') {
await transport.handleRequest(req, res);
sessions.delete(sessionId);
sessionTimestamps.delete(sessionId);
console.error(`[Public] Session terminated: ${sessionId}`);
return;
}

await transport.handleRequest(req, res);
return;
}
const chainClient = createChainClient();
const mcpServer = new KleverMCPServer(contextService, 'public', chainClient);
await mcpServer.connectTransport(transport);

// Unsupported method
res.status(405).json({ error: 'Method not allowed' });
await transport.handleRequest(req, res, req.body);

res.on('close', () => {
transport.close().catch(() => {});
});
} catch (error) {
console.error('[Public] MCP endpoint error:', error);
if (!res.headersSent) {
res.status(500).json({ error: 'Internal server error' });
res.status(500).json({
jsonrpc: '2.0',
error: { code: -32603, message: 'Internal server error' },
id: null,
});
}
}
});

// GET and DELETE are not applicable in stateless mode
app.get('/mcp', (_req, res) => {
res.status(405).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'Method not allowed. Use POST for stateless requests.' },
id: null,
});
});

app.delete('/mcp', (_req, res) => {
res.status(405).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'Method not allowed. No sessions to terminate in stateless mode.' },
id: null,
});
});

// Read-only API routes
app.use('/api', apiLimiter, createRoutes(contextService, { readOnly: true }));

// Health endpoint at root level
app.get('/health', async (req, res) => {
app.get('/health', async (_req, res) => {
try {
const allContexts = await contextService.query({ limit: 1, offset: 0 });
res.json({
status: 'ok',
timestamp: new Date().toISOString(),
mode: 'public',
uptime: process.uptime(),
activeSessions: sessions.size,
...getVersionInfo(),
knowledgeBase: {
totalContexts: allContexts.total,
Expand Down Expand Up @@ -355,24 +294,9 @@ async function startPublicServer() {
});

// Graceful shutdown
const shutdown = async () => {
const shutdown = () => {
console.error('\n[Public] Shutting down gracefully...');
clearInterval(cleanupInterval);

// Stop accepting new connections
server.close();

// Close all active sessions
for (const [sessionId, transport] of sessions.entries()) {
try {
await transport.close();
} catch {
// Ignore close errors during shutdown
}
sessions.delete(sessionId);
sessionTimestamps.delete(sessionId);
}

process.exit(0);
};

Expand Down