|
14 | 14 |
|
15 | 15 | import DequeModule
|
16 | 16 | import NIOConcurrencyHelpers
|
| 17 | +import NIOTestUtils |
17 | 18 | import XCTest
|
18 | 19 |
|
19 | 20 | @testable import NIOCore
|
@@ -607,50 +608,56 @@ final class NIOAsyncWriterTests: XCTestCase {
|
607 | 608 | }
|
608 | 609 |
|
609 | 610 | func testSuspendingBufferedYield_whenWriterFinished() async throws {
|
610 |
| - self.sink.setWritability(to: false) |
611 |
| - |
612 |
| - let bothSuspended = expectation(description: "suspended on both yields") |
613 |
| - let suspendedAgain = ConditionLock(value: false) |
614 |
| - self.delegate.didSuspendHandler = { |
615 |
| - if self.delegate.didSuspendCallCount == 2 { |
616 |
| - bothSuspended.fulfill() |
617 |
| - } else if self.delegate.didSuspendCallCount > 2 { |
618 |
| - suspendedAgain.lock() |
619 |
| - suspendedAgain.unlock(withValue: true) |
620 |
| - } |
621 |
| - } |
| 611 | + #if compiler(>=6) |
| 612 | + try await withNIOThreadPoolTaskExecutor(numberOfThreads: 2) { taskExecutor in |
| 613 | + try await withThrowingTaskGroup(of: Void.self) { group in |
| 614 | + self.sink.setWritability(to: false) |
| 615 | + |
| 616 | + let bothSuspended = expectation(description: "suspended on both yields") |
| 617 | + let suspendedAgain = ConditionLock(value: false) |
| 618 | + self.delegate.didSuspendHandler = { |
| 619 | + if self.delegate.didSuspendCallCount == 2 { |
| 620 | + bothSuspended.fulfill() |
| 621 | + } else if self.delegate.didSuspendCallCount > 2 { |
| 622 | + suspendedAgain.lock() |
| 623 | + suspendedAgain.unlock(withValue: true) |
| 624 | + } |
| 625 | + } |
622 | 626 |
|
623 |
| - self.delegate.didYieldHandler = { _ in |
624 |
| - if self.delegate.didYieldCallCount == 1 { |
625 |
| - // Delay this yield until the other yield is suspended again. |
626 |
| - if suspendedAgain.lock(whenValue: true, timeoutSeconds: 5) { |
627 |
| - suspendedAgain.unlock() |
628 |
| - } else { |
629 |
| - XCTFail("Timeout while waiting for other yield to suspend again.") |
| 627 | + self.delegate.didYieldHandler = { _ in |
| 628 | + if self.delegate.didYieldCallCount == 1 { |
| 629 | + // Delay this yield until the other yield is suspended again. |
| 630 | + if suspendedAgain.lock(whenValue: true, timeoutSeconds: 5) { |
| 631 | + suspendedAgain.unlock() |
| 632 | + } else { |
| 633 | + XCTFail("Timeout while waiting for other yield to suspend again.") |
| 634 | + } |
| 635 | + } |
630 | 636 | }
|
631 |
| - } |
632 |
| - } |
633 | 637 |
|
634 |
| - let task1 = Task { [writer] in |
635 |
| - try await writer!.yield("message1") |
636 |
| - } |
637 |
| - let task2 = Task { [writer] in |
638 |
| - try await writer!.yield("message2") |
639 |
| - } |
| 638 | + group.addTask(executorPreference: taskExecutor) { [writer] in |
| 639 | + try await writer!.yield("message1") |
| 640 | + } |
| 641 | + group.addTask(executorPreference: taskExecutor) { [writer] in |
| 642 | + try await writer!.yield("message2") |
| 643 | + } |
640 | 644 |
|
641 |
| - await fulfillment(of: [bothSuspended], timeout: 5) |
642 |
| - self.writer.finish() |
| 645 | + await fulfillment(of: [bothSuspended], timeout: 5) |
| 646 | + self.writer.finish() |
643 | 647 |
|
644 |
| - self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0) |
| 648 | + self.assert(suspendCallCount: 2, yieldCallCount: 0, terminateCallCount: 0) |
645 | 649 |
|
646 |
| - // We have to become writable again to unbuffer the yields |
647 |
| - // The first call to didYield will pause, so that the other yield will be suspended again. |
648 |
| - self.sink.setWritability(to: true) |
| 650 | + // We have to become writable again to unbuffer the yields |
| 651 | + // The first call to didYield will pause, so that the other yield will be suspended again. |
| 652 | + self.sink.setWritability(to: true) |
649 | 653 |
|
650 |
| - await XCTAssertNoThrow(try await task1.value) |
651 |
| - await XCTAssertNoThrow(try await task2.value) |
| 654 | + await XCTAssertNoThrow(try await group.next()) |
| 655 | + await XCTAssertNoThrow(try await group.next()) |
652 | 656 |
|
653 |
| - self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1) |
| 657 | + self.assert(suspendCallCount: 3, yieldCallCount: 2, terminateCallCount: 1) |
| 658 | + } |
| 659 | + } |
| 660 | + #endif // compiler(>=6) |
654 | 661 | }
|
655 | 662 |
|
656 | 663 | func testWriterFinish_whenFinished() {
|
|
0 commit comments