Skip to content

Commit c329423

Browse files
committed
Make subscribe structured concurrency friendly
1 parent f953784 commit c329423

File tree

3 files changed

+49
-29
lines changed

3 files changed

+49
-29
lines changed

server-sent-events/Package.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ let package = Package(
1010
.executable(name: "App", targets: ["App"])
1111
],
1212
dependencies: [
13-
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.6.0"),
13+
.package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.19.0"),
1414
.package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.3.0"),
15+
.package(url: "https://github.com/apple/swift-async-algorithms.git", from: "1.1.0"),
1516
.package(url: "https://github.com/orlandos-nl/SSEKit.git", from: "1.1.0"),
1617
],
1718
targets: [
1819
.executableTarget(
1920
name: "App",
2021
dependencies: [
22+
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
2123
.product(name: "ArgumentParser", package: "swift-argument-parser"),
2224
.product(name: "Hummingbird", package: "hummingbird"),
2325
.product(name: "SSEKit", package: "SSEKit"),

server-sent-events/Sources/App/Application+build.swift

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import AsyncAlgorithms
12
import Hummingbird
23
import Logging
34
import NIOCore
@@ -18,7 +19,7 @@ public protocol AppArguments {
1819
/// The request context can store any data that is associated with the request
1920
/// This could include things like the authenticated user, a JWT token, or other information derived
2021
/// from the request such as their IP address, or user agent.
21-
///
22+
///
2223
/// The Request Context can be accessed and modified by any middleware or route handler.
2324
/// This allows middleware to pass information forward to the next middleware or route handler in the chain.
2425
struct AppRequestContext: RequestContext {
@@ -86,41 +87,56 @@ func buildRouter(requestPublisher: Publisher<String>) -> Router<AppRequestContex
8687
// It will stay open until the client closes the connection, or the stream ends
8788
let allocator = ByteBufferAllocator()
8889

89-
// A subscription to some data source is opened
90-
// This might be a database like Redis, or some other data source
91-
let (stream, id) = requestPublisher.subscribe()
90+
// We create a stream to pass our cancel event to
91+
enum StreamResult: Sendable {
92+
case event(String)
93+
case cancel
94+
}
95+
let (cancelStream, cancelCont) = AsyncStream.makeStream(of: StreamResult.self)
9296

9397
// This is a helper that will call the `onGracefulShutdown` closure
9498
// when the application is shutting down.
9599
// This helps ensure that the application will gracefully shut down, meaning
96100
// any existing work will be correctly cleaned up before the application exits.
97101
try await withGracefulShutdownHandler {
98-
// If connection if closed then this function will call the `onInboundCLosed` closure
102+
// If connection if closed then this function will call the `onInboundClosed` closure
99103
try await request.body.consumeWithInboundCloseHandler { requestBody in
100-
// This loop will suspend until a new message is available, or the stream ends
101-
// If the stream ends, the loop will finish exiting the loop.
102-
for try await value in stream {
103-
// A new value was received from the data source
104-
// We create a new ServerSentEvent with the value and write it to the response body
105-
// The `await` before the `write` is used to ensure that the write is completed
106-
// before the loop continues to await the next value
107-
// This applies backpressure to the data source
108-
// Depending on the implementation, the data source could buffer the messages
109-
// in memory or suspend the production of events until the client is ready to receive them
110-
// Additionally, data could be dropped if the client is unable to keep up with the rate of data production
111-
try await writer.write(
112-
ServerSentEvent(data: .init(string: value)).makeBuffer(
113-
allocator: allocator
114-
)
115-
)
104+
105+
// A subscription to some data source is opened
106+
// This might be a database like Redis, or some other data source
107+
try await requestPublisher.subscribe { stream in
108+
// We merge the publisher stream with the cancellation stream
109+
let publishAndCancelStream = merge(cancelStream, stream.map { .event($0) })
110+
111+
// This will wait for either a publish event or a cancellation event
112+
outsideLoop: for try await value in publishAndCancelStream {
113+
// A new value was received from the data source
114+
switch value {
115+
case .event(let value):
116+
// if it is a publish event we create a new ServerSentEvent with the value
117+
// and write it to the response body
118+
// The `await` before the `write` is used to ensure that the write is completed
119+
// before the loop continues to await the next value
120+
// This applies backpressure to the data source
121+
// Depending on the implementation, the data source could buffer the messages
122+
// in memory or suspend the production of events until the client is ready to receive them
123+
// Additionally, data could be dropped if the client is unable to keep up with the rate of data production
124+
try await writer.write(
125+
ServerSentEvent(data: .init(string: value)).makeBuffer(
126+
allocator: allocator
127+
)
128+
)
129+
case .cancel:
130+
// if it is a cancellation event then we exit the loop
131+
break outsideLoop
132+
}
133+
}
116134
}
117135
} onInboundClosed: {
118-
// If the client closes the connection, we unsubscribe from the data source
119-
requestPublisher.unsubscribe(id)
136+
cancelCont.yield(.cancel)
120137
}
121138
} onGracefulShutdown: {
122-
// If the application is shutting down, we unsubscribe from the data source
123-
requestPublisher.unsubscribe(id)
139+
cancelCont.yield(.cancel)
124140
}
125141
try await writer.finish(nil)
126142
}

server-sent-events/Sources/App/Publisher.swift

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ actor Publisher<Value: Sendable>: Service {
2525

2626
/// Subscribe to service
2727
/// - Returns: AsyncStream of values, and subscription identifier
28-
nonisolated func subscribe() -> (AsyncStream<Value>, SubscriptionID) {
28+
nonisolated(nonsending) func subscribe<ReturnValue>(_ operation: (AsyncStream<Value>) async throws -> ReturnValue) async throws -> ReturnValue {
2929
let id = SubscriptionID()
3030

3131
// Each subscription gets an AsyncStream and a SubscriptionID
@@ -35,12 +35,14 @@ actor Publisher<Value: Sendable>: Service {
3535
// data will stack up in the server's memory.
3636
let (stream, source) = AsyncStream<Value>.makeStream()
3737
subSource.yield(.add(id, source))
38-
return (stream, id)
38+
39+
defer { self.unsubscribe(id) }
40+
return try await operation(stream)
3941
}
4042

4143
/// Unsubscribe from service
4244
/// - Parameter id: Subscription identifier
43-
nonisolated func unsubscribe(_ id: SubscriptionID) {
45+
private nonisolated func unsubscribe(_ id: SubscriptionID) {
4446
subSource.yield(.remove(id))
4547
}
4648

0 commit comments

Comments
 (0)