Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ tmp/
temp/

# pnpm (lock file is tracked for reproducible Docker builds)


# mcp-publisher login info
.mcpregistry*
163 changes: 44 additions & 119 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import express from 'express';
import cors from 'cors';
import dotenv from 'dotenv';
import { randomUUID } from 'crypto';
import { StorageFactory, StorageType, StorageOptions } from './storage/index.js';
import { ContextService } from './context/service.js';
import { createRoutes } from './api/routes.js';
Expand Down Expand Up @@ -120,7 +119,6 @@ async function startPublicServer() {
const { StreamableHTTPServerTransport } = await import(
'@modelcontextprotocol/sdk/server/streamableHttp.js'
);
const { isInitializeRequest } = await import('@modelcontextprotocol/sdk/types.js');
const rateLimit = (await import('express-rate-limit')).default;

// Always use in-memory storage for public mode (read-only, no Redis needed)
Expand All @@ -146,7 +144,7 @@ async function startPublicServer() {
}

// Security headers
app.use((req, res, next) => {
app.use((_req, res, next) => {
res.setHeader('X-Content-Type-Options', 'nosniff');
res.setHeader('X-Frame-Options', 'DENY');
res.setHeader('X-XSS-Protection', '0');
Expand Down Expand Up @@ -200,125 +198,66 @@ async function startPublicServer() {
message: { error: 'Too many requests, please try again later' },
});

// Session tracking: sessionId -> transport
const sessions = new Map<string, InstanceType<typeof StreamableHTTPServerTransport>>();

// Session cleanup interval (30 min TTL)
const SESSION_TTL_MS = 30 * 60 * 1000;
const sessionTimestamps = new Map<string, number>();

const cleanupInterval = setInterval(() => {
const now = Date.now();
for (const [sessionId, lastSeen] of sessionTimestamps.entries()) {
if (now - lastSeen > SESSION_TTL_MS) {
const transport = sessions.get(sessionId);
if (transport) {
transport.close().catch(() => {});
sessions.delete(sessionId);
}
sessionTimestamps.delete(sessionId);
console.error(`[Public] Cleaned up stale session: ${sessionId}`);
}
}
}, 60 * 1000);

// MCP endpoint — Streamable HTTP
app.all('/mcp', mcpLimiter, async (req, res) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;

// MCP endpoint — Stateless Streamable HTTP
// Each request creates its own transport+server and is fully self-contained.
// No session tracking needed — eliminates "Session not found" errors after
// container restarts, Watchtower redeployments, or TTL expiry.
app.post('/mcp', mcpLimiter, async (req, res) => {
try {
// Handle new session (initialization)
if (req.method === 'POST') {
const body = req.body;

if (!sessionId) {
// Check if this is an initialize request
if (isInitializeRequest(body)) {
// Create a new transport for this session
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sid: string) => {
sessions.set(sid, transport);
sessionTimestamps.set(sid, Date.now());
console.error(`[Public] New MCP session: ${sid}`);
},
});

// Create a public-profile MCP server and connect the transport
const chainClient = createChainClient();
const mcpServer = new KleverMCPServer(contextService, 'public', chainClient);
await mcpServer.connectTransport(transport);

// Handle the request
await transport.handleRequest(req, res, body);
return;
}

// Non-init request without session ID
res.status(400).json({ error: 'Missing mcp-session-id header' });
return;
}

// Existing session
const transport = sessions.get(sessionId);
if (!transport) {
res.status(404).json({ error: 'Session not found' });
return;
}
sessionTimestamps.set(sessionId, Date.now());
await transport.handleRequest(req, res, body);
return;
}
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});

// Handle GET (SSE stream) and DELETE (session termination)
if (req.method === 'GET' || req.method === 'DELETE') {
if (!sessionId) {
res.status(400).json({ error: 'Missing mcp-session-id header' });
return;
}

const transport = sessions.get(sessionId);
if (!transport) {
res.status(404).json({ error: 'Session not found' });
return;
}
sessionTimestamps.set(sessionId, Date.now());

if (req.method === 'DELETE') {
await transport.handleRequest(req, res);
sessions.delete(sessionId);
sessionTimestamps.delete(sessionId);
console.error(`[Public] Session terminated: ${sessionId}`);
return;
}

await transport.handleRequest(req, res);
return;
}
res.on('close', () => {
transport.close().catch(() => {});
});

const chainClient = createChainClient();
const mcpServer = new KleverMCPServer(contextService, 'public', chainClient);
await mcpServer.connectTransport(transport);

// Unsupported method
res.status(405).json({ error: 'Method not allowed' });
await transport.handleRequest(req, res, req.body);
} catch (error) {
console.error('[Public] MCP endpoint error:', error);
if (!res.headersSent) {
res.status(500).json({ error: 'Internal server error' });
res.status(500).json({
jsonrpc: '2.0',
error: { code: -32603, message: 'Internal server error' },
id: null,
});
}
}
});

// GET and DELETE are not applicable in stateless mode
app.get('/mcp', mcpLimiter, (_req, res) => {
res.status(405).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'Method not allowed. Use POST for stateless requests.' },
id: null,
});
});

app.delete('/mcp', mcpLimiter, (_req, res) => {
res.status(405).json({
jsonrpc: '2.0',
error: { code: -32000, message: 'Method not allowed. No sessions to terminate in stateless mode.' },
id: null,
});
});

// Read-only API routes
app.use('/api', apiLimiter, createRoutes(contextService, { readOnly: true }));

// Health endpoint at root level
app.get('/health', async (req, res) => {
app.get('/health', async (_req, res) => {
try {
const allContexts = await contextService.query({ limit: 1, offset: 0 });
res.json({
status: 'ok',
timestamp: new Date().toISOString(),
mode: 'public',
uptime: process.uptime(),
activeSessions: sessions.size,
...getVersionInfo(),
knowledgeBase: {
totalContexts: allContexts.total,
Expand Down Expand Up @@ -355,25 +294,11 @@ async function startPublicServer() {
});

// Graceful shutdown
const shutdown = async () => {
const shutdown = () => {
console.error('\n[Public] Shutting down gracefully...');
clearInterval(cleanupInterval);

// Stop accepting new connections
server.close();

// Close all active sessions
for (const [sessionId, transport] of sessions.entries()) {
try {
await transport.close();
} catch {
// Ignore close errors during shutdown
}
sessions.delete(sessionId);
sessionTimestamps.delete(sessionId);
}

process.exit(0);
server.close(() => {
process.exit(0);
});
};

process.on('SIGINT', shutdown);
Expand Down