-
Notifications
You must be signed in to change notification settings - Fork 746
Expand file tree
/
Copy pathPosixSingletons+ConcurrencyTakeOver.swift
More file actions
199 lines (177 loc) · 8.6 KB
/
PosixSingletons+ConcurrencyTakeOver.swift
File metadata and controls
199 lines (177 loc) · 8.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if !os(WASI)
import Atomics
import NIOCore
/// File-private protocol that mirrors the `UnownedJob`-based `enqueue(_:)` entry point.
///
/// The global-enqueue hook installed in this file calls `enqueue(_:)` on an event loop through
/// `any SilenceWarning` instead of calling it on the concrete type.
private protocol SilenceWarning {
    // NOTE(review): judging by the protocol's name, this indirection exists to avoid a compiler
    // warning at the call site — confirm which warning before removing it.

    /// Enqueues `job` for execution.
    @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
    func enqueue(_ job: UnownedJob)
}

/// The conformance adds no members; the requirement is satisfied by an `enqueue(_:)` declared elsewhere.
@available(macOS 14, *)
extension SelectableEventLoop: SilenceWarning {}
/// Sentinel `TaskExecutor` that every `SelectableEventLoop` of the singleton `MultiThreadedEventLoopGroup` shares
/// once the concurrency global executor has been taken over.
///
/// Because all event loops present the same `TaskExecutor` identity, the Swift concurrency runtime treats them as
/// equivalent executors. That avoids unnecessary thread hops when a task suspends on one event loop of the group
/// and resumes on another.
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
private final class NIOGlobalExecutorTakeOverTaskExecutor: TaskExecutor, Sendable {
    /// The group whose event loops receive the jobs handed to this executor.
    let group: MultiThreadedEventLoopGroup

    /// Creates a task executor that forwards its jobs to `group`.
    init(group: MultiThreadedEventLoopGroup) {
        self.group = group
    }

    /// Forwards `job` to whichever event loop `group.anySEL()` returns.
    func enqueue(_ job: consuming ExecutorJob) {
        self.group.anySEL().enqueue(job)
    }
}
/// Process-wide flag recording that an attempt to install the global-executor hook has been made.
///
/// The install routine flips it from `false` to `true` with a compare-and-exchange and traps if the flag was
/// already set. It is set before the hook lookup, so it stays `true` even when the installation later fails.
private let _haveWeTakenOverTheConcurrencyPool = ManagedAtomic<Bool>(false)
extension NIOSingletons {
    /// Install ``MultiThreadedEventLoopGroup/singleton`` as Swift Concurrency's global executor and run `body` with
    /// the shared task executor passed to `withTaskExecutorPreference`.
    ///
    /// If the installation does not succeed (for example on Windows, on compilers 6.4 or newer, or when a global
    /// enqueue hook is already set), `nil` is passed to `withTaskExecutorPreference` and `body` still runs.
    ///
    /// - Parameter body: The asynchronous operation to run.
    /// - Throws: Whatever `body` throws.
    ///
    /// - warning: Where the hook is supported, this traps if an installation was already attempted in this process.
    /// - warning: On Darwin platforms, this traps when called from a thread other than the main thread.
    @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, *)
    public static func withUnsafeInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutorAndTaskExecutor<
        Failure: Error
    >(
        body: () async throws(Failure) -> Void
    ) async throws(Failure) {
        let taskExecutor = _unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor()
        try await withTaskExecutorPreference(taskExecutor, operation: body)
    }

    /// Install ``MultiThreadedEventLoopGroup/singleton`` as Swift Concurrency's global executor.
    ///
    /// This makes it possible to use Swift Concurrency and retain high-performance I/O by avoiding the otherwise
    /// necessary thread switches between Swift Concurrency's own global pool and a place (like an `EventLoop`)
    /// that can perform I/O.
    ///
    /// This method uses an atomic compare and exchange to install the hook (and makes sure it's not already set).
    /// This unilateral atomic memory operation doesn't guarantee anything because another piece of code could have
    /// done the same without using atomic operations. But we do our best.
    ///
    /// - Returns: `true` if ``MultiThreadedEventLoopGroup/singleton`` was successfully installed as Swift
    ///   Concurrency's global executor. `false` if the hook symbol could not be found, a hook was already set,
    ///   the OS is older than the versions in the availability check, the compiler is 6.4 or newer, or the
    ///   platform is Windows.
    ///
    /// - warning: You may only call this method from the main thread (enforced with a trap on Darwin platforms).
    /// - warning: You may only call this method once; a second attempt traps.
    /// - warning: This method is currently not supported on Windows and will return false.
    @discardableResult
    public static func unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor() -> Bool {
        guard #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) else {
            return false
        }
        let taskExecutor = _unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor()
        return taskExecutor != nil
    }

    /// Shared implementation of the public install entry points.
    ///
    /// Looks up `swift_task_enqueueGlobal_hook`, atomically swaps in a hook that enqueues jobs onto the singleton
    /// group and, on success, points every event loop's `_customRunJob` at one shared sentinel task executor.
    ///
    /// - Returns: The sentinel task executor if the hook was installed; `nil` if the hook symbol was not found,
    ///   a hook was already set, the compiler is 6.4 or newer, or the platform is Windows.
    @available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
    private static func _unsafeTryInstallSingletonPosixEventLoopGroupAsConcurrencyGlobalExecutor()
        -> NIOGlobalExecutorTakeOverTaskExecutor?
    {
        #if os(Windows)
        return nil
        #else
        // Guard between the minimum and maximum supported version for the hook
        #if compiler(<6.4)
        typealias ConcurrencyEnqueueGlobalHook =
            @convention(thin) (
                UnownedJob, @convention(thin) (UnownedJob) -> Void
            ) -> Void

        // The flag is claimed before anything else, so even a failed installation counts as "the one call".
        guard
            _haveWeTakenOverTheConcurrencyPool.compareExchange(
                expected: false,
                desired: true,
                ordering: .relaxed
            ).exchanged
        else {
            fatalError("Must be called only once")
        }

        #if canImport(Darwin)
        guard pthread_main_np() == 1 else {
            fatalError("Must be called from the main thread")
        }
        #endif

        // Unsafe 1: We pretend that the hook's type is actually fully equivalent to `ConcurrencyEnqueueGlobalHook`
        //           @convention(thin) (UnownedJob, @convention(thin) (UnownedJob) -> Void) -> Void
        //           which isn't formally guaranteed.
        let concurrencyEnqueueGlobalHookPtr = dlsym(
            dlopen(nil, RTLD_NOW),
            "swift_task_enqueueGlobal_hook"
        )?.assumingMemoryBound(to: UnsafeRawPointer?.AtomicRep.self)
        guard let concurrencyEnqueueGlobalHookPtr = concurrencyEnqueueGlobalHookPtr else {
            return nil
        }

        // We will use an atomic operation to swap the pointers aiming to protect against other code that attempts
        // to swap the pointer. This isn't guaranteed to work as we can't be sure that the other code will actually
        // use atomic compares and exchanges too. Nevertheless, we're doing our best.
        let concurrencyEnqueueGlobalHookAtomic = UnsafeAtomic<UnsafeRawPointer?>(at: concurrencyEnqueueGlobalHookPtr)
        // note: We don't need to destroy this atomic as we're borrowing the storage from `dlsym`.

        return withUnsafeTemporaryAllocation(
            of: ConcurrencyEnqueueGlobalHook.self,
            capacity: 1
        ) { enqueueOnNIOPtr -> NIOGlobalExecutorTakeOverTaskExecutor? in
            // Unsafe 2: We mandate that we're actually getting _the_ function pointer to the closure below which
            //           isn't formally guaranteed by Swift.
            enqueueOnNIOPtr.baseAddress!.initialize(to: { job, _ in
                // This formally picks a random EventLoop from the singleton group. However, `EventLoopGroup.any()`
                // attempts to be sticky. So if we're already in an `EventLoop` that's part of the singleton
                // `EventLoopGroup`, we'll get that one and be very fast (avoid a thread switch).
                let targetEL = MultiThreadedEventLoopGroup.singleton.anySEL()

                (targetEL as any SilenceWarning).enqueue(job)
            })

            // Unsafe 3: We mandate that the function pointer can be reinterpreted as `UnsafeRawPointer` which isn't
            //           formally guaranteed by Swift
            return enqueueOnNIOPtr.baseAddress!.withMemoryRebound(
                to: UnsafeRawPointer.self,
                capacity: 1
            ) { enqueueOnNIOPtr in
                // Unsafe 4: We just pretend that we're the only ones in the world pulling this trick (or at least
                //           that the others also use a `compareExchange`)...
                guard
                    concurrencyEnqueueGlobalHookAtomic.compareExchange(
                        expected: nil,
                        desired: enqueueOnNIOPtr.pointee,
                        ordering: .relaxed
                    ).exchanged
                else {
                    return nil
                }

                // Install the sentinel task executor on all SELs so that
                // runSynchronously passes the same TaskExecutor for every
                // event loop in the group.
                let sentinel = NIOGlobalExecutorTakeOverTaskExecutor(
                    group: MultiThreadedEventLoopGroup.singleton
                )
                // The availability check does not depend on the event loop, so it is evaluated once
                // rather than once per loop iteration.
                if #available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, *) {
                    for el in MultiThreadedEventLoopGroup.singleton.makeIterator() {
                        (el as! SelectableEventLoop)._customRunJob = {
                            $0.unownedJob.runSynchronously(on: sentinel.asUnownedTaskExecutor())
                        }
                    }
                }

                // nice, everything worked.
                return sentinel
            }
        }
        #else
        // The hook is not supported on this compiler version. This function returns an optional task executor,
        // so "not installed" is `nil` (a `Bool` here would fail to type-check once this branch becomes active).
        return nil
        #endif
        #endif  // windows unimplemented
    }
}
// Workaround for https://github.com/apple/swift-nio/issues/2893
//
// Gives `Optional<Wrapped>` a short `AtomicRep` name for `Wrapped.AtomicOptionalRepresentation`. The
// hook-installation code in this file spells it `UnsafeRawPointer?.AtomicRep` when binding the hook's memory.
extension Optional
where Wrapped: AtomicOptionalWrappable, Wrapped.AtomicRepresentation.Value == Wrapped {
    typealias AtomicRep = Wrapped.AtomicOptionalRepresentation
}
#endif // !os(WASI)