-
Notifications
You must be signed in to change notification settings - Fork 704
Add structured concurrency aware ServerBootstrap bind #3378
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 9 commits
76c7590
70b0aa1
afa3d9d
8dbb961
227beb8
21cd496
c05579c
378dca5
1b3dde7
cf26162
6b26145
a4fde79
2d54244
b6563f5
91c1ae3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -521,6 +521,214 @@ public final class ServerBootstrap { | |
// MARK: Async bind methods | ||
|
||
extension ServerBootstrap { | ||
|
||
/// Represents a target address or socket for binding a server socket. | ||
/// | ||
/// `BindTarget` provides a type-safe way to specify different types of binding targets | ||
/// for server bootstraps. It supports various address types including network addresses, | ||
/// Unix domain sockets, VSOCK addresses, and existing socket handles. | ||
@_spi(StructuredConcurrencyNIOAsyncChannel) | ||
public struct BindTarget: Sendable { | ||
FranzBusch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
enum Base { | ||
case hostAndPort(host: String, port: Int) | ||
case socketAddress(SocketAddress) | ||
case unixDomainSocketPath(String) | ||
case vsockAddress(VsockAddress) | ||
case socket(NIOBSDSocket.Handle) | ||
} | ||
|
||
var base: Base | ||
|
||
/// Creates a binding target for a hostname and port. | ||
/// | ||
/// This method creates a target that will resolve the hostname and bind to the | ||
/// specified port. The hostname resolution follows standard system behavior | ||
/// and may resolve to both IPv4 and IPv6 addresses depending on system configuration. | ||
/// | ||
/// - Parameters: | ||
/// - host: The hostname or IP address to bind to. Can be a domain name like | ||
/// "localhost" or "example.com", or an IP address like "127.0.0.1" or "::1" | ||
/// - port: The port number to bind to (0-65535). Use 0 to let the system | ||
/// choose an available port | ||
public static func hostAndPort(_ host: String, _ port: Int) -> BindTarget { | ||
BindTarget(base: .hostAndPort(host: host, port: port)) | ||
} | ||
|
||
/// Creates a binding target for a specific socket address. | ||
/// | ||
/// Use this method when you have a pre-constructed ``SocketAddress`` that | ||
/// specifies the exact binding location, including IPv4, IPv6, or Unix domain addresses. | ||
/// | ||
/// - Parameter address: The socket address to bind to | ||
public static func socketAddress(_ address: SocketAddress) -> BindTarget { | ||
BindTarget(base: .socketAddress(address)) | ||
} | ||
|
||
/// Creates a binding target for a Unix domain socket. | ||
/// | ||
/// Unix domain sockets provide high-performance inter-process communication | ||
/// on the same machine using filesystem paths. The socket file will be created | ||
/// at the specified path when binding occurs. | ||
/// | ||
/// - Parameter path: The filesystem path for the Unix domain socket. | ||
/// Must be a valid filesystem path and should not exist | ||
/// unless cleanup is enabled in the binding operation | ||
/// - Warning: The path must not exist. | ||
public static func unixDomainSocketPath(_ path: String) -> BindTarget { | ||
BindTarget(base: .unixDomainSocketPath(path)) | ||
} | ||
|
||
/// Creates a binding target for a VSOCK address. | ||
/// | ||
/// VSOCK (Virtual Socket) provides communication between virtual machines and their hosts, | ||
/// or between different virtual machines on the same host. This is commonly used | ||
/// in virtualized environments for guest-host communication. | ||
/// | ||
/// - Parameter vsockAddress: The VSOCK address to bind to, containing both | ||
/// context ID (CID) and port number | ||
/// - Note: VSOCK support depends on the underlying platform and virtualization technology | ||
public static func vsockAddress(_ vsockAddress: VsockAddress) -> BindTarget { | ||
BindTarget(base: .vsockAddress(vsockAddress)) | ||
} | ||
|
||
/// Creates a binding target for an existing socket handle. | ||
/// | ||
/// This method allows you to use a pre-existing socket that has already been | ||
/// created and optionally configured. This is useful for advanced scenarios where you | ||
/// need custom socket setup before binding, or when integrating with external libraries. | ||
/// | ||
/// - Parameters: | ||
/// - handle: The existing socket handle to use. Must be a valid, open socket | ||
/// that is compatible with the intended server bootstrap type | ||
/// - Note: The bootstrap will take ownership of the socket handle and will close | ||
/// it when the server shuts down | ||
public static func socket(_ handle: NIOBSDSocket.Handle) -> BindTarget { | ||
BindTarget(base: .socket(handle)) | ||
} | ||
} | ||
|
||
/// Bind the `ServerSocketChannel` to the ``BindTarget``. This method will returns once all connections that | ||
/// were spawned have been closed. | ||
/// | ||
/// # Supporting graceful shutdown | ||
/// | ||
/// To support a graceful server shutdown we recommend using the `ServerQuiescingHelper` from the | ||
/// SwiftNIO extras package. The `ServerQuiescingHelper` can be installed using the | ||
/// ``ServerBootstrap/serverChannelInitializer`` callback. | ||
/// | ||
/// Below you can find the code to setup a simple TCP echo server that supports graceful server closure. | ||
/// | ||
/// ```swift | ||
/// let quiesce = ServerQuiescingHelper(group: group) | ||
/// let signalSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: signalQueue) | ||
/// signalSource.setEventHandler { | ||
/// signalSource.cancel() | ||
/// print("received signal, initiating shutdown which should complete after the last request finished.") | ||
/// | ||
/// quiesce.initiateShutdown(promise: fullyShutdownPromise) | ||
/// } | ||
/// try await ServerBootstrap(group: self.eventLoopGroup) | ||
/// .serverChannelInitializer { channel in | ||
/// channel.eventLoop.makeCompletedFuture { | ||
/// try channel.pipeline.syncOperations.addHandler(quiesce.makeServerChannelHandler(channel: channel)) | ||
/// } | ||
/// } | ||
/// .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) | ||
/// .bind( | ||
/// target: .hostAndPort(self.host, self.port), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the label here be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've discussed this with @Lukasa, and he wants this to be target, as this type might change in the future. With a special label we can deprecate much better, than with a |
||
/// childChannelInitializer: { channel in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you use multiple trailing closure syntax here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is done in the NIOTCPEchoExample which is part of this pr. |
||
/// channel.eventLoop.makeCompletedFuture { | ||
/// try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(NewlineDelimiterCoder())) | ||
/// try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder())) | ||
/// | ||
/// return try NIOAsyncChannel( | ||
/// wrappingChannelSynchronously: channel, | ||
/// configuration: NIOAsyncChannel.Configuration( | ||
/// inboundType: String.self, | ||
/// outboundType: String.self | ||
/// ) | ||
/// ) | ||
/// } | ||
/// } | ||
/// ) { channel in | ||
/// print("Handling new connection") | ||
/// await self.handleConnection(channel: channel) | ||
/// print("Done handling connection") | ||
/// } | ||
/// | ||
/// ``` | ||
/// | ||
/// - Parameters: | ||
/// - target: The ``BindTarget`` to use. | ||
/// - serverBackPressureStrategy: The back pressure strategy used by the server socket channel. | ||
/// - childChannelInitializer: A closure to initialize the channel. The return value of this closure is used in the `onConnection` | ||
/// closure. | ||
/// - onceStartup: A closure that will be called once the server has been started. Use this to get access to | ||
/// the port number, if you used port `0` in the ``BindTarget``. | ||
/// - handleConnection: A closure to handle the connection. Use the channel's `inbound` property to read from | ||
/// the connection and channel's `outbound` to write to the connection. | ||
/// - onListeningChannel: A closure that will be called once the server has been started. Use this to get access to | ||
/// the serverChannel, if you used port `0` in the ``BindTarget``. You can also use it to | ||
|
||
/// send events on the server channel pipeline. You must not call the channels `inbound` or | ||
/// `outbound` properties. | ||
/// - Note: The bind method respects task cancellation which will force close the server. If you want to gracefully | ||
/// shut-down use the quiescing helper approach as outlined above. | ||
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) | ||
@_spi(StructuredConcurrencyNIOAsyncChannel) | ||
public func bind<Inbound: Sendable, Outbound: Sendable>( | ||
FranzBusch marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I implement a server using this method, how can I access the address that the listening socket was bound to? (e.g. I set port to zero and want to know the actual port I got.) Currently it seems the only way to do this is via the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's a good point and we probably need another closure that is called once the server channel is bound. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. I think actually it should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. went for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would bias towards some level of consistency. We already have the established So perhaps we should have:
This has clear parallels with the existing bootstrap APIs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @glbrntt love it! |
||
target: BindTarget, | ||
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, | ||
FranzBusch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<NIOAsyncChannel<Inbound, Outbound>>, | ||
handleConnection: @escaping @Sendable ( | ||
_ channel: NIOAsyncChannel<Inbound, Outbound> | ||
) async -> (), | ||
onListeningChannel: @Sendable @escaping ( | ||
NIOAsyncChannel<NIOAsyncChannel<Inbound, Outbound>, Never> | ||
|
||
) async -> () = { _ in }, | ||
) async throws { | ||
let channel = try await self.makeConnectedChannel( | ||
target: target, | ||
serverBackPressureStrategy: serverBackPressureStrategy, | ||
childChannelInitializer: childChannelInitializer | ||
) | ||
|
||
try await withTaskCancellationHandler { | ||
try await channel.executeThenClose { inbound, outbound in | ||
// we need to dance the result dance here, since we can't throw from the | ||
// withDiscardingTaskGroup closure. | ||
let result = await withDiscardingTaskGroup { group -> Result<Void, any Error> in | ||
|
||
group.addTask { | ||
await onListeningChannel(channel) | ||
} | ||
|
||
do { | ||
try await channel.executeThenClose { inbound in | ||
for try await connectionChannel in inbound { | ||
group.addTask { | ||
do { | ||
try await connectionChannel.executeThenClose { _, _ in | ||
await handleConnection(connectionChannel) | ||
} | ||
} catch { | ||
// ignore single connection failures | ||
} | ||
} | ||
} | ||
} | ||
return .success(()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this implementation need to know about quiescing and/or graceful shutdown? What's the expected behavior when a server stops accepting clients I.E. ServerBootstrap closes? Do all child tasks cancel, or do we wait for them to shut down? This kinda plays into graceful shutdown from Service Lifecycle. Can a connection continue to live while the parent channel is closed? |
||
} catch { | ||
return .failure(error) | ||
} | ||
} | ||
try result.get() | ||
} | ||
} onCancel: { | ||
channel.channel.close(promise: nil) | ||
} | ||
} | ||
|
||
/// Bind the `ServerSocketChannel` to the `host` and `port` parameters. | ||
/// | ||
/// - Parameters: | ||
|
@@ -622,6 +830,87 @@ extension ServerBootstrap { | |
to vsockAddress: VsockAddress, | ||
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, | ||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output> | ||
) async throws -> NIOAsyncChannel<Output, Never> { | ||
try await self._bind( | ||
to: vsockAddress, | ||
serverBackPressureStrategy: serverBackPressureStrategy, | ||
childChannelInitializer: childChannelInitializer | ||
) | ||
} | ||
|
||
/// Use the existing bound socket file descriptor. | ||
/// | ||
/// - Parameters: | ||
/// - socket: The _Unix file descriptor_ representing the bound stream socket. | ||
/// - cleanupExistingSocketFile: Unused. | ||
/// - serverBackPressureStrategy: The back pressure strategy used by the server socket channel. | ||
/// - childChannelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect` | ||
/// method. | ||
/// - Returns: The result of the channel initializer. | ||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
public func bind<Output: Sendable>( | ||
_ socket: NIOBSDSocket.Handle, | ||
cleanupExistingSocketFile: Bool = false, | ||
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, | ||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output> | ||
) async throws -> NIOAsyncChannel<Output, Never> { | ||
try await self._bind( | ||
socket, | ||
serverBackPressureStrategy: serverBackPressureStrategy, | ||
childChannelInitializer: childChannelInitializer | ||
) | ||
} | ||
|
||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
private func makeConnectedChannel<Inbound: Sendable, Outbound: Sendable>( | ||
target: BindTarget, | ||
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, | ||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<NIOAsyncChannel<Inbound, Outbound>> | ||
) async throws -> NIOAsyncChannel<NIOAsyncChannel<Inbound, Outbound>, Never> { | ||
switch target.base { | ||
case .hostAndPort(let host, let port): | ||
try await self.bind( | ||
to: try SocketAddress.makeAddressResolvingHost(host, port: port), | ||
serverBackPressureStrategy: serverBackPressureStrategy, | ||
childChannelInitializer: childChannelInitializer | ||
) | ||
|
||
case .unixDomainSocketPath(let unixDomainSocketPath): | ||
try await self.bind( | ||
to: try SocketAddress(unixDomainSocketPath: unixDomainSocketPath), | ||
serverBackPressureStrategy: serverBackPressureStrategy, | ||
childChannelInitializer: childChannelInitializer | ||
) | ||
|
||
case .socketAddress(let address): | ||
try await self.bind( | ||
to: address, | ||
serverBackPressureStrategy: serverBackPressureStrategy, | ||
childChannelInitializer: childChannelInitializer | ||
) | ||
|
||
case .vsockAddress(let vsockAddress): | ||
try await self._bind( | ||
to: vsockAddress, | ||
serverBackPressureStrategy: serverBackPressureStrategy, | ||
childChannelInitializer: childChannelInitializer | ||
) | ||
|
||
case .socket(let handle): | ||
try await self._bind( | ||
handle, | ||
serverBackPressureStrategy: serverBackPressureStrategy, | ||
childChannelInitializer: childChannelInitializer | ||
) | ||
} | ||
} | ||
|
||
|
||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
private func _bind<Output: Sendable>( | ||
to vsockAddress: VsockAddress, | ||
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, | ||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output> | ||
) async throws -> NIOAsyncChannel<Output, Never> { | ||
func makeChannel( | ||
_ eventLoop: SelectableEventLoop, | ||
|
@@ -652,19 +941,9 @@ extension ServerBootstrap { | |
}.get() | ||
} | ||
|
||
/// Use the existing bound socket file descriptor. | ||
/// | ||
/// - Parameters: | ||
/// - socket: The _Unix file descriptor_ representing the bound stream socket. | ||
/// - cleanupExistingSocketFile: Unused. | ||
/// - serverBackPressureStrategy: The back pressure strategy used by the server socket channel. | ||
/// - childChannelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect` | ||
/// method. | ||
/// - Returns: The result of the channel initializer. | ||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
public func bind<Output: Sendable>( | ||
private func _bind<Output: Sendable>( | ||
_ socket: NIOBSDSocket.Handle, | ||
cleanupExistingSocketFile: Bool = false, | ||
serverBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, | ||
childChannelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output> | ||
) async throws -> NIOAsyncChannel<Output, Never> { | ||
|
@@ -1270,6 +1549,11 @@ public final class ClientBootstrap: NIOClientTCPBootstrapProtocol { | |
// MARK: Async connect methods | ||
|
||
extension ClientBootstrap { | ||
|
||
struct Endpoint { | ||
|
||
} | ||
|
||
/// Specify the `host` and `port` to connect to for the TCP `Channel` that will be established. | ||
/// | ||
/// - Parameters: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we undeprecating these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because we want to pass in the NIOAsyncChannel and users shall then use
inbound
andoutbound
on it. So that potentially we can add further properties toNIOAsyncChannel
later.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we undeprecate with an spi?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No you can't. What you can do is put it behind a trait like
_ExperimentalStructuredBootstrap
. You just have to duplicate the properties.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should I intro new
in
andout
properties and mark them with the same@_spi
?