Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 63 additions & 8 deletions Sources/NIOPosix/PosixSingletons+ConcurrencyTakeOver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,40 @@ private protocol SilenceWarning {
@available(macOS 14, *)
extension SelectableEventLoop: SilenceWarning {}

/// A sentinel `TaskExecutor` shared by all `SelectableEventLoop`s in the singleton `MultiThreadedEventLoopGroup`
/// when the concurrency global executor has been taken over.
///
/// By sharing a single `TaskExecutor` identity across all event loops, the Swift concurrency runtime treats them
/// as equivalent executors. This prevents unnecessary thread hops when a task suspends on one event loop and
/// resumes on another within the same group.
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
private final class NIOGlobalExecutorTakeOverTaskExecutor: TaskExecutor, Sendable {
let group: MultiThreadedEventLoopGroup

init(group: MultiThreadedEventLoopGroup) {
self.group = group
}

func enqueue(_ job: consuming ExecutorJob) {
let targetEL = self.group.anySEL()
targetEL.enqueue(job)
}
}

private let _haveWeTakenOverTheConcurrencyPool = ManagedAtomic(false)
extension NIOSingletons {
/// Install ``MultiThreadedEventLoopGroup/singleton`` as Swift Concurrency's global executor and set it as the task executor.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, *)
public static func withUnsafeInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutorAndTaskExecutor<
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think this should be with* because it leaves residue (overridden global executors)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree but I didn't come up with a better name. It needs to install the global and the task executor. Do you have a better name in mind? Or maybe we unset the global override again.

Failure: Error
>(
body: () async throws(Failure) -> Void
) async throws(Failure) {
let taskExecutor = _unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor()

try await withTaskExecutorPreference(taskExecutor, operation: body)
}

/// Install ``MultiThreadedEventLoopGroup/singleton`` as Swift Concurrency's global executor.
///
/// This allows to use Swift Concurrency and retain high-performance I/O alleviating the otherwise necessary thread switches between
Expand All @@ -42,14 +74,23 @@ extension NIOSingletons {
/// - warning: This method is currently not supported on Windows and will return false.
@discardableResult
public static func unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor() -> Bool {
guard #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) else {
return false
}

let taskExecutor = _unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor()
return taskExecutor != nil
}

@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
private static func _unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor()
-> NIOGlobalExecutorTakeOverTaskExecutor?
{
#if os(Windows)
return false
return nil
#else
// Guard between the minimum and maximum supported version for the hook
#if compiler(<6.4)
guard #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) else {
return false
}

typealias ConcurrencyEnqueueGlobalHook =
@convention(thin) (
Expand Down Expand Up @@ -80,7 +121,7 @@ extension NIOSingletons {
"swift_task_enqueueGlobal_hook"
)?.assumingMemoryBound(to: UnsafeRawPointer?.AtomicRep.self)
guard let concurrencyEnqueueGlobalHookPtr = concurrencyEnqueueGlobalHookPtr else {
return false
return nil
}

// We will use an atomic operation to swap the pointers aiming to protect against other code that attempts
Expand All @@ -92,7 +133,7 @@ extension NIOSingletons {
return withUnsafeTemporaryAllocation(
of: ConcurrencyEnqueueGlobalHook.self,
capacity: 1
) { enqueueOnNIOPtr -> Bool in
) { enqueueOnNIOPtr -> NIOGlobalExecutorTakeOverTaskExecutor? in
// Unsafe 2: We mandate that we're actually getting _the_ function pointer to the closure below which
// isn't formally guaranteed by Swift.
enqueueOnNIOPtr.baseAddress!.initialize(to: { job, _ in
Expand All @@ -119,11 +160,25 @@ extension NIOSingletons {
ordering: .relaxed
).exchanged
else {
return false
return nil
}

// Install the sentinel task executor on all SELs so that
// runSynchronously passes the same TaskExecutor for every
// event loop in the group.
let sentinel = NIOGlobalExecutorTakeOverTaskExecutor(
group: MultiThreadedEventLoopGroup.singleton
)
for el in MultiThreadedEventLoopGroup.singleton.makeIterator() {
if #available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, *) {
(el as! SelectableEventLoop)._customRunJob = {
$0.unownedJob.runSynchronously(on: sentinel.asUnownedTaskExecutor())
}
}
}

// nice, everything worked.
return true
return sentinel
}
}
#else
Expand Down
16 changes: 13 additions & 3 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ internal final class SelectableEventLoop: EventLoop, @unchecked Sendable {

private var metricsDelegateState: MetricsDelegateState?

/// The method to use when running a job. Allows us to switch between the various runSynchronously methods
///
/// - Important: This is not protected by any lock and must be set before any job is enqueued.
@usableFromInline
internal var _customRunJob: ((ErasedUnownedJob) -> Void)?

@usableFromInline
internal func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) {
precondition(_isDebugAssertConfiguration())
Expand Down Expand Up @@ -617,10 +623,14 @@ internal final class SelectableEventLoop: EventLoop, @unchecked Sendable {
case .function(let function):
function()
case .unownedJob(let erasedUnownedJob):
if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) {
erasedUnownedJob.unownedJob.runSynchronously(on: self.asUnownedSerialExecutor())
if let runJob = self._customRunJob {
runJob(erasedUnownedJob)
} else {
fatalError("Tried to run an UnownedJob without runtime support")
if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) {
erasedUnownedJob.unownedJob.runSynchronously(on: self.asUnownedSerialExecutor())
} else {
fatalError("Tried to run an UnownedJob without runtime support")
}
}
case .callback(let handler):
handler.handleScheduledCallback(eventLoop: self)
Expand Down
Loading