Skip to content

Commit 7680f82

Browse files
committed
feat: sse mcp backend support proxy to streamable http
1 parent 3c884d7 commit 7680f82

8 files changed

Lines changed: 307 additions & 216 deletions

File tree

core/common/mcpproxy/server.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package mcpproxy
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/bytedance/sonic"
8+
"github.com/mark3labs/mcp-go/client/transport"
9+
"github.com/mark3labs/mcp-go/mcp"
10+
)
11+
12+
type MCPServer interface {
13+
HandleMessage(ctx context.Context, message json.RawMessage) mcp.JSONRPCMessage
14+
}
15+
16+
type mcpClient2Server struct {
17+
client transport.Interface
18+
}
19+
20+
func (s *mcpClient2Server) HandleMessage(
21+
ctx context.Context,
22+
message json.RawMessage,
23+
) mcp.JSONRPCMessage {
24+
methodNode, err := sonic.Get(message, "method")
25+
if err != nil {
26+
return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
27+
}
28+
method, err := methodNode.String()
29+
if err != nil {
30+
return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
31+
}
32+
33+
switch method {
34+
case "notifications/initialized":
35+
req := mcp.JSONRPCNotification{}
36+
err := sonic.Unmarshal(message, &req)
37+
if err != nil {
38+
return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
39+
}
40+
err = s.client.SendNotification(ctx, req)
41+
if err != nil {
42+
return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
43+
}
44+
return nil
45+
default:
46+
req := transport.JSONRPCRequest{}
47+
err := sonic.Unmarshal(message, &req)
48+
if err != nil {
49+
return CreateMCPErrorResponse(nil, mcp.PARSE_ERROR, err.Error())
50+
}
51+
resp, err := s.client.SendRequest(ctx, req)
52+
if err != nil {
53+
return CreateMCPErrorResponse(nil, mcp.INTERNAL_ERROR, err.Error())
54+
}
55+
if resp.Error != nil {
56+
return CreateMCPErrorResponse(
57+
resp.ID,
58+
resp.Error.Code,
59+
resp.Error.Message,
60+
resp.Error.Data,
61+
)
62+
}
63+
return CreateMCPResultResponse(
64+
resp.ID,
65+
resp.Result,
66+
)
67+
}
68+
}
69+
70+
func WrapMCPClient2Server(client transport.Interface) MCPServer {
71+
return &mcpClient2Server{client: client}
72+
}
73+
74+
type JSONRPCNoErrorResponse struct {
75+
JSONRPC string `json:"jsonrpc"`
76+
ID mcp.RequestId `json:"id"`
77+
Result json.RawMessage `json:"result"`
78+
}
79+
80+
func CreateMCPResultResponse(
81+
id any,
82+
result json.RawMessage,
83+
) mcp.JSONRPCMessage {
84+
return &JSONRPCNoErrorResponse{
85+
JSONRPC: mcp.JSONRPC_VERSION,
86+
ID: mcp.NewRequestId(id),
87+
Result: result,
88+
}
89+
}
90+
91+
func CreateMCPErrorResponse(
92+
id any,
93+
code int,
94+
message string,
95+
data ...any,
96+
) mcp.JSONRPCMessage {
97+
var d any
98+
if len(data) > 0 {
99+
d = data[0]
100+
}
101+
return mcp.JSONRPCError{
102+
JSONRPC: mcp.JSONRPC_VERSION,
103+
ID: mcp.NewRequestId(id),
104+
Error: struct {
105+
Code int `json:"code"`
106+
Message string `json:"message"`
107+
Data any `json:"data,omitempty"`
108+
}{
109+
Code: code,
110+
Message: message,
111+
Data: d,
112+
},
113+
}
114+
}

core/common/mcpproxy/sse.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package mcpproxy
22

33
import (
44
"context"
5-
"encoding/json"
65
"errors"
76
"fmt"
87
"net/http"
@@ -12,10 +11,6 @@ import (
1211
"github.com/mark3labs/mcp-go/mcp"
1312
)
1413

15-
type MCPServer interface {
16-
HandleMessage(ctx context.Context, message json.RawMessage) mcp.JSONRPCMessage
17-
}
18-
1914
// SSEServer implements a Server-Sent Events (SSE) based MCP server.
2015
// It provides real-time communication capabilities over HTTP using the SSE protocol.
2116
type SSEServer struct {
@@ -138,14 +133,8 @@ func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
138133
// handleMessage processes incoming JSON-RPC messages from clients and sends responses
139134
// back through both the SSE connection and HTTP response.
140135
func (s *SSEServer) HandleMessage(ctx context.Context, req []byte) error {
141-
// Parse message as raw JSON
142-
var rawMessage json.RawMessage
143-
if err := sonic.Unmarshal(req, &rawMessage); err != nil {
144-
return err
145-
}
146-
147136
// Process message through MCPServer
148-
response := s.server.HandleMessage(ctx, rawMessage)
137+
response := s.server.HandleMessage(ctx, req)
149138

150139
// Only send response if there is one (not for notifications)
151140
if response != nil {
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package mcpproxy
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
9+
"github.com/bytedance/sonic"
10+
"github.com/mark3labs/mcp-go/mcp"
11+
)
12+
13+
type StreamableHTTPOption func(*StreamableHTTPServer)
14+
15+
type StreamableHTTPServer struct {
16+
server MCPServer
17+
}
18+
19+
// NewStatelessStreamableHTTPServer creates a new streamable-http server instance
20+
func NewStatelessStreamableHTTPServer(
21+
server MCPServer,
22+
opts ...StreamableHTTPOption,
23+
) *StreamableHTTPServer {
24+
s := &StreamableHTTPServer{
25+
server: server,
26+
}
27+
28+
for _, opt := range opts {
29+
opt(s)
30+
}
31+
return s
32+
}
33+
34+
// ServeHTTP implements the http.Handler interface.
35+
func (s *StreamableHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
36+
switch r.Method {
37+
case http.MethodPost:
38+
s.handlePost(w, r)
39+
case http.MethodGet:
40+
s.handleGet(w, r)
41+
case http.MethodDelete:
42+
s.handleDelete(w, r)
43+
default:
44+
http.NotFound(w, r)
45+
}
46+
}
47+
48+
func (s *StreamableHTTPServer) handlePost(w http.ResponseWriter, r *http.Request) {
49+
// post request carry request/notification message
50+
51+
// Check content type
52+
contentType := r.Header.Get("Content-Type")
53+
if contentType != "application/json" {
54+
http.Error(w, "Invalid content type: must be 'application/json'", http.StatusBadRequest)
55+
return
56+
}
57+
58+
// Check the request body is valid json, meanwhile, get the request Method
59+
rawData, err := io.ReadAll(r.Body)
60+
if err != nil {
61+
s.writeJSONRPCError(
62+
w,
63+
nil,
64+
mcp.PARSE_ERROR,
65+
fmt.Sprintf("read request body error: %v", err),
66+
)
67+
return
68+
}
69+
var baseMessage struct {
70+
Method mcp.MCPMethod `json:"method"`
71+
}
72+
if err := json.Unmarshal(rawData, &baseMessage); err != nil {
73+
s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "request body is not valid json")
74+
return
75+
}
76+
77+
// Process message through MCPServer
78+
response := s.server.HandleMessage(r.Context(), rawData)
79+
if response == nil {
80+
// For notifications, just send 202 Accepted with no body
81+
w.WriteHeader(http.StatusAccepted)
82+
return
83+
}
84+
85+
w.Header().Set("Content-Type", "application/json")
86+
w.WriteHeader(http.StatusOK)
87+
_ = sonic.ConfigDefault.NewEncoder(w).Encode(response)
88+
}
89+
90+
func (s *StreamableHTTPServer) handleGet(w http.ResponseWriter, _ *http.Request) {
91+
http.Error(w, "get request is not supported", http.StatusMethodNotAllowed)
92+
}
93+
94+
func (s *StreamableHTTPServer) handleDelete(w http.ResponseWriter, _ *http.Request) {
95+
http.Error(w, "delete request is not supported", http.StatusMethodNotAllowed)
96+
}
97+
98+
func (s *StreamableHTTPServer) writeJSONRPCError(
99+
w http.ResponseWriter,
100+
id any,
101+
code int,
102+
message string,
103+
) {
104+
response := CreateMCPErrorResponse(id, code, message)
105+
w.Header().Set("Content-Type", "application/json")
106+
w.WriteHeader(http.StatusBadRequest)
107+
_ = sonic.ConfigDefault.NewEncoder(w).Encode(response)
108+
}

0 commit comments

Comments
 (0)