Skip to content

Commit 76f013a

Browse files
authored
feat: add streamable HTTP transport to MCP server (#1266)
* feat: add streamable HTTP transport to MCP server The MCP server only supported the legacy SSE transport (GET /sse + POST /message), deprecated in MCP spec 2025-03-26. Claude Code uses the newer streamable HTTP transport, causing connection failures. Add a /mcp endpoint implementing the streamable HTTP spec: - POST initialize creates a session with Mcp-Session-Id header - POST with session dispatches JSON-RPC and returns synchronous JSON - DELETE terminates a session - Legacy SSE endpoints preserved for backwards compatibility Both transports share one MCPServer instance via a new public Dispatch() method that wraps the existing handleRequest/handleNotification logic. * fix: address review feedback on streamable HTTP handler - Use mime.ParseMediaType for Content-Type check to accept "application/json; charset=utf-8" (CodeRabbit, claude[bot]) - Dispatch initialize before creating session to prevent orphans on failed initialize (CodeRabbit) - Add background session eviction goroutine (1h idle timeout) to clean up sessions from disconnected clients (claude[bot]) - Extract "initialize" string literal to constant (goconst lint) - Add Close() method and defer in main.go - Add tests: failed initialize creates no session, charset tolerance * fix: harden streamable HTTP handler (CodeRabbit review round 2) - Limit request body to 1 MB via io.LimitReader to prevent memory exhaustion from oversized payloads - Redact session IDs in log messages (show last 8 chars only) to reduce hijack risk if logs are exposed - Guard against nil dispatch response in initialize path --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 7b965ea commit 76f013a

5 files changed

Lines changed: 582 additions & 11 deletions

File tree

deploy/demo/Caddyfile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ demo.meridianhub.cloud, *.demo.meridianhub.cloud {
3030
reverse_proxy dex:5556
3131
}
3232

33-
# MCP Server: SSE transport + message endpoint
33+
# MCP Server: streamable HTTP + legacy SSE transport
3434
# NOTE: Requires mcp-server container to be running; routes will 502 until deployed.
35+
handle /mcp {
36+
reverse_proxy mcp-server:8090
37+
}
3538
handle /sse {
3639
reverse_proxy mcp-server:8090
3740
}

services/mcp-server/cmd/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ func runSSE(logger *slog.Logger, cfg server.Config) error {
154154

155155
srv := server.New(sseTr, cfg, logger)
156156

157+
// Streamable HTTP transport (MCP spec 2025-03-26).
158+
// Shares the same MCPServer instance so tools/resources/prompts are identical.
159+
streamableHandler := transport.NewStreamableHTTPHandler(srv, logger)
160+
defer streamableHandler.Close()
161+
157162
mux := http.NewServeMux()
158163

159164
// OAuth 2.1 endpoints (optional — enabled via MCP_OAUTH_ENABLED=true).
@@ -180,9 +185,11 @@ func runSSE(logger *slog.Logger, cfg server.Config) error {
180185
validator := &passthroughValidator{logger: logger}
181186
bearerMW := mcpauth.NewBearerMiddleware(validator, meta)
182187

188+
mux.Handle("/mcp", bearerMW.Handler(streamableHandler))
183189
mux.Handle("/sse", bearerMW.Handler(http.HandlerFunc(sseTr.HandleSSE)))
184190
mux.Handle("/message", bearerMW.Handler(http.HandlerFunc(sseTr.HandleMessage)))
185191
} else {
192+
mux.Handle("/mcp", streamableHandler)
186193
mux.HandleFunc("/sse", sseTr.HandleSSE)
187194
mux.HandleFunc("/message", sseTr.HandleMessage)
188195
}

services/mcp-server/internal/server/server.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,21 @@ func (s *MCPServer) RegisterTool(tool Tool, handler ToolHandler) {
166166
s.handlers[tool.Name] = handler
167167
}
168168

169+
// Dispatch handles a single JSON-RPC message and returns the response.
170+
// For notifications it returns nil (no response required by JSON-RPC spec).
171+
// For non-request/non-notification messages it returns an invalid-request error.
172+
// This method is safe to call from multiple goroutines concurrently.
173+
func (s *MCPServer) Dispatch(ctx context.Context, msg *transport.JSONRPCMessage) *transport.JSONRPCMessage {
174+
if msg.IsNotification() {
175+
s.handleNotification(msg)
176+
return nil
177+
}
178+
if !msg.IsRequest() {
179+
return transport.NewErrorResponse(msg.ID, transport.CodeInvalidRequest, "invalid message")
180+
}
181+
return s.handleRequest(ctx, msg)
182+
}
183+
169184
// Run starts the server's message processing loop. It blocks until the context
170185
// is cancelled or an unrecoverable error occurs.
171186
func (s *MCPServer) Run(ctx context.Context) error {
@@ -182,18 +197,11 @@ func (s *MCPServer) Run(ctx context.Context) error {
182197
return fmt.Errorf("read message: %w", err)
183198
}
184199

185-
if msg.IsNotification() {
186-
s.handleNotification(msg)
200+
resp := s.Dispatch(ctx, msg)
201+
if resp == nil {
187202
continue
188203
}
189-
190-
if !msg.IsRequest() {
191-
s.logger.Warn("ignoring non-request message")
192-
continue
193-
}
194-
195-
response := s.handleRequest(ctx, msg)
196-
if err := s.transport.WriteMessage(ctx, response); err != nil {
204+
if err := s.transport.WriteMessage(ctx, resp); err != nil {
197205
s.logger.Error("failed to write response", "error", err)
198206
}
199207
}
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
package transport
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"encoding/hex"
7+
"encoding/json"
8+
"io"
9+
"log/slog"
10+
"mime"
11+
"net/http"
12+
"sync"
13+
"time"
14+
)
15+
16+
const (
17+
methodInitialize = "initialize"
18+
// sessionIdleTimeout is how long a session can be idle before eviction.
19+
sessionIdleTimeout = 1 * time.Hour
20+
// sessionEvictInterval is how often the background goroutine sweeps idle sessions.
21+
sessionEvictInterval = 5 * time.Minute
22+
// maxRequestBodySize is the maximum size of a JSON-RPC request body (1 MB).
23+
maxRequestBodySize = 1 << 20
24+
)
25+
26+
// Dispatcher handles a single JSON-RPC message and returns the response.
27+
// Notifications return nil (no response required).
28+
type Dispatcher interface {
29+
Dispatch(ctx context.Context, msg *JSONRPCMessage) *JSONRPCMessage
30+
}
31+
32+
// StreamableHTTPHandler implements the MCP streamable HTTP transport
33+
// (spec 2025-03-26). Clients POST JSON-RPC messages to a single endpoint
34+
// and receive synchronous JSON responses.
35+
type StreamableHTTPHandler struct {
36+
dispatcher Dispatcher
37+
sessions map[string]*streamSession
38+
mu sync.RWMutex
39+
logger *slog.Logger
40+
stop chan struct{}
41+
}
42+
43+
type streamSession struct {
44+
id string
45+
created time.Time
46+
lastUsed time.Time
47+
}
48+
49+
// NewStreamableHTTPHandler creates a handler for the MCP streamable HTTP transport.
50+
// Call Close to stop the background session eviction goroutine.
51+
func NewStreamableHTTPHandler(dispatcher Dispatcher, logger *slog.Logger) *StreamableHTTPHandler {
52+
h := &StreamableHTTPHandler{
53+
dispatcher: dispatcher,
54+
sessions: make(map[string]*streamSession),
55+
logger: logger,
56+
stop: make(chan struct{}),
57+
}
58+
go h.evictLoop()
59+
return h
60+
}
61+
62+
// Close stops the background session eviction goroutine.
63+
func (h *StreamableHTTPHandler) Close() {
64+
select {
65+
case <-h.stop:
66+
default:
67+
close(h.stop)
68+
}
69+
}
70+
71+
func (h *StreamableHTTPHandler) evictLoop() {
72+
ticker := time.NewTicker(sessionEvictInterval)
73+
defer ticker.Stop()
74+
for {
75+
select {
76+
case <-ticker.C:
77+
h.evictIdleSessions()
78+
case <-h.stop:
79+
return
80+
}
81+
}
82+
}
83+
84+
func (h *StreamableHTTPHandler) evictIdleSessions() {
85+
h.mu.Lock()
86+
defer h.mu.Unlock()
87+
for id, sess := range h.sessions {
88+
if time.Since(sess.lastUsed) > sessionIdleTimeout {
89+
delete(h.sessions, id)
90+
h.logger.Info("evicted idle streamable HTTP session", "session_id", redactSessionID(id))
91+
}
92+
}
93+
}
94+
95+
// ServeHTTP routes requests by HTTP method.
96+
func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
97+
switch r.Method {
98+
case http.MethodPost:
99+
h.handlePost(w, r)
100+
case http.MethodDelete:
101+
h.handleDelete(w, r)
102+
default:
103+
w.Header().Set("Allow", "POST, DELETE")
104+
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
105+
}
106+
}
107+
108+
func (h *StreamableHTTPHandler) handlePost(w http.ResponseWriter, r *http.Request) {
109+
if !isJSONContentType(r) {
110+
http.Error(w, "Content-Type must be application/json", http.StatusUnsupportedMediaType)
111+
return
112+
}
113+
114+
var msg JSONRPCMessage
115+
if err := json.NewDecoder(io.LimitReader(r.Body, maxRequestBodySize)).Decode(&msg); err != nil {
116+
writeJSONError(w, nil, CodeParseError, "invalid JSON")
117+
return
118+
}
119+
120+
// For initialize requests, create a new session.
121+
if msg.Method == methodInitialize {
122+
h.handleInitialize(w, r, &msg)
123+
return
124+
}
125+
126+
// All other messages require a valid session.
127+
sessionID := r.Header.Get("Mcp-Session-Id")
128+
if sessionID == "" {
129+
http.Error(w, "Mcp-Session-Id header required", http.StatusBadRequest)
130+
return
131+
}
132+
133+
h.mu.RLock()
134+
sess, exists := h.sessions[sessionID]
135+
h.mu.RUnlock()
136+
137+
if !exists {
138+
http.Error(w, "unknown session", http.StatusNotFound)
139+
return
140+
}
141+
142+
h.mu.Lock()
143+
sess.lastUsed = time.Now()
144+
h.mu.Unlock()
145+
146+
resp := h.dispatcher.Dispatch(r.Context(), &msg)
147+
if resp == nil {
148+
// Notification — no response body.
149+
w.WriteHeader(http.StatusAccepted)
150+
return
151+
}
152+
153+
writeJSON(w, resp)
154+
}
155+
156+
func (h *StreamableHTTPHandler) handleInitialize(w http.ResponseWriter, r *http.Request, msg *JSONRPCMessage) {
157+
// Dispatch first — only create the session if initialize succeeds.
158+
resp := h.dispatcher.Dispatch(r.Context(), msg)
159+
if resp == nil || resp.Error != nil {
160+
if resp != nil {
161+
writeJSON(w, resp)
162+
} else {
163+
http.Error(w, "internal server error", http.StatusInternalServerError)
164+
}
165+
return
166+
}
167+
168+
sessionID := generateSessionID()
169+
170+
h.mu.Lock()
171+
now := time.Now()
172+
h.sessions[sessionID] = &streamSession{
173+
id: sessionID,
174+
created: now,
175+
lastUsed: now,
176+
}
177+
h.mu.Unlock()
178+
179+
h.logger.Info("streamable HTTP session created", "session_id", redactSessionID(sessionID))
180+
181+
w.Header().Set("Mcp-Session-Id", sessionID)
182+
writeJSON(w, resp)
183+
}
184+
185+
func (h *StreamableHTTPHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
186+
sessionID := r.Header.Get("Mcp-Session-Id")
187+
if sessionID == "" {
188+
http.Error(w, "Mcp-Session-Id header required", http.StatusBadRequest)
189+
return
190+
}
191+
192+
h.mu.Lock()
193+
_, exists := h.sessions[sessionID]
194+
if exists {
195+
delete(h.sessions, sessionID)
196+
}
197+
h.mu.Unlock()
198+
199+
if !exists {
200+
http.Error(w, "unknown session", http.StatusNotFound)
201+
return
202+
}
203+
204+
h.logger.Info("streamable HTTP session terminated", "session_id", redactSessionID(sessionID))
205+
w.WriteHeader(http.StatusAccepted)
206+
}
207+
208+
// SessionCount returns the number of active sessions.
209+
func (h *StreamableHTTPHandler) SessionCount() int {
210+
h.mu.RLock()
211+
defer h.mu.RUnlock()
212+
return len(h.sessions)
213+
}
214+
215+
func writeJSON(w http.ResponseWriter, msg *JSONRPCMessage) {
216+
w.Header().Set("Content-Type", "application/json")
217+
if err := json.NewEncoder(w).Encode(msg); err != nil {
218+
// Headers already sent; log but can't send error response.
219+
_ = err
220+
}
221+
}
222+
223+
func writeJSONError(w http.ResponseWriter, id json.RawMessage, code int, message string) {
224+
resp := NewErrorResponse(id, code, message)
225+
w.Header().Set("Content-Type", "application/json")
226+
w.WriteHeader(http.StatusBadRequest)
227+
_ = json.NewEncoder(w).Encode(resp)
228+
}
229+
230+
// isJSONContentType checks whether the request Content-Type is application/json,
231+
// tolerating optional parameters like charset (e.g. "application/json; charset=utf-8").
232+
func isJSONContentType(r *http.Request) bool {
233+
ct := r.Header.Get("Content-Type")
234+
mediaType, _, err := mime.ParseMediaType(ct)
235+
if err != nil {
236+
return false
237+
}
238+
return mediaType == "application/json"
239+
}
240+
241+
func generateSessionID() string {
242+
b := make([]byte, 16)
243+
if _, err := rand.Read(b); err != nil {
244+
panic("crypto/rand failed: " + err.Error())
245+
}
246+
return hex.EncodeToString(b)
247+
}
248+
249+
// redactSessionID returns a truncated session ID safe for logging.
250+
// Shows only the last 8 characters to aid debugging without exposing
251+
// the full value (which acts as authorization material).
252+
func redactSessionID(id string) string {
253+
if len(id) <= 8 {
254+
return "***"
255+
}
256+
return "..." + id[len(id)-8:]
257+
}

0 commit comments

Comments
 (0)