Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 64 additions & 8 deletions serve-mcp/src/main/java/build/serve/mcp/McpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,20 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

/**
* An MCP (Model Context Protocol) server that exposes tools via JSON-RPC 2.0 over HTTP.
*
* <p><strong>Authentication:</strong> {@code McpServer} has no built-in authentication.
* Any deployment reachable by untrusted clients must be wrapped with an authentication
* layer — for example {@code serve-auth}'s {@code AuthMiddleware} — before the handler
* returned by {@link #handler()} is registered. Standalone use without such a wrapper
* is an open server.
*
* @author reed.vonredwitz
* @since Mar-2026
*/
Expand Down Expand Up @@ -93,6 +100,8 @@ private McpServer(final Builder builder) {
this.maxSessions = builder.maxSessions;
this.maxSubscriptionsPerSession = builder.maxSubscriptionsPerSession;

startSessionReaper(builder.sessionIdleTimeout);

final var toolMap = new LinkedHashMap<String, McpTool>();
for (final var tool : builder.tools) {
toolMap.put(tool.name(), tool);
Expand All @@ -112,9 +121,25 @@ private McpServer(final Builder builder) {
this.templates = List.copyOf(templateList);
}

private void startSessionReaper(final Duration idleTimeout) {
final long intervalMs = Math.min(idleTimeout.toMillis() / 2, Duration.ofMinutes(5).toMillis());
Thread.ofVirtual().name("mcp-session-reaper").start(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(intervalMs);
} catch (final InterruptedException e) {
break;
}
final var cutoff = Instant.now().minus(idleTimeout);
sessions.entrySet().removeIf(entry -> entry.getValue().lastAccessed.isBefore(cutoff));
}
});
}

private static final class SessionState {
final java.util.Set<String> subscriptions = ConcurrentHashMap.newKeySet();
volatile SseEmitter emitter;
final Set<String> subscriptions = ConcurrentHashMap.newKeySet();
final AtomicReference<SseEmitter> emitter = new AtomicReference<>();
volatile Instant lastAccessed = Instant.now();
}

private record TemplateEntry(McpResourceTemplate template, Pattern pattern) {
Expand Down Expand Up @@ -261,6 +286,11 @@ public void stdioLoop(final InputStream in,

private Optional<JsonObject> dispatch(final JsonObject request,
final String sessionId) {
final var state = sessions.get(sessionId);
if (state != null) {
state.lastAccessed = Instant.now();
}

final var rpcMethod = getString(request, "method");
final var id = request.members().get("id");

Expand Down Expand Up @@ -390,7 +420,7 @@ private JsonObject handleToolsCall(final JsonValue params, final String sessionI

final var tool = tools.get(toolName);
if (tool == null) {
return errorEnvelope(id, -32602, "Unknown tool: " + toolName);
return errorEnvelope(id, -32602, "Unknown tool: " + sanitizeMessage(toolName));
}

final var invocationId = invocationCounter.incrementAndGet();
Expand Down Expand Up @@ -420,13 +450,17 @@ private void handleSseConnection(final Exchange exchange) throws Exception {
return;
}
SseUpgrade.sse(emitter -> {
state.emitter = emitter;
state.lastAccessed = Instant.now();
final var prev = state.emitter.getAndSet(emitter);
if (prev != null) {
prev.close();
}
try {
emitter.awaitClose();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
state.emitter = null;
state.emitter.compareAndSet(emitter, null);
}
}).handle(exchange);
}
Expand All @@ -448,7 +482,7 @@ public void notifyResourceChanged(final String uri) {

for (final var state : sessions.values()) {
if (state.subscriptions.contains(uri)) {
final var emitter = state.emitter;
final var emitter = state.emitter.get();
if (emitter != null && emitter.isOpen()) {
try {
emitter.send(event);
Expand Down Expand Up @@ -543,8 +577,12 @@ private JsonObject handleResourcesSubscribe(final JsonValue params, final String
final var paramsObj = params instanceof JsonObject p ? p : JsonObject.builder().build();
final var uri = getString(paramsObj, "uri");
final var state = sessions.get(sessionId);
if (state != null && state.subscriptions.size() < maxSubscriptionsPerSession) {
state.subscriptions.add(uri);
if (state != null) {
synchronized (state.subscriptions) {
if (state.subscriptions.size() < maxSubscriptionsPerSession) {
state.subscriptions.add(uri);
}
}
}
return envelope(id, JsonObject.builder().build());
}
Expand Down Expand Up @@ -671,6 +709,7 @@ public static final class Builder {
private final HashSet<String> allowedOrigins = new HashSet<>();
private int maxSessions = 10_000;
private int maxSubscriptionsPerSession = 1_000;
private Duration sessionIdleTimeout = Duration.ofHours(1);

private Builder(final String name, final String version) {
this.name = name;
Expand Down Expand Up @@ -751,6 +790,23 @@ public Builder maxSessions(final int maxSessions) {
return this;
}

/**
* Sets the idle timeout after which inactive sessions are reaped (default 1 hour).
* A background daemon thread sweeps expired sessions at half the idle timeout interval,
* capped at 5 minutes. Activity on a session (any JSON-RPC request or SSE connection)
* resets the idle clock.
*
* @param idleTimeout the idle duration; must be at least 1 minute
* @return this builder
*/
public Builder sessionIdleTimeout(final Duration idleTimeout) {
if (idleTimeout == null || idleTimeout.isNegative() || idleTimeout.isZero()) {
throw new IllegalArgumentException("sessionIdleTimeout must be positive");
}
this.sessionIdleTimeout = idleTimeout;
return this;
}

/**
* Sets the maximum number of resource subscriptions per session (default 1,000).
* Subscribe requests beyond the cap are silently ignored.
Expand Down
46 changes: 46 additions & 0 deletions serve-mcp/src/test/java/build/serve/mcp/McpServerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -898,6 +899,51 @@ public McpToolResult call(final JsonValue arguments) throws Exception {
}
}

@Test
void shouldEvictIdleSessionAfterTimeout() throws InterruptedException {
final var mcp = McpServer.builder("s", "1")
.maxSessions(1)
.sessionIdleTimeout(Duration.ofMillis(200))
.build();
final var srv = TestServer.of(RouterBuilder.create().route("/mcp", mcp.handler()).build());
try (srv) {
srv.post("/mcp")
.header("Content-Type", "application/json")
.body("{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\",\"params\":{}}")
.send().assertStatus(200);

srv.post("/mcp")
.header("Content-Type", "application/json")
.body("{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"initialize\",\"params\":{}}")
.send().assertStatus(503);

Thread.sleep(500); // wait for reaper (sweeps at 100ms, idle timeout 200ms)

assertThat(srv.post("/mcp")
.header("Content-Type", "application/json")
.body("{\"jsonrpc\":\"2.0\",\"id\":3,\"method\":\"initialize\",\"params\":{}}")
.send().status()).isEqualTo(200);
}
}

@Test
void shouldCloseOldSseEmitterOnReconnect() throws InterruptedException {
final var mcpServer = McpServer.builder("sse-server", "1.0.0").build();
try (final var sseClient = McpTestClient.start(mcpServer)) {
sseClient.initialize();

final var stream1 = sseClient.sseStream();
Thread.sleep(100); // let stream1 establish on the server

try (final var ignored = sseClient.sseStream()) {
// stream2 connecting causes the server to close stream1's emitter;
// collectAll waits for the done future, which completes when the reader exits
final var events = stream1.collectAll(Duration.ofSeconds(3));
assertThat(events).isEmpty();
}
}
}

/**
* Sends a raw HTTP POST to /mcp using a plain socket, bypassing Java's restricted-header
* filtering so that headers like Origin can be set freely.
Expand Down