From ee05a53d73e53c0cd3c1d91517be3f9c04195404 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Thu, 13 Feb 2025 09:23:47 +0000 Subject: [PATCH 1/9] Make sure NIOAsyncWriter test doesn't hang indefinitely The testSuspendingBufferedYield_whenWriterFinished test causes the test application to hang in the Android emulator. This fix sets a timeout on waiting for the ConditionLock, so we don't wait indefinitely. Additionally, the timeout for waiting for both yields being suspended is increased to make it less likely that an incorrect state is reached. --- .../AsyncSequences/NIOAsyncWriterTests.swift | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index 848c65f869..9a1e730f55 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -623,8 +623,11 @@ final class NIOAsyncWriterTests: XCTestCase { self.delegate.didYieldHandler = { _ in if self.delegate.didYieldCallCount == 1 { // Delay this yield until the other yield is suspended again. - suspendedAgain.lock(whenValue: true) - suspendedAgain.unlock() + if suspendedAgain.lock(whenValue: true, timeoutSeconds: 5) { + suspendedAgain.unlock() + } else { + XCTFail("Timeout while waiting for other yield to suspend again.") + } } } @@ -635,7 +638,7 @@ final class NIOAsyncWriterTests: XCTestCase { try await writer!.yield("message2") } - await fulfillment(of: [bothSuspended], timeout: 1) + await fulfillment(of: [bothSuspended], timeout: 5) self.writer.finish() self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0) From 04e2e445a0dddb6fa236397435156691866e28f7 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Fri, 7 Mar 2025 19:34:10 +0100 Subject: [PATCH 2/9] Add NIOThreadPoolTaskExecutor to NIOTestUtils Provides withNIOThreadPoolTaskExecutor, which runs a task executor based on a NIOThreadPool with a specified number of threads. --- Package.swift | 1 + .../NIOThreadPoolTaskExecutor.swift | 116 ++++++++++++++++++ .../NIOThreadPoolTaskExecutorTest.swift | 81 ++++++++++++ 3 files changed, 198 insertions(+) create mode 100644 Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift create mode 100644 Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift diff --git a/Package.swift b/Package.swift index 472db3490f..127c37cd10 100644 --- a/Package.swift +++ b/Package.swift @@ -520,6 +520,7 @@ let package = Package( dependencies: [ "NIOTestUtils", "NIOCore", + "NIOConcurrencyHelpers", "NIOEmbedded", "NIOPosix", ] diff --git a/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift b/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift new file mode 100644 index 0000000000..9786105e47 --- /dev/null +++ b/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift @@ -0,0 +1,116 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6) + +import NIOPosix + +/// Run a `NIOThreadPool` based `TaskExecutor` while executing the given `body`. +/// +/// This function provides a `TaskExecutor`, **not** a `SerialExecutor`. The executor can be +/// used for setting the executor preference of a task. +/// +/// Example usage: +/// ```swift +/// await withNIOThreadPoolTaskExecutor(numberOfThreads: 2) { taskExecutor in +/// await withDiscardingTaskGroup { group in +/// group.addTask(executorPreference: taskExecutor) { ... } +/// } +/// } +/// ``` +/// +/// - warning: Do not escape the task executor from the closure for later use and make sure that +/// all tasks running on the executor are completely finished before `body` returns. +/// For unstructured tasks, this means awaiting their results. If any task is still +/// running on the executor when `body` returns, this results in a fatalError. +/// It is highly recommended to use structured concurrency with this task executor. +/// +/// - Parameters: +/// - numberOfThreads: The number of threads in the pool. +/// - body: The closure that will accept the task executor. +/// +/// - Throws: When `body` throws. +/// +/// - Returns: The value returned by `body`. +@inlinable +public func withNIOThreadPoolTaskExecutor( + numberOfThreads: Int, + body: (NIOThreadPoolTaskExecutor) async throws(Failure) -> T +) async throws(Failure) -> T { + let taskExecutor = NIOThreadPoolTaskExecutor(numberOfThreads: numberOfThreads) + taskExecutor.start() + + let result: Result + do { + result = .success(try await body(taskExecutor)) + } catch { + result = .failure(error) + } + + await taskExecutor.shutdownGracefully() + + return try result.get() +} + +/// A task executor based on NIOThreadPool. +/// +/// Provides a `TaskExecutor`, **not** a `SerialExecutor`. The executor can be +/// used for setting the executor preference of a task. +/// +public final class NIOThreadPoolTaskExecutor: TaskExecutor { + let nioThreadPool: NIOThreadPool + + /// Initialize a `NIOThreadPoolTaskExecutor`, using a thread pool with `numberOfThreads` threads. + /// + /// - Parameters: + /// - numberOfThreads: The number of threads to use for the thread pool. + public init(numberOfThreads: Int) { + self.nioThreadPool = NIOThreadPool(numberOfThreads: numberOfThreads) + } + + /// Start the `NIOThreadPoolTaskExecutor`. + public func start() { + nioThreadPool.start() + } + + /// Gracefully shutdown this `NIOThreadPoolTaskExecutor`. + /// + /// Make sure that all tasks running on the executor are finished before shutting down. + /// + /// - warning: If any task is still running on the executor, this results in a fatalError. + public func shutdownGracefully() async { + do { + try await nioThreadPool.shutdownGracefully() + } catch { + fatalError("Failed to shutdown NIOThreadPool") + } + } + + /// Enqueue a job. + /// + /// Called by the concurrency runtime. + /// + /// - Parameter job: The job to enqueue. + public func enqueue(_ job: consuming ExecutorJob) { + let unownedJob = UnownedJob(job) + self.nioThreadPool.submit { shouldRun in + guard case shouldRun = NIOThreadPool.WorkItemState.active else { + fatalError("Shutdown before all tasks finished") + } + unownedJob.runSynchronously(on: self.asUnownedTaskExecutor()) + } + } +} + +#endif // compiler(>=6) diff --git a/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift b/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift new file mode 100644 index 0000000000..1911101a44 --- /dev/null +++ b/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift @@ -0,0 +1,81 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2019-2025 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOConcurrencyHelpers +import NIOTestUtils +import XCTest + +class NIOThreadPoolTaskExecutorTest: XCTestCase { + struct TestError: Error {} + + func runTasksSimultaneously(numberOfTasks: Int) async { + await withNIOThreadPoolTaskExecutor(numberOfThreads: numberOfTasks) { taskExecutor in + await withDiscardingTaskGroup { group in + var taskBlockers = [ConditionLock]() + defer { + // Unblock all tasks + for taskBlocker in taskBlockers { + taskBlocker.lock() + taskBlocker.unlock(withValue: true) + } + } + + for taskNumber in 1...numberOfTasks { + let taskStarted = ConditionLock(value: false) + let taskBlocker = ConditionLock(value: false) + taskBlockers.append(taskBlocker) + + // Start task and block it + group.addTask(executorPreference: taskExecutor) { + taskStarted.lock() + taskStarted.unlock(withValue: true) + taskBlocker.lock(whenValue: true) + taskBlocker.unlock() + } + + // Verify that task was able to start + if taskStarted.lock(whenValue: true, timeoutSeconds: 5) { + taskStarted.unlock() + } else { + XCTFail("Task \(taskNumber) failed to start.") + break + } + } + } + } + } + + func testRunsTaskOnSingleThread() async { + await runTasksSimultaneously(numberOfTasks: 1) + } + + func testRunsMultipleTasksOnMultipleThreads() async { + await runTasksSimultaneously(numberOfTasks: 3) + } + + func testReturnsBodyResult() async { + let expectedResult = "result" + let result = await withNIOThreadPoolTaskExecutor(numberOfThreads: 1) { _ in return expectedResult } + XCTAssertEqual(result, expectedResult) + } + + func testRethrows() async { + do { + try await withNIOThreadPoolTaskExecutor(numberOfThreads: 1) { _ in throw TestError() } + XCTFail("Function did not rethrow.") + } catch { + XCTAssertTrue(error is TestError) + } + } +} From b83d90dad7b808c02f5045280856028f55e60de2 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Fri, 7 Mar 2025 19:43:55 +0100 Subject: [PATCH 3/9] Use NIOThreadPoolTaskExecutor for NIOAsyncWriter test that hangs on Android emulator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The testSuspendingBufferedYield_whenWriterFinished test requires at least two threads in the concurrency thread pool because it blocks one task, which waits for another task to set a condition. In environments where the global concurrency thread pool doesn’t have at least two threads available, the test will fail, as observed on the Android emulator running with a single virtual core (see discussion in #3044). Using a custom task executor guarantees that at least two threads are available for the test, regardless of the width of the global concurrency thread pool. --- Package.swift | 1 + .../AsyncSequences/NIOAsyncWriterTests.swift | 79 ++++++++++--------- 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/Package.swift b/Package.swift index 127c37cd10..0816cd766f 100644 --- a/Package.swift +++ b/Package.swift @@ -441,6 +441,7 @@ let package = Package( "NIOCore", "NIOEmbedded", "NIOFoundationCompat", + "NIOTestUtils", swiftAtomics, ], swiftSettings: strictConcurrencySettings diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index 9a1e730f55..9b18107d5b 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -14,6 +14,7 @@ import DequeModule import NIOConcurrencyHelpers +import NIOTestUtils import XCTest @testable import NIOCore @@ -607,50 +608,56 @@ final class NIOAsyncWriterTests: XCTestCase { } func testSuspendingBufferedYield_whenWriterFinished() async throws { - self.sink.setWritability(to: false) - - let bothSuspended = expectation(description: "suspended on both yields") - let suspendedAgain = ConditionLock(value: false) - self.delegate.didSuspendHandler = { - if self.delegate.didSuspendCallCount == 2 { - bothSuspended.fulfill() - } else if self.delegate.didSuspendCallCount > 2 { - suspendedAgain.lock() - suspendedAgain.unlock(withValue: true) - } - } + #if compiler(>=6) + try await withNIOThreadPoolTaskExecutor(numberOfThreads: 2) { taskExecutor in + try await withThrowingTaskGroup(of: Void.self) { group in + self.sink.setWritability(to: false) + + let bothSuspended = expectation(description: "suspended on both yields") + let suspendedAgain = ConditionLock(value: false) + self.delegate.didSuspendHandler = { + if self.delegate.didSuspendCallCount == 2 { + bothSuspended.fulfill() + } else if self.delegate.didSuspendCallCount > 2 { + suspendedAgain.lock() + suspendedAgain.unlock(withValue: true) + } + } - self.delegate.didYieldHandler = { _ in - if self.delegate.didYieldCallCount == 1 { - // Delay this yield until the other yield is suspended again. - if suspendedAgain.lock(whenValue: true, timeoutSeconds: 5) { - suspendedAgain.unlock() - } else { - XCTFail("Timeout while waiting for other yield to suspend again.") + self.delegate.didYieldHandler = { _ in + if self.delegate.didYieldCallCount == 1 { + // Delay this yield until the other yield is suspended again. + if suspendedAgain.lock(whenValue: true, timeoutSeconds: 5) { + suspendedAgain.unlock() + } else { + XCTFail("Timeout while waiting for other yield to suspend again.") + } + } } - } - } - let task1 = Task { [writer] in - try await writer!.yield("message1") - } - let task2 = Task { [writer] in - try await writer!.yield("message2") - } + group.addTask(executorPreference: taskExecutor) { [writer] in + try await writer!.yield("message1") + } + group.addTask(executorPreference: taskExecutor) { [writer] in + try await writer!.yield("message2") + } - await fulfillment(of: [bothSuspended], timeout: 5) - self.writer.finish() + await fulfillment(of: [bothSuspended], timeout: 5) + self.writer.finish() - self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0) + self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0) - // We have to become writable again to unbuffer the yields - // The first call to didYield will pause, so that the other yield will be suspended again. - self.sink.setWritability(to: true) + // We have to become writable again to unbuffer the yields + // The first call to didYield will pause, so that the other yield will be suspended again. + self.sink.setWritability(to: true) - await XCTAssertNoThrow(try await task1.value) - await XCTAssertNoThrow(try await task2.value) + await XCTAssertNoThrow(try await group.next()) + await XCTAssertNoThrow(try await group.next()) - self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1) + self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1) + } + } + #endif // compiler(>=6) } func testWriterFinish_whenFinished() { From 7af7145fb47a4cc744d73744cc257badc158e895 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Sun, 23 Mar 2025 15:12:22 +0100 Subject: [PATCH 4/9] Rename test to fit with the section it's in --- Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index 9b18107d5b..db18645441 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -607,7 +607,7 @@ final class NIOAsyncWriterTests: XCTestCase { self.assert(suspendCallCount: 1, yieldCallCount: 1, terminateCallCount: 1) } - func testSuspendingBufferedYield_whenWriterFinished() async throws { + func testWriterFinish_AndSuspendBufferedYield() async throws { #if compiler(>=6) try await withNIOThreadPoolTaskExecutor(numberOfThreads: 2) { taskExecutor in try await withThrowingTaskGroup(of: Void.self) { group in From c778a2f9a89a6356e9c21acf88f63bf9665b6a27 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Mon, 24 Mar 2025 14:49:26 +0100 Subject: [PATCH 5/9] Fix year in copyright notice Co-authored-by: George Barnett --- Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift b/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift index 9786105e47..b2a4f73d9d 100644 --- a/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift +++ b/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftNIO open source project // -// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors +// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information From e993300b3daa8288365ac10ffcf50a1cbf68a3e1 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Mon, 24 Mar 2025 14:50:26 +0100 Subject: [PATCH 6/9] Fix year in copyright notice Co-authored-by: George Barnett --- Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift b/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift index 1911101a44..0ec01bf9e3 100644 --- a/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift +++ b/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftNIO open source project // -// Copyright (c) 2019-2025 Apple Inc. and the SwiftNIO project authors +// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information From 4634b064373c3f0f1828f96276699157e0b0a225 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Wed, 26 Mar 2025 08:58:12 +0100 Subject: [PATCH 7/9] Make NIOThreadPoolTaskExecutor member functions internal and @usableFromInline --- Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift b/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift index b2a4f73d9d..91ee915918 100644 --- a/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift +++ b/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift @@ -75,12 +75,14 @@ public final class NIOThreadPoolTaskExecutor: TaskExecutor { /// /// - Parameters: /// - numberOfThreads: The number of threads to use for the thread pool. - public init(numberOfThreads: Int) { + @usableFromInline + init(numberOfThreads: Int) { self.nioThreadPool = NIOThreadPool(numberOfThreads: numberOfThreads) } /// Start the `NIOThreadPoolTaskExecutor`. - public func start() { + @usableFromInline + func start() { nioThreadPool.start() } @@ -89,7 +91,8 @@ public final class NIOThreadPoolTaskExecutor: TaskExecutor { /// Make sure that all tasks running on the executor are finished before shutting down. /// /// - warning: If any task is still running on the executor, this results in a fatalError. - public func shutdownGracefully() async { + @usableFromInline + func shutdownGracefully() async { do { try await nioThreadPool.shutdownGracefully() } catch { From 8afbc3f02b372bbf6c7cac0dd2c8135504dfc02e Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Fri, 11 Apr 2025 13:15:40 +0200 Subject: [PATCH 8/9] Use ManualTaskExecutor instead of NIOThreadPoolTaskExecutor --- Package.swift | 1 - Sources/NIOTestUtils/ManualTaskExecutor.swift | 164 ++++++++++++++++++ .../NIOThreadPoolTaskExecutor.swift | 119 ------------- .../AsyncSequences/NIOAsyncWriterTests.swift | 50 +++--- .../ManualTaskExecutorTest.swift | 69 ++++++++ .../NIOThreadPoolTaskExecutorTest.swift | 81 --------- 6 files changed, 256 insertions(+), 228 deletions(-) create mode 100644 Sources/NIOTestUtils/ManualTaskExecutor.swift delete mode 100644 Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift create mode 100644 Tests/NIOTestUtilsTests/ManualTaskExecutorTest.swift delete mode 100644 Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift diff --git a/Package.swift b/Package.swift index 0816cd766f..f4a3568f9b 100644 --- a/Package.swift +++ b/Package.swift @@ -521,7 +521,6 @@ let package = Package( dependencies: [ "NIOTestUtils", "NIOCore", - "NIOConcurrencyHelpers", "NIOEmbedded", "NIOPosix", ] diff --git a/Sources/NIOTestUtils/ManualTaskExecutor.swift b/Sources/NIOTestUtils/ManualTaskExecutor.swift new file mode 100644 index 0000000000..b4f543617e --- /dev/null +++ b/Sources/NIOTestUtils/ManualTaskExecutor.swift @@ -0,0 +1,164 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6) + +import DequeModule +import Synchronization + +/// Provide a `ManualTaskExecutor` for the duration of the given `body`. +/// +/// The executor can be used for setting the executor preference of tasks and fully control +/// when execution of the tasks is performed. +/// +/// Example usage: +/// ```swift +/// await withDiscardingTaskGroup { group in +/// await withManualTaskExecutor { taskExecutor in +/// group.addTask(executorPreference: taskExecutor) { +/// print("Running") +/// } +/// taskExecutor.runUntilQueueIsEmpty() // Run the task synchronously +/// } +/// } +/// ``` +/// +/// - warning: Do not escape the task executor from the closure for later use and make sure that +/// all tasks running on the executor are completely finished before `body` returns. +/// It is highly recommended to use structured concurrency with this task executor. +/// +/// - Parameters: +/// - body: The closure that will accept the task executor. +/// +/// - Throws: When `body` throws. +/// +/// - Returns: The value returned by `body`. +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +@inlinable +public func withManualTaskExecutor( + body: (ManualTaskExecutor) async throws(Failure) -> T +) async throws(Failure) -> T { + let taskExecutor = ManualTaskExecutor() + defer { taskExecutor.shutdown() } + return try await body(taskExecutor) +} + +/// Provide two `ManualTaskExecutor`s for the duration of the given `body`. +/// +/// The executors can be used for setting the executor preference of tasks and fully control +/// when execution of the tasks is performed. +/// +/// Example usage: +/// ```swift +/// await withDiscardingTaskGroup { group in +/// await withManualTaskExecutor { taskExecutor1, taskExecutor2 in +/// group.addTask(executorPreference: taskExecutor1) { +/// print("Running 1") +/// } +/// group.addTask(executorPreference: taskExecutor2) { +/// print("Running 2") +/// } +/// taskExecutor2.runUntilQueueIsEmpty() // Run second task synchronously +/// taskExecutor1.runUntilQueueIsEmpty() // Run first task synchronously +/// } +/// } +/// ``` +/// +/// - warning: Do not escape the task executors from the closure for later use and make sure that +/// all tasks running on the executors are completely finished before `body` returns. +/// It is highly recommended to use structured concurrency with these task executors. +/// +/// - Parameters: +/// - body: The closure that will accept the task executors. +/// +/// - Throws: When `body` throws. +/// +/// - Returns: The value returned by `body`. +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +@inlinable +public func withManualTaskExecutor( + body: (ManualTaskExecutor, ManualTaskExecutor) async throws(Failure) -> T +) async throws(Failure) -> T { + let taskExecutor1 = ManualTaskExecutor() + defer { taskExecutor1.shutdown() } + + let taskExecutor2 = ManualTaskExecutor() + defer { taskExecutor2.shutdown() } + + return try await body(taskExecutor1, taskExecutor2) +} + +/// Manual task executor. +/// +/// A `TaskExecutor` that does not use any threadpool or similar mechanism to run the jobs. +/// Jobs are manually run by calling the `runUntilQueueIsEmpty` method. +/// +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +public final class ManualTaskExecutor: TaskExecutor { + struct Storage { + var isShutdown = false + var jobs = Deque() + } + + private let storage = Mutex(.init()) + + @usableFromInline + init() {} + + /// Run jobs until queue is empty. + /// + /// Synchronously runs all enqueued jobs, including any jobs that are enqueued while running. + /// When this function returns, it means that each task running on this executor is either: + /// - suspended + /// - moved (temporarily) to a different executor + /// - finished + /// + /// If not all tasks are finished, this function must be called again. + public func runUntilQueueIsEmpty() { + while let job = self.storage.withLock({ $0.jobs.popFirst() }) { + job.runSynchronously(on: self.asUnownedTaskExecutor()) + } + } + + /// Enqueue a job. + /// + /// Called by the concurrency runtime. + /// + /// - Parameter job: The job to enqueue. + public func enqueue(_ job: UnownedJob) { + self.storage.withLock { storage in + if storage.isShutdown { + fatalError("A job is enqueued after manual executor shutdown") + } + storage.jobs.append(job) + } + } + + /// Shutdown. + /// + /// Since the manual task executor is not running anything in the background, this is purely to catch + /// any issues due to incorrect usage of the executor. The shutdown verifies that the queue is empty + /// and makes sure that no new jobs can be enqueued. + @usableFromInline + func shutdown() { + self.storage.withLock { storage in + if !storage.jobs.isEmpty { + fatalError("Shutdown of manual executor with jobs in queue") + } + storage.isShutdown = true + } + } +} + +#endif // compiler(>=6) diff --git a/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift b/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift deleted file mode 100644 index 91ee915918..0000000000 --- a/Sources/NIOTestUtils/NIOThreadPoolTaskExecutor.swift +++ /dev/null @@ -1,119 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if compiler(>=6) - -import NIOPosix - -/// Run a `NIOThreadPool` based `TaskExecutor` while executing the given `body`. -/// -/// This function provides a `TaskExecutor`, **not** a `SerialExecutor`. The executor can be -/// used for setting the executor preference of a task. -/// -/// Example usage: -/// ```swift -/// await withNIOThreadPoolTaskExecutor(numberOfThreads: 2) { taskExecutor in -/// await withDiscardingTaskGroup { group in -/// group.addTask(executorPreference: taskExecutor) { ... } -/// } -/// } -/// ``` -/// -/// - warning: Do not escape the task executor from the closure for later use and make sure that -/// all tasks running on the executor are completely finished before `body` returns. -/// For unstructured tasks, this means awaiting their results. If any task is still -/// running on the executor when `body` returns, this results in a fatalError. -/// It is highly recommended to use structured concurrency with this task executor. -/// -/// - Parameters: -/// - numberOfThreads: The number of threads in the pool. -/// - body: The closure that will accept the task executor. -/// -/// - Throws: When `body` throws. -/// -/// - Returns: The value returned by `body`. -@inlinable -public func withNIOThreadPoolTaskExecutor( - numberOfThreads: Int, - body: (NIOThreadPoolTaskExecutor) async throws(Failure) -> T -) async throws(Failure) -> T { - let taskExecutor = NIOThreadPoolTaskExecutor(numberOfThreads: numberOfThreads) - taskExecutor.start() - - let result: Result - do { - result = .success(try await body(taskExecutor)) - } catch { - result = .failure(error) - } - - await taskExecutor.shutdownGracefully() - - return try result.get() -} - -/// A task executor based on NIOThreadPool. -/// -/// Provides a `TaskExecutor`, **not** a `SerialExecutor`. The executor can be -/// used for setting the executor preference of a task. -/// -public final class NIOThreadPoolTaskExecutor: TaskExecutor { - let nioThreadPool: NIOThreadPool - - /// Initialize a `NIOThreadPoolTaskExecutor`, using a thread pool with `numberOfThreads` threads. - /// - /// - Parameters: - /// - numberOfThreads: The number of threads to use for the thread pool. - @usableFromInline - init(numberOfThreads: Int) { - self.nioThreadPool = NIOThreadPool(numberOfThreads: numberOfThreads) - } - - /// Start the `NIOThreadPoolTaskExecutor`. - @usableFromInline - func start() { - nioThreadPool.start() - } - - /// Gracefully shutdown this `NIOThreadPoolTaskExecutor`. - /// - /// Make sure that all tasks running on the executor are finished before shutting down. - /// - /// - warning: If any task is still running on the executor, this results in a fatalError. - @usableFromInline - func shutdownGracefully() async { - do { - try await nioThreadPool.shutdownGracefully() - } catch { - fatalError("Failed to shutdown NIOThreadPool") - } - } - - /// Enqueue a job. - /// - /// Called by the concurrency runtime. - /// - /// - Parameter job: The job to enqueue. - public func enqueue(_ job: consuming ExecutorJob) { - let unownedJob = UnownedJob(job) - self.nioThreadPool.submit { shouldRun in - guard case shouldRun = NIOThreadPool.WorkItemState.active else { - fatalError("Shutdown before all tasks finished") - } - unownedJob.runSynchronously(on: self.asUnownedTaskExecutor()) - } - } -} - -#endif // compiler(>=6) diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index db18645441..748d138fe7 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -607,54 +607,50 @@ final class NIOAsyncWriterTests: XCTestCase { self.assert(suspendCallCount: 1, yieldCallCount: 1, terminateCallCount: 1) } + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) func testWriterFinish_AndSuspendBufferedYield() async throws { #if compiler(>=6) - try await withNIOThreadPoolTaskExecutor(numberOfThreads: 2) { taskExecutor in - try await withThrowingTaskGroup(of: Void.self) { group in + try await withThrowingTaskGroup(of: Void.self) { group in + try await withManualTaskExecutor { taskExecutor1, taskExecutor2 in self.sink.setWritability(to: false) - let bothSuspended = expectation(description: "suspended on both yields") - let suspendedAgain = ConditionLock(value: false) - self.delegate.didSuspendHandler = { - if self.delegate.didSuspendCallCount == 2 { - bothSuspended.fulfill() - } else if self.delegate.didSuspendCallCount > 2 { - suspendedAgain.lock() - suspendedAgain.unlock(withValue: true) - } - } - self.delegate.didYieldHandler = { _ in if self.delegate.didYieldCallCount == 1 { - // Delay this yield until the other yield is suspended again. - if suspendedAgain.lock(whenValue: true, timeoutSeconds: 5) { - suspendedAgain.unlock() - } else { - XCTFail("Timeout while waiting for other yield to suspend again.") - } + // This is the yield of the first task. Run the second task until it suspends again + self.assert(suspendCallCount: 2, yieldCallCount: 1, terminateCallCount: 0) + taskExecutor2.runUntilQueueIsEmpty() + self.assert(suspendCallCount: 3, yieldCallCount: 1, terminateCallCount: 0) } } - group.addTask(executorPreference: taskExecutor) { [writer] in + group.addTask(executorPreference: taskExecutor1) { [writer] in try await writer!.yield("message1") } - group.addTask(executorPreference: taskExecutor) { [writer] in + group.addTask(executorPreference: taskExecutor2) { [writer] in try await writer!.yield("message2") } - await fulfillment(of: [bothSuspended], timeout: 5) - self.writer.finish() - + // Run tasks until they are both suspended + taskExecutor1.runUntilQueueIsEmpty() + taskExecutor2.runUntilQueueIsEmpty() self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0) + self.writer.finish() + // We have to become writable again to unbuffer the yields - // The first call to didYield will pause, so that the other yield will be suspended again. self.sink.setWritability(to: true) - await XCTAssertNoThrow(try await group.next()) - await XCTAssertNoThrow(try await group.next()) + // Run the first task, which will complete its yield + // During this yield, didYieldHandler will run the second task, which will suspend again + taskExecutor1.runUntilQueueIsEmpty() + self.assert(suspendCallCount: 3, yieldCallCount: 1, terminateCallCount: 0) + // Run the second task to complete its yield + taskExecutor2.runUntilQueueIsEmpty() self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1) + + await XCTAssertNoThrow(try await group.next()) + await XCTAssertNoThrow(try await group.next()) } } #endif // compiler(>=6) diff --git a/Tests/NIOTestUtilsTests/ManualTaskExecutorTest.swift b/Tests/NIOTestUtilsTests/ManualTaskExecutorTest.swift new file mode 100644 index 0000000000..66166ee2f6 --- /dev/null +++ b/Tests/NIOTestUtilsTests/ManualTaskExecutorTest.swift @@ -0,0 +1,69 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6) + +import NIOTestUtils +import Synchronization +import XCTest + +class ManualTaskExecutorTest: XCTestCase { + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + func testManualTaskExecutor() async { + await withDiscardingTaskGroup { group in + await withManualTaskExecutor { taskExecutor in + let taskDidRun = Mutex(false) + + group.addTask(executorPreference: taskExecutor) { + taskDidRun.withLock { $0 = true } + } + + // Run task + XCTAssertFalse(taskDidRun.withLock { $0 }) + taskExecutor.runUntilQueueIsEmpty() + XCTAssertTrue(taskDidRun.withLock { $0 }) + } + } + } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + func testTwoManualTaskExecutors() async { + await withDiscardingTaskGroup { group in + await withManualTaskExecutor { taskExecutor1, taskExecutor2 in + let task1DidRun = Mutex(false) + let task2DidRun = Mutex(false) + + group.addTask(executorPreference: taskExecutor1) { + task1DidRun.withLock { $0 = true } + } + + group.addTask(executorPreference: taskExecutor2) { + task2DidRun.withLock { $0 = true } + } + + // Run task 1 + XCTAssertFalse(task1DidRun.withLock { $0 }) + taskExecutor1.runUntilQueueIsEmpty() + XCTAssertTrue(task1DidRun.withLock { $0 }) + + // Run task 2 + XCTAssertFalse(task2DidRun.withLock { $0 }) + taskExecutor2.runUntilQueueIsEmpty() + XCTAssertTrue(task2DidRun.withLock { $0 }) + } + } + } +} + +#endif // compiler(>=6) \ No newline at end of file diff --git a/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift b/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift deleted file mode 100644 index 0ec01bf9e3..0000000000 --- a/Tests/NIOTestUtilsTests/NIOThreadPoolTaskExecutorTest.swift +++ /dev/null @@ -1,81 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIOConcurrencyHelpers -import NIOTestUtils -import XCTest - -class NIOThreadPoolTaskExecutorTest: XCTestCase { - struct TestError: Error {} - - func runTasksSimultaneously(numberOfTasks: Int) async { - await withNIOThreadPoolTaskExecutor(numberOfThreads: numberOfTasks) { taskExecutor in - await withDiscardingTaskGroup { group in - var taskBlockers = [ConditionLock]() - defer { - // Unblock all tasks - for taskBlocker in taskBlockers { - taskBlocker.lock() - taskBlocker.unlock(withValue: true) - } - } - - for taskNumber in 1...numberOfTasks { - let taskStarted = ConditionLock(value: false) - let taskBlocker = ConditionLock(value: false) - taskBlockers.append(taskBlocker) - - // Start task and block it - group.addTask(executorPreference: taskExecutor) { - taskStarted.lock() - taskStarted.unlock(withValue: true) - taskBlocker.lock(whenValue: true) - taskBlocker.unlock() - } - - // Verify that task was able to start - if taskStarted.lock(whenValue: true, timeoutSeconds: 5) { - taskStarted.unlock() - } else { - XCTFail("Task \(taskNumber) failed to start.") - break - } - } - } - } - } - - func testRunsTaskOnSingleThread() async { - await runTasksSimultaneously(numberOfTasks: 1) - } - - func testRunsMultipleTasksOnMultipleThreads() async { - await runTasksSimultaneously(numberOfTasks: 3) - } - - func testReturnsBodyResult() async { - let expectedResult = "result" - let result = await withNIOThreadPoolTaskExecutor(numberOfThreads: 1) { _ in return expectedResult } - XCTAssertEqual(result, expectedResult) - } - - func testRethrows() async { - do { - try await withNIOThreadPoolTaskExecutor(numberOfThreads: 1) { _ in throw TestError() } - XCTFail("Function did not rethrow.") - } catch { - XCTAssertTrue(error is TestError) - } - } -} From 576641275ddc3d04d5cf6c2320545367f6bc2e92 Mon Sep 17 00:00:00 2001 From: Manolo van Ee Date: Sun, 27 Apr 2025 19:44:27 +0200 Subject: [PATCH 9/9] Change ManualTaskExecutor related functionality from public to package --- Sources/NIOTestUtils/ManualTaskExecutor.swift | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Sources/NIOTestUtils/ManualTaskExecutor.swift b/Sources/NIOTestUtils/ManualTaskExecutor.swift index b4f543617e..c87ff324fe 100644 --- a/Sources/NIOTestUtils/ManualTaskExecutor.swift +++ b/Sources/NIOTestUtils/ManualTaskExecutor.swift @@ -46,7 +46,7 @@ import Synchronization /// - Returns: The value returned by `body`. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @inlinable -public func withManualTaskExecutor( +package func withManualTaskExecutor( body: (ManualTaskExecutor) async throws(Failure) -> T ) async throws(Failure) -> T { let taskExecutor = ManualTaskExecutor() @@ -87,7 +87,7 @@ public func withManualTaskExecutor( /// - Returns: The value returned by `body`. @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) @inlinable -public func withManualTaskExecutor( +package func withManualTaskExecutor( body: (ManualTaskExecutor, ManualTaskExecutor) async throws(Failure) -> T ) async throws(Failure) -> T { let taskExecutor1 = ManualTaskExecutor() @@ -105,7 +105,8 @@ public func withManualTaskExecutor( /// Jobs are manually run by calling the `runUntilQueueIsEmpty` method. /// @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) -public final class ManualTaskExecutor: TaskExecutor { +@usableFromInline +package final class ManualTaskExecutor: TaskExecutor { struct Storage { var isShutdown = false var jobs = Deque() @@ -125,7 +126,7 @@ public final class ManualTaskExecutor: TaskExecutor { /// - finished /// /// If not all tasks are finished, this function must be called again. - public func runUntilQueueIsEmpty() { + package func runUntilQueueIsEmpty() { while let job = self.storage.withLock({ $0.jobs.popFirst() }) { job.runSynchronously(on: self.asUnownedTaskExecutor()) } @@ -136,7 +137,8 @@ public final class ManualTaskExecutor: TaskExecutor { /// Called by the concurrency runtime. /// /// - Parameter job: The job to enqueue. - public func enqueue(_ job: UnownedJob) { + @usableFromInline + package func enqueue(_ job: UnownedJob) { self.storage.withLock { storage in if storage.isShutdown { fatalError("A job is enqueued after manual executor shutdown")