Skip to content

Add TaskExecutor conformance to EventLoops #2732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions Benchmarks/Benchmarks/NIOPosixBenchmarks/Benchmarks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,44 @@ private let eventLoop = MultiThreadedEventLoopGroup.singleton.next()

let benchmarks = {
let defaultMetrics: [BenchmarkMetric] = [
.mallocCountTotal
.mallocCountTotal,
.cpuTotal,
.contextSwitches,
]

Benchmark(
"TCPEcho",
"TCPEcho pure NIO 1M times",
configuration: .init(
metrics: defaultMetrics,
scalingFactor: .mega,
maxDuration: .seconds(10_000_000),
maxIterations: 5
scalingFactor: .one
)
) { benchmark in
try runTCPEcho(
numberOfWrites: benchmark.scaledIterations.upperBound,
numberOfWrites: 1_000_000,
eventLoop: eventLoop
)
}

Benchmark(
"TCPEchoAsyncChannel pure async/await 1M times",
configuration: .init(
metrics: defaultMetrics,
scalingFactor: .one
)
) { benchmark in
try await runTCPEchoAsyncChannel(
numberOfWrites: 1_000_000,
eventLoop: eventLoop
)
}

// This benchmark is only available above 5.9 since our EL conformance
// to serial executor is also gated behind 5.9.
#if compiler(>=5.9)
Benchmark(
"TCPEchoAsyncChannel",
"TCPEchoAsyncChannel using globalHook 1M times",
configuration: .init(
metrics: defaultMetrics,
scalingFactor: .mega,
maxDuration: .seconds(10_000_000),
maxIterations: 5,
scalingFactor: .one,
// We are expecting a bit of allocation variance due to an allocation
// in the Concurrency runtime which happens when resuming a continuation.
thresholds: [.mallocCountTotal: .init(absolute: [.p90: 2000])],
Expand All @@ -62,10 +72,28 @@ let benchmarks = {
)
) { benchmark in
try await runTCPEchoAsyncChannel(
numberOfWrites: benchmark.scaledIterations.upperBound,
numberOfWrites: 1_000_000,
eventLoop: eventLoop
)
}

#if compiler(>=6.0)
if #available(macOS 15.0, *) {
Benchmark(
"TCPEchoAsyncChannel using task executor preference 1M times",
configuration: .init(
metrics: defaultMetrics,
scalingFactor: .one
)
) { benchmark in
try await withTaskExecutorPreference(eventLoop.taskExecutor) {
try await runTCPEchoAsyncChannel(
numberOfWrites: 1_000_000,
eventLoop: eventLoop
)
}
}
}
#endif

Benchmark(
Expand Down
2 changes: 1 addition & 1 deletion Benchmarks/Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version: 5.7
// swift-tools-version: 5.9

import PackageDescription

Expand Down
44 changes: 37 additions & 7 deletions Sources/NIOCore/EventLoop+SerialExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,31 +62,61 @@ extension NIOSerialEventLoopExecutor {
/// This type is not recommended for use because it risks problems with unowned
/// executors. Adopters are recommended to conform their own event loop
/// types to `SerialExecutor`.
final class NIODefaultSerialEventLoopExecutor {
package final class NIODefaultEventLoopExecutor {
@usableFromInline
let loop: EventLoop

@inlinable
init(_ loop: EventLoop) {
package init(_ loop: EventLoop) {
self.loop = loop
}
}

@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
extension NIODefaultSerialEventLoopExecutor: SerialExecutor {
extension NIODefaultEventLoopExecutor: SerialExecutor {
@inlinable
public func enqueue(_ job: consuming ExecutorJob) {
package func enqueue(_ job: consuming ExecutorJob) {
self.loop.enqueue(job)
}

@inlinable
public func asUnownedSerialExecutor() -> UnownedSerialExecutor {
package func asUnownedSerialExecutor() -> UnownedSerialExecutor {
UnownedSerialExecutor(complexEquality: self)

}

@inlinable
public func isSameExclusiveExecutionContext(other: NIODefaultSerialEventLoopExecutor) -> Bool {
package func isSameExclusiveExecutionContext(other: NIODefaultEventLoopExecutor) -> Bool {
self.loop === other.loop
}
}

#if compiler(>=6.0)
/// A helper protocol that can be mixed in to a NIO ``EventLoop`` to provide an
/// automatic conformance to `TaskExecutor`.
///
/// Implementers of `EventLoop` should consider conforming to this protocol as
/// well on Swift 6.0 and later.
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
public protocol NIOTaskEventLoopExecutor: NIOSerialEventLoopExecutor & TaskExecutor {}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension NIOTaskEventLoopExecutor {
@inlinable
func asUnownedTaskExecutor() -> UnownedTaskExecutor {
UnownedTaskExecutor(ordinary: self)
}

@inlinable
public var taskExecutor: any TaskExecutor {
self
}
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension NIODefaultEventLoopExecutor: TaskExecutor {
@inlinable
public func asUnownedTaskExecutor() -> UnownedTaskExecutor {
UnownedTaskExecutor(ordinary: self)
}
}
#endif
9 changes: 8 additions & 1 deletion Sources/NIOCore/EventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ extension EventLoop {
extension EventLoop {
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
public var executor: any SerialExecutor {
NIODefaultSerialEventLoopExecutor(self)
NIODefaultEventLoopExecutor(self)
}

@inlinable
Expand All @@ -552,6 +552,13 @@ extension EventLoop {
unownedJob.runSynchronously(on: self.executor.asUnownedSerialExecutor())
}
}

#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
public var taskExecutor: any TaskExecutor {
NIODefaultEventLoopExecutor(self)
}
#endif
}

extension EventLoopGroup {
Expand Down
6 changes: 6 additions & 0 deletions Sources/NIOEmbedded/AsyncTestingEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,12 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable {
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
extension NIOAsyncTestingEventLoop: NIOSerialEventLoopExecutor {}

// MARK: TaskExecutor conformance
#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
extension NIOAsyncTestingEventLoop: NIOTaskEventLoopExecutor {}
#endif

/// This is a thread-safe promise creation store.
///
/// We use this to keep track of where promises come from in the `NIOAsyncTestingEventLoop`.
Expand Down
9 changes: 9 additions & 0 deletions Sources/NIOEmbedded/Embedded.swift
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,15 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible {
}
return false
}()

#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
public var taskExecutor: any TaskExecutor {
fatalError(
"EmbeddedEventLoop is not thread safe and cannot be used as a TaskExecutor. Use NIOAsyncTestingEventLoop instead."
)
}
#endif
}

// EmbeddedEventLoop is extremely _not_ Sendable. However, the EventLoop protocol
Expand Down
6 changes: 6 additions & 0 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1080,3 +1080,9 @@ extension SelectableEventLoop {
}
}
}

// MARK: TaskExecutor conformance
#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
extension SelectableEventLoop: NIOTaskEventLoopExecutor {}
#endif
82 changes: 82 additions & 0 deletions Tests/NIOPosixTests/TaskExecutorTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOEmbedded
import NIOPosix
import XCTest

final class TaskExecutorTests: XCTestCase {
#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func _runTests(loop1: some EventLoop, loop2: some EventLoop) async {
let executor1 = loop1.taskExecutor
let executor2 = loop2.taskExecutor
await withTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask(executorPreference: executor1) {
loop1.assertInEventLoop()
loop2.assertNotInEventLoop()

withUnsafeCurrentTask { task in
// this currently fails on macOS
XCTAssertEqual(task?.unownedTaskExecutor, executor1.asUnownedTaskExecutor())
}
}

taskGroup.addTask(executorPreference: executor2) {
loop1.assertNotInEventLoop()
loop2.assertInEventLoop()

withUnsafeCurrentTask { task in
// this currently fails on macOS
XCTAssertEqual(task?.unownedTaskExecutor, executor2.asUnownedTaskExecutor())
}
}
}

let task = Task(executorPreference: executor1) {
loop1.assertInEventLoop()
loop2.assertNotInEventLoop()

withUnsafeCurrentTask { task in
// this currently fails on macOS
XCTAssertEqual(task?.unownedTaskExecutor, executor1.asUnownedTaskExecutor())
}
}

await task.value
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func testSelectableEventLoopAsTaskExecutor() async throws {
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
var iterator = group.makeIterator()
let loop1 = iterator.next()!
let loop2 = iterator.next()!

await self._runTests(loop1: loop1, loop2: loop2)
Copy link
Member Author

@fabianfett fabianfett May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test currently crashes:

Thread 4 Crashed:: NIO-ELT-0-#0
0   libswiftCore.dylib            	       0x19a319644 swift_getObjectType + 88
1   libswift_Concurrency.dylib    	       0x2617ea0e4 swift_task_enqueueImpl(swift::Job*, swift::SerialExecutorRef) + 160
2   libswift_Concurrency.dylib    	       0x2617ec890 swift::AsyncTask::flagAsAndEnqueueOnExecutor(swift::SerialExecutorRef) + 432
3   libswift_Concurrency.dylib    	       0x2617eacdc swift_task_switchImpl(swift::AsyncContext*, void (swift::AsyncContext* swift_async_context) swiftasynccall*, swift::SerialExecutorRef) + 640
4   swift-nioPackageTests         	       0x1080d5ca1 partial apply for closure #1 in closure #1 in TaskExecutorTests._runTests<A, B>(loop1:loop2:) + 1
5   swift-nioPackageTests         	       0x1080d4d3d thunk for @escaping @isolated(any) @callee_guaranteed @Sendable @async () -> (@out A) + 1
6   swift-nioPackageTests         	       0x1080d4e99 partial apply for thunk for @escaping @isolated(any) @callee_guaranteed @Sendable @async () -> (@out A) + 1
7   libswift_Concurrency.dylib    	       0x2617f1921 completeTaskWithClosure(swift::AsyncContext*, swift::SwiftError*) + 1

Why do we call swift_task_switchImpl with a SerialExecutorRef?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch is what checks if it can "switch" without releasing actor locks; it won't be able to switch when there's a task executor. I'll look into reproducing this though with a debug runtime to check.

try! await group.shutdownGracefully()
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
func testAsyncTestingEventLoopAsTaskExecutor() async throws {
let loop1 = NIOAsyncTestingEventLoop()
let loop2 = NIOAsyncTestingEventLoop()

await self._runTests(loop1: loop1, loop2: loop2)

await loop1.shutdownGracefully()
await loop2.shutdownGracefully()
}
#endif
}
Loading