Skip to content
This repository was archived by the owner on May 29, 2025. It is now read-only.

Commit 4edbe37

Browse files
authored
Add example for using SSE on the server
1 parent 24e83c1 commit 4edbe37

File tree

18 files changed

+571
-36
lines changed

18 files changed

+571
-36
lines changed

.github/workflows/swift.yml

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,11 @@ on:
1111

1212
jobs:
1313
build_and_test:
14-
15-
# TODO: once https://github.com/swift-actions/setup-swift/pull/684 is merged, revert to:
16-
17-
# runs-on: macos-latest
18-
# steps:
19-
# - uses: swift-actions/setup-swift@v2
20-
# with:
21-
# swift-version: "6.0.1"
22-
23-
runs-on: macos-15
14+
runs-on: macos-latest
15+
steps:
16+
- uses: swift-actions/setup-swift@v2
17+
with:
18+
swift-version: "6.0.1"
2419

2520
steps:
2621
- name: Get swift version
@@ -42,6 +37,8 @@ jobs:
4237
# shell: bash
4338
# env:
4439
# CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
40+
- name: Build examples
41+
run: cd Examples && swift build -q
4542

4643
lint:
4744
runs-on: macos-15

ExampleMCPServer/launch.sh

Lines changed: 0 additions & 4 deletions
This file was deleted.

Examples/.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import os
2+
3+
let logger = Logger(subsystem: "com.mcp-sse-server", category: "mcp")
4+
5+
try await runWebServer(runMCPServerForConnection: { responseStream, requestStrean in
6+
try await startMCPServer(sendDataTo: responseStream, readDataFrom: requestStrean)
7+
})
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import Foundation
2+
import JSONSchemaBuilder
3+
import MCPServer
4+
import Vapor
5+
6+
/// Proxy transport, used for logging received and send data. (can be removed).
7+
func proxy(_ transport: Transport) -> Transport {
8+
let (stream, continuation) = AsyncStream<Data>.makeStream()
9+
10+
Task {
11+
for await data in transport.dataSequence {
12+
logger.log("Reading data from transport: \(String(data: data, encoding: .utf8)!, privacy: .public)")
13+
continuation.yield(data)
14+
}
15+
continuation.finish()
16+
}
17+
18+
return Transport(
19+
writeHandler: { data in
20+
logger.log("Writing data to transport: \(String(data: data, encoding: .utf8)!, privacy: .public)")
21+
try await transport.writeHandler(data)
22+
},
23+
dataSequence: stream)
24+
}
25+
26+
// MARK: - RepeatToolInput
27+
28+
@Schemable
29+
struct RepeatToolInput {
30+
let text: String
31+
}
32+
33+
@MainActor
34+
func startMCPServer(sendDataTo responseStream: BodyStreamWriter, readDataFrom requestStream: AsyncStream<Data>) async throws {
35+
let transport = Transport(
36+
writeHandler: { data in
37+
let message = try MessageEvent(data: data).buffer()
38+
try await responseStream.write(.buffer(message)).get()
39+
},
40+
dataSequence: requestStream)
41+
42+
let server = try await MCPServer(
43+
info: Implementation(name: "test-server", version: "1.0.0"),
44+
capabilities: ServerCapabilityHandlers(tools: [
45+
Tool(name: "repeat") { (input: RepeatToolInput) in
46+
[.text(.init(text: input.text))]
47+
},
48+
testTool,
49+
]),
50+
transport: proxy(transport))
51+
52+
try await server.waitForDisconnection()
53+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import Foundation
2+
import JSONSchemaBuilder
3+
import MCPServer
4+
import Vapor
5+
6+
/// Start the web server.
7+
/// - Parameter runMCPServerForConnection: When a new connection is made to connect to MCP, create a new MCP server (one instance per connection) to handle the request.
8+
func runWebServer(runMCPServerForConnection: @escaping (BodyStreamWriter, AsyncStream<Data>) async throws -> Void) async throws {
9+
var sessions: [String: AsyncStream<Data>.Continuation] = [:]
10+
11+
let app = try await Application.make()
12+
13+
app.get("sse") { _ async -> Response in
14+
// Expects an initial request to /sse, which will start a long lived connection.
15+
// It will immediately write to that connection an endpoint where to send messages to that contains an identifier for this connection.
16+
let body = Response.Body(stream: { writer in
17+
Task {
18+
do {
19+
let sessionId = UUID().uuidString
20+
let (stream, continuation) = AsyncStream<Data>.makeStream()
21+
sessions[sessionId] = continuation // TODO: look at concurrency.
22+
23+
let message = try EndpointEvent(sessionId: sessionId).buffer()
24+
try await writer.write(.buffer(message)).get()
25+
26+
try await runMCPServerForConnection(writer, stream)
27+
_ = writer.write(.end)
28+
} catch {
29+
logger.error("Error: \(error, privacy: .public)")
30+
}
31+
}
32+
})
33+
34+
let response = Response(status: .ok, body: body)
35+
36+
response.headers.replaceOrAdd(name: .contentType, value: "text/event-stream")
37+
response.headers.replaceOrAdd(name: .cacheControl, value: "no-cache")
38+
response.headers.replaceOrAdd(name: .connection, value: "keep-alive")
39+
40+
return response
41+
}
42+
43+
// To send messages to the server, the client will POST to /messages with a sessionId query parameter.
44+
app.post("messages") { request async -> Response in
45+
guard let sessionId = request.query[String.self, at: "sessionId"] else {
46+
return Response(status: .badRequest)
47+
}
48+
guard let session = sessions[sessionId] else {
49+
return Response(status: .notFound)
50+
}
51+
guard
52+
let contentType = request.headers.first(name: "Content-Type"),
53+
contentType.contains("application/json"),
54+
let contentLengthStr = request.headers.first(name: "content-length"),
55+
let contentLength = Int(contentLengthStr),
56+
var bodyData = request.body.data,
57+
bodyData.readableBytes >= contentLength,
58+
let bytes = bodyData.readBytes(length: contentLength)
59+
else {
60+
return Response(status: .badRequest)
61+
}
62+
let data = Data(bytes)
63+
session.yield(data)
64+
return Response(status: .ok)
65+
}
66+
67+
try await app.execute()
68+
}
69+
70+
// MARK: - ServerEvent
71+
72+
protocol ServerEvent {
73+
var event: String? { get }
74+
var data: [String] { get }
75+
var id: String? { get }
76+
var retry: Int? { get }
77+
}
78+
79+
extension ServerEvent {
80+
var isValid: Bool {
81+
!data.isEmpty
82+
}
83+
}
84+
85+
// MARK: - ServerEventError
86+
87+
enum ServerEventError: Error {
88+
case noDataAvailable
89+
case encoding
90+
}
91+
92+
extension ServerEvent {
93+
func buffer() throws -> ByteBuffer {
94+
guard isValid else {
95+
throw ServerEventError.noDataAvailable
96+
}
97+
98+
var message = ""
99+
100+
if let event {
101+
message += "event: \(event)\n"
102+
}
103+
104+
message += data
105+
.map { "data: \($0)" }
106+
.joined(separator: "\n")
107+
.appending("\n")
108+
109+
if let id {
110+
message += "id: \(id)\n"
111+
}
112+
113+
if let retry {
114+
message += "retry: \(retry)\n"
115+
}
116+
117+
message += "\n\n"
118+
119+
return ByteBuffer(string: message)
120+
}
121+
}
122+
123+
// MARK: - EndpointEvent
124+
125+
struct EndpointEvent: ServerEvent {
126+
var id: String?
127+
128+
var retry: Int?
129+
130+
init(sessionId: String) {
131+
self.sessionId = sessionId
132+
}
133+
134+
let event: String? = "endpoint"
135+
var data: [String] { ["/messages?sessionId=\(sessionId)"] }
136+
private let sessionId: String
137+
}
138+
139+
// MARK: - MessageEvent
140+
141+
struct MessageEvent: ServerEvent {
142+
var id: String?
143+
144+
var retry: Int?
145+
146+
let event: String? = "message"
147+
init(data: Data) {
148+
self.data = [String(data: data, encoding: .utf8)!]
149+
}
150+
151+
var data: [String]
152+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/zsh
2+
3+
dir=$(dirname "$0")
4+
(cd "$dir/.." && swift run ExampleSSEServer)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Inspect the server in the debugger:
2+
3+
```
4+
nvm use 20.18.1
5+
6+
npx @modelcontextprotocol/inspector "$(pwd)/ExampleSSEServer/launch.sh"
7+
```
8+
9+
10+
# Observe console logs:
11+
- in Console.app, filter by `com.mcp-sse-server` as the subsystem.

0 commit comments

Comments
 (0)