-
Notifications
You must be signed in to change notification settings - Fork 661
/
Copy pathEmbedded.swift
1167 lines (1021 loc) · 44.5 KB
/
Embedded.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Atomics
import DequeModule
import NIOConcurrencyHelpers
import NIOCore
import _NIODataStructures
#if canImport(Dispatch)
import Dispatch
#endif
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
@preconcurrency import Glibc
#elseif canImport(Musl)
@preconcurrency import Musl
#elseif canImport(Android)
@preconcurrency import Android
#elseif canImport(WASILibc)
@preconcurrency import WASILibc
#else
#error("Unknown C library.")
#endif
private func printError(_ string: StaticString) {
string.withUTF8Buffer { buf in
var buf = buf
while buf.count > 0 {
// 2 is stderr
let rc = write(2, buf.baseAddress, buf.count)
if rc < 0 {
let err = errno
if err == EINTR { continue }
fatalError("Unexpected error writing: \(err)")
}
buf = .init(rebasing: buf.dropFirst(Int(rc)))
}
}
}
internal struct EmbeddedScheduledTask {
let id: UInt64
let task: () -> Void
let failFn: (Error) -> Void
let readyTime: NIODeadline
let insertOrder: UInt64
init(
id: UInt64,
readyTime: NIODeadline,
insertOrder: UInt64,
task: @escaping () -> Void,
_ failFn: @escaping (Error) -> Void
) {
self.id = id
self.readyTime = readyTime
self.insertOrder = insertOrder
self.task = task
self.failFn = failFn
}
func fail(_ error: Error) {
self.failFn(error)
}
}
extension EmbeddedScheduledTask: Comparable {
static func < (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool {
if lhs.readyTime == rhs.readyTime {
return lhs.insertOrder < rhs.insertOrder
} else {
return lhs.readyTime < rhs.readyTime
}
}
static func == (lhs: EmbeddedScheduledTask, rhs: EmbeddedScheduledTask) -> Bool {
lhs.id == rhs.id
}
}
/// An `EventLoop` that is embedded in the current running context with no external
/// control.
///
/// Unlike more complex `EventLoop`s, such as `SelectableEventLoop`, the `EmbeddedEventLoop`
/// has no proper eventing mechanism. Instead, reads and writes are fully controlled by the
/// entity that instantiates the `EmbeddedEventLoop`. This property makes `EmbeddedEventLoop`
/// of limited use for many application purposes, but highly valuable for testing and other
/// kinds of mocking.
///
/// Time is controllable on an `EmbeddedEventLoop`. It begins at `NIODeadline.uptimeNanoseconds(0)`
/// and may be advanced by a fixed amount by using `advanceTime(by:)`, or advanced to a point in
/// time with `advanceTime(to:)`.
///
/// - warning: Unlike `SelectableEventLoop`, `EmbeddedEventLoop` **is not thread-safe**. This
/// is because it is intended to be run in the thread that instantiated it. Users are
/// responsible for ensuring they never call into the `EmbeddedEventLoop` in an
/// unsynchronized fashion.
public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible {
private var _now: NIODeadline = .uptimeNanoseconds(0)
/// The current "time" for this event loop. This is an amount in nanoseconds.
public var now: NIODeadline { _now }
private enum State { case open, closing, closed }
private var state: State = .open
private var scheduledTaskCounter: UInt64 = 0
private var scheduledTasks = PriorityQueue<EmbeddedScheduledTask>()
/// Keep track of where promises are allocated to ensure we can identify their source if they leak.
private var _promiseCreationStore: [_NIOEventLoopFutureIdentifier: (file: StaticString, line: UInt)] = [:]
// The number of the next task to be created. We track the order so that when we execute tasks
// scheduled at the same time, we may do so in the order in which they were submitted for
// execution.
private var taskNumber: UInt64 = 0
public let description = "EmbeddedEventLoop"
#if canImport(Darwin) || canImport(Glibc) || canImport(Musl) || canImport(Android)
private let myThread: pthread_t = pthread_self()
func isCorrectThread() -> Bool {
pthread_equal(self.myThread, pthread_self()) != 0
}
#else
func isCorrectThread() -> Bool {
true // let's be conservative
}
#endif
private func nextTaskNumber() -> UInt64 {
defer {
self.taskNumber += 1
}
return self.taskNumber
}
/// - see: `EventLoop.inEventLoop`
public var inEventLoop: Bool {
self.checkCorrectThread()
return true
}
public func checkCorrectThread() {
guard self.isCorrectThread() else {
if Self.strictModeEnabled {
preconditionFailure(
"EmbeddedEventLoop is not thread-safe. You can only use it from the thread you created it on."
)
} else {
printError(
"""
ERROR: NIO API misuse: EmbeddedEventLoop is not thread-safe. \
You can only use it from the thread you created it on. This problem will be upgraded to a forced \
crash in future versions of SwiftNIO.
"""
)
}
return
}
}
/// Initialize a new `EmbeddedEventLoop`.
public init() {}
/// - see: `EventLoop.scheduleTask(deadline:_:)`
@discardableResult
public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
self.checkCorrectThread()
let promise: EventLoopPromise<T> = makePromise()
switch self.state {
case .open:
break
case .closing, .closed:
// If the event loop is shut down, or shutting down, immediately cancel the task.
promise.fail(EventLoopError.cancelled)
return Scheduled(promise: promise, cancellationTask: {})
}
self.scheduledTaskCounter += 1
let task = EmbeddedScheduledTask(
id: self.scheduledTaskCounter,
readyTime: deadline,
insertOrder: self.nextTaskNumber(),
task: {
do {
promise.assumeIsolated().succeed(try task())
} catch let err {
promise.fail(err)
}
},
promise.fail
)
let taskId = task.id
let scheduled = Scheduled(
promise: promise,
cancellationTask: {
self.scheduledTasks.removeFirst { $0.id == taskId }
}
)
scheduledTasks.push(task)
return scheduled
}
/// - see: `EventLoop.scheduleTask(in:_:)`
@discardableResult
public func scheduleTask<T>(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled<T> {
self.checkCorrectThread()
return self.scheduleTask(deadline: self._now + `in`, task)
}
@preconcurrency
@discardableResult
public func scheduleCallback(
in amount: TimeAmount,
handler: some (NIOScheduledCallbackHandler & Sendable)
) -> NIOScheduledCallback {
self.checkCorrectThread()
/// Even though this type does not implement a custom `scheduleCallback(at:handler)`, it uses a manual clock so
/// it cannot rely on the default implementation of `scheduleCallback(in:handler:)`, which computes the deadline
/// as an offset from `NIODeadline.now`. This event loop needs the deadline to be offset from `self._now`.
return self.scheduleCallback(at: self._now + amount, handler: handler)
}
/// On an `EmbeddedEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. This means that
/// `task` will be run the next time you call `EmbeddedEventLoop.run`.
public func execute(_ task: @escaping () -> Void) {
self.checkCorrectThread()
self.scheduleTask(deadline: self._now, task)
}
/// Run all tasks that have previously been submitted to this `EmbeddedEventLoop`, either by calling `execute` or
/// events that have been enqueued using `scheduleTask`/`scheduleRepeatedTask`/`scheduleRepeatedAsyncTask` and whose
/// deadlines have expired.
///
/// - seealso: `EmbeddedEventLoop.advanceTime`.
public func run() {
self.checkCorrectThread()
// Execute all tasks that are currently enqueued to be executed *now*.
self.advanceTime(to: self._now)
}
/// Runs the event loop and moves "time" forward by the given amount, running any scheduled
/// tasks that need to be run.
public func advanceTime(by increment: TimeAmount) {
self.checkCorrectThread()
self.advanceTime(to: self._now + increment)
}
/// Runs the event loop and moves "time" forward to the given point in time, running any scheduled
/// tasks that need to be run.
///
/// - Note: If `deadline` is before the current time, the current time will not be advanced.
public func advanceTime(to deadline: NIODeadline) {
self.checkCorrectThread()
let newTime = max(deadline, self._now)
while let nextTask = self.scheduledTasks.peek() {
guard nextTask.readyTime <= newTime else {
break
}
// Now we want to grab all tasks that are ready to execute at the same
// time as the first.
var tasks = [EmbeddedScheduledTask]()
while let candidateTask = self.scheduledTasks.peek(), candidateTask.readyTime == nextTask.readyTime {
tasks.append(candidateTask)
self.scheduledTasks.pop()
}
// Set the time correctly before we call into user code, then
// call in for all tasks.
self._now = nextTask.readyTime
for task in tasks {
task.task()
}
}
// Finally ensure we got the time right.
self._now = newTime
}
internal func cancelRemainingScheduledTasks() {
self.checkCorrectThread()
while let task = self.scheduledTasks.pop() {
task.fail(EventLoopError.cancelled)
}
}
#if canImport(Dispatch)
/// - see: `EventLoop.shutdownGracefully`
public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
self.checkCorrectThread()
self.state = .closing
run()
cancelRemainingScheduledTasks()
self.state = .closed
queue.sync {
callback(nil)
}
}
#endif
public func preconditionInEventLoop(file: StaticString, line: UInt) {
self.checkCorrectThread()
// Currently, inEventLoop is always true so this always passes.
}
public func preconditionNotInEventLoop(file: StaticString, line: UInt) {
// As inEventLoop always returns true, this must always preconditon.
preconditionFailure("Always in EmbeddedEventLoop", file: file, line: line)
}
public func _preconditionSafeToWait(file: StaticString, line: UInt) {
self.checkCorrectThread()
// EmbeddedEventLoop always allows a wait, as waiting will essentially always block
// wait()
return
}
public func _promiseCreated(futureIdentifier: _NIOEventLoopFutureIdentifier, file: StaticString, line: UInt) {
self.checkCorrectThread()
precondition(_isDebugAssertConfiguration())
self._promiseCreationStore[futureIdentifier] = (file: file, line: line)
}
public func _promiseCompleted(futureIdentifier: _NIOEventLoopFutureIdentifier) -> (file: StaticString, line: UInt)?
{
self.checkCorrectThread()
precondition(_isDebugAssertConfiguration())
return self._promiseCreationStore.removeValue(forKey: futureIdentifier)
}
public func _preconditionSafeToSyncShutdown(file: StaticString, line: UInt) {
self.checkCorrectThread()
// EmbeddedEventLoop always allows a sync shutdown.
return
}
public func _executeIsolatedUnsafeUnchecked(_ task: @escaping () -> Void) {
// Because of the way EmbeddedEL works, we can just delegate this directly
// to execute.
self.execute(task)
}
public func _submitIsolatedUnsafeUnchecked<T>(_ task: @escaping () throws -> T) -> EventLoopFuture<T> {
// Because of the way EmbeddedEL works, we can delegate this directly to schedule. Note that I didn't
// say submit: we don't have an override of submit here.
self.scheduleTask(deadline: self._now, task).futureResult
}
@discardableResult
public func _scheduleTaskIsolatedUnsafeUnchecked<T>(
deadline: NIODeadline,
_ task: @escaping () throws -> T
) -> Scheduled<T> {
// Because of the way EmbeddedEL works, we can delegate this directly to schedule.
self.scheduleTask(deadline: deadline, task)
}
@discardableResult
public func _scheduleTaskIsolatedUnsafeUnchecked<T>(
in delay: TimeAmount,
_ task: @escaping () throws -> T
) -> Scheduled<T> {
// Because of the way EmbeddedEL works, we can delegate this directly to schedule.
self.scheduleTask(in: delay, task)
}
deinit {
self.checkCorrectThread()
precondition(scheduledTasks.isEmpty, "Embedded event loop freed with unexecuted scheduled tasks!")
}
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
public var executor: any SerialExecutor {
fatalError(
"EmbeddedEventLoop is not thread safe and cannot be used as a SerialExecutor. Use NIOAsyncTestingEventLoop instead."
)
}
static let strictModeEnabled: Bool = {
for ciVar in ["SWIFTNIO_STRICT", "SWIFTNIO_CI", "SWIFTNIO_STRICT_EMBEDDED"] {
switch getenv(ciVar).map({ String.init(cString: $0).lowercased() }) {
case "true", "y", "yes", "on", "1":
return true
default:
()
}
}
return false
}()
#if compiler(>=6.0)
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
public var taskExecutor: any TaskExecutor {
fatalError(
"EmbeddedEventLoop is not thread safe and cannot be used as a TaskExecutor. Use NIOAsyncTestingEventLoop instead."
)
}
#endif
}
// EmbeddedEventLoop is extremely _not_ Sendable. However, the EventLoop protocol
// requires it to be. We are doing some runtime enforcement of correct use, but
// ultimately we can't have the compiler validating this usage.
extension EmbeddedEventLoop: @unchecked Sendable {}
@usableFromInline
class EmbeddedChannelCore: ChannelCore {
var isOpen: Bool {
get {
self._isOpen.load(ordering: .sequentiallyConsistent)
}
set {
self._isOpen.store(newValue, ordering: .sequentiallyConsistent)
}
}
var isActive: Bool {
get {
self._isActive.load(ordering: .sequentiallyConsistent)
}
set {
self._isActive.store(newValue, ordering: .sequentiallyConsistent)
}
}
var allowRemoteHalfClosure: Bool {
get {
self._allowRemoteHalfClosure.load(ordering: .sequentiallyConsistent)
}
set {
self._allowRemoteHalfClosure.store(newValue, ordering: .sequentiallyConsistent)
}
}
private let _isOpen = ManagedAtomic(true)
private let _isActive = ManagedAtomic(false)
private let _allowRemoteHalfClosure = ManagedAtomic(false)
let eventLoop: EventLoop
let closePromise: EventLoopPromise<Void>
var error: Optional<Error>
private let pipeline: ChannelPipeline
init(pipeline: ChannelPipeline, eventLoop: EventLoop) {
closePromise = eventLoop.makePromise()
self.pipeline = pipeline
self.eventLoop = eventLoop
self.error = nil
}
deinit {
assert(
!self.isOpen && !self.isActive,
"leaked an open EmbeddedChannel, maybe forgot to call channel.finish()?"
)
isOpen = false
closePromise.succeed(())
}
/// Contains the flushed items that went into the `Channel` (and on a regular channel would have hit the network).
@usableFromInline
var outboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()
/// Contains observers that want to consume the first element that would be appended to the `outboundBuffer`
@usableFromInline
var outboundBufferConsumer: Deque<(NIOAny) -> Void> = []
/// Contains the unflushed items that went into the `Channel`
@usableFromInline
var pendingOutboundBuffer: MarkedCircularBuffer<(NIOAny, EventLoopPromise<Void>?)> = MarkedCircularBuffer(
initialCapacity: 16
)
/// Contains the items that travelled the `ChannelPipeline` all the way and hit the tail channel handler. On a
/// regular `Channel` these items would be lost.
@usableFromInline
var inboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()
/// Contains observers that want to consume the first element that would be appended to the `inboundBuffer`
@usableFromInline
var inboundBufferConsumer: Deque<(NIOAny) -> Void> = []
@usableFromInline
var localAddress: SocketAddress?
@usableFromInline
var remoteAddress: SocketAddress?
@usableFromInline
func localAddress0() throws -> SocketAddress {
self.eventLoop.preconditionInEventLoop()
if let localAddress = self.localAddress {
return localAddress
} else {
throw ChannelError.operationUnsupported
}
}
@usableFromInline
func remoteAddress0() throws -> SocketAddress {
self.eventLoop.preconditionInEventLoop()
if let remoteAddress = self.remoteAddress {
return remoteAddress
} else {
throw ChannelError.operationUnsupported
}
}
@usableFromInline
func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
guard self.isOpen else {
promise?.fail(ChannelError.alreadyClosed)
return
}
isOpen = false
isActive = false
promise?.succeed(())
// As we called register() in the constructor of EmbeddedChannel we also need to ensure we call unregistered here.
self.pipeline.syncOperations.fireChannelInactive()
self.pipeline.syncOperations.fireChannelUnregistered()
let loopBoundSelf = NIOLoopBound(self, eventLoop: self.eventLoop)
eventLoop.execute {
// ensure this is executed in a delayed fashion as the users code may still traverse the pipeline
let `self` = loopBoundSelf.value
self.removeHandlers(pipeline: self.pipeline)
self.closePromise.succeed(())
}
}
@usableFromInline
func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
promise?.succeed(())
}
@usableFromInline
func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
isActive = true
promise?.succeed(())
self.pipeline.syncOperations.fireChannelActive()
}
@usableFromInline
func register0(promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
promise?.succeed(())
self.pipeline.syncOperations.fireChannelRegistered()
}
@usableFromInline
func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
isActive = true
register0(promise: promise)
self.pipeline.syncOperations.fireChannelActive()
}
@usableFromInline
func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
self.pendingOutboundBuffer.append((data, promise))
}
@usableFromInline
func flush0() {
self.eventLoop.preconditionInEventLoop()
self.pendingOutboundBuffer.mark()
while self.pendingOutboundBuffer.hasMark, let dataAndPromise = self.pendingOutboundBuffer.popFirst() {
self.addToBuffer(
buffer: &self.outboundBuffer,
consumer: &self.outboundBufferConsumer,
data: dataAndPromise.0
)
dataAndPromise.1?.succeed(())
}
}
@usableFromInline
func read0() {
self.eventLoop.preconditionInEventLoop()
// NOOP
}
public final func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
self.eventLoop.preconditionInEventLoop()
promise?.fail(ChannelError.operationUnsupported)
}
@usableFromInline
func channelRead0(_ data: NIOAny) {
self.eventLoop.preconditionInEventLoop()
self.addToBuffer(
buffer: &self.inboundBuffer,
consumer: &self.inboundBufferConsumer,
data: data
)
}
public func errorCaught0(error: Error) {
self.eventLoop.preconditionInEventLoop()
if self.error == nil {
self.error = error
}
}
private func addToBuffer(
buffer: inout CircularBuffer<NIOAny>,
consumer: inout Deque<(NIOAny) -> Void>,
data: NIOAny
) {
self.eventLoop.preconditionInEventLoop()
if let consume = consumer.popFirst() {
consume(data)
} else {
buffer.append(data)
}
}
}
// ChannelCores are basically never Sendable.
@available(*, unavailable)
extension EmbeddedChannelCore: Sendable {}
/// `EmbeddedChannel` is a `Channel` implementation that does neither any
/// actual IO nor has a proper eventing mechanism. The prime use-case for
/// `EmbeddedChannel` is in unit tests when you want to feed the inbound events
/// and check the outbound events manually.
///
/// Please remember to call `finish()` when you are no longer using this
/// `EmbeddedChannel`.
///
/// To feed events through an `EmbeddedChannel`'s `ChannelPipeline` use
/// `EmbeddedChannel.writeInbound` which accepts data of any type. It will then
/// forward that data through the `ChannelPipeline` and the subsequent
/// `ChannelInboundHandler` will receive it through the usual `channelRead`
/// event. The user is responsible for making sure the first
/// `ChannelInboundHandler` expects data of that type.
///
/// `EmbeddedChannel` automatically collects arriving outbound data and makes it
/// available one-by-one through `readOutbound`.
///
/// - Note: `EmbeddedChannel` is currently only compatible with
/// `EmbeddedEventLoop`s and cannot be used with `SelectableEventLoop`s from
/// for example `MultiThreadedEventLoopGroup`.
/// - warning: Unlike other `Channel`s, `EmbeddedChannel` **is not thread-safe**. This
/// is because it is intended to be run in the thread that instantiated it. Users are
/// responsible for ensuring they never call into an `EmbeddedChannel` in an
/// unsynchronized fashion. `EmbeddedEventLoop`s notes also apply as
/// `EmbeddedChannel` uses an `EmbeddedEventLoop` as its `EventLoop`.
public final class EmbeddedChannel: Channel {
/// `LeftOverState` represents any left-over inbound, outbound, and pending outbound events that hit the
/// `EmbeddedChannel` and were not consumed when `finish` was called on the `EmbeddedChannel`.
///
/// `EmbeddedChannel` is most useful in testing and usually in unit tests, you want to consume all inbound and
/// outbound data to verify they are what you expect. Therefore, when you `finish` an `EmbeddedChannel` it will
/// return if it's either `.clean` (no left overs) or that it has `.leftOvers`.
public enum LeftOverState {
/// The `EmbeddedChannel` is clean, ie. no inbound, outbound, or pending outbound data left on `finish`.
case clean
/// The `EmbeddedChannel` has inbound, outbound, or pending outbound data left on `finish`.
case leftOvers(inbound: [NIOAny], outbound: [NIOAny], pendingOutbound: [NIOAny])
/// `true` if the `EmbeddedChannel` was `clean` on `finish`, ie. there is no unconsumed inbound, outbound, or
/// pending outbound data left on the `Channel`.
public var isClean: Bool {
if case .clean = self {
return true
} else {
return false
}
}
/// `true` if the `EmbeddedChannel` if there was unconsumed inbound, outbound, or pending outbound data left
/// on the `Channel` when it was `finish`ed.
public var hasLeftOvers: Bool {
!self.isClean
}
}
/// `BufferState` represents the state of either the inbound, or the outbound `EmbeddedChannel` buffer. These
/// buffers contain data that travelled the `ChannelPipeline` all the way.
///
/// If the last `ChannelHandler` explicitly (by calling `fireChannelRead`) or implicitly (by not implementing
/// `channelRead`) sends inbound data into the end of the `EmbeddedChannel`, it will be held in the
/// `EmbeddedChannel`'s inbound buffer. Similarly for `write` on the outbound side. The state of the respective
/// buffer will be returned from `writeInbound`/`writeOutbound` as a `BufferState`.
public enum BufferState {
/// The buffer is empty.
case empty
/// The buffer is non-empty.
case full([NIOAny])
/// Returns `true` is the buffer was empty.
public var isEmpty: Bool {
if case .empty = self {
return true
} else {
return false
}
}
/// Returns `true` if the buffer was non-empty.
public var isFull: Bool {
!self.isEmpty
}
}
/// `WrongTypeError` is throws if you use `readInbound` or `readOutbound` and request a certain type but the first
/// item in the respective buffer is of a different type.
public struct WrongTypeError: Error, Equatable {
/// The type you expected.
public let expected: Any.Type
/// The type of the actual first element.
public let actual: Any.Type
public init(expected: Any.Type, actual: Any.Type) {
self.expected = expected
self.actual = actual
}
public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool {
lhs.expected == rhs.expected && lhs.actual == rhs.actual
}
}
/// Returns `true` if the `EmbeddedChannel` is 'active'.
///
/// An active `EmbeddedChannel` can be closed by calling `close` or `finish` on the `EmbeddedChannel`.
///
/// - Note: An `EmbeddedChannel` starts _inactive_ and can be activated, for example by calling `connect`.
public var isActive: Bool { channelcore.isActive }
/// - see: `ChannelOptions.Types.AllowRemoteHalfClosureOption`
public var allowRemoteHalfClosure: Bool {
get {
self.embeddedEventLoop.checkCorrectThread()
return channelcore.allowRemoteHalfClosure
}
set {
self.embeddedEventLoop.checkCorrectThread()
channelcore.allowRemoteHalfClosure = newValue
}
}
/// - see: `Channel.closeFuture`
public var closeFuture: EventLoopFuture<Void> { channelcore.closePromise.futureResult }
@usableFromInline
lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(
pipeline: self._pipeline,
eventLoop: self.eventLoop
)
/// - see: `Channel._channelCore`
public var _channelCore: ChannelCore {
self.embeddedEventLoop.checkCorrectThread()
return self.channelcore
}
/// - see: `Channel.pipeline`
public var pipeline: ChannelPipeline {
self.embeddedEventLoop.checkCorrectThread()
return self._pipeline
}
/// - see: `Channel.isWritable`
public var isWritable: Bool = true
/// Synchronously closes the `EmbeddedChannel`.
///
/// Errors in the `EmbeddedChannel` can be consumed using `throwIfErrorCaught`.
///
/// - Parameters:
/// - acceptAlreadyClosed: Whether `finish` should throw if the `EmbeddedChannel` has been previously `close`d.
/// - Returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been
/// consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed
/// writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound
/// events, the `EmbeddedChannel` will returns those as `.leftOvers(inbound:outbound:pendingOutbound:)`.
public func finish(acceptAlreadyClosed: Bool) throws -> LeftOverState {
self.embeddedEventLoop.checkCorrectThread()
do {
try close().wait()
} catch let error as ChannelError {
guard error == .alreadyClosed && acceptAlreadyClosed else {
throw error
}
}
self.embeddedEventLoop.run()
self.embeddedEventLoop.cancelRemainingScheduledTasks()
try throwIfErrorCaught()
let c = self.channelcore
if c.outboundBuffer.isEmpty && c.inboundBuffer.isEmpty && c.pendingOutboundBuffer.isEmpty {
return .clean
} else {
return .leftOvers(
inbound: Array(c.inboundBuffer),
outbound: Array(c.outboundBuffer),
pendingOutbound: c.pendingOutboundBuffer.map { $0.0 }
)
}
}
/// Synchronously closes the `EmbeddedChannel`.
///
/// This method will throw if the `Channel` hit any unconsumed errors or if the `close` fails. Errors in the
/// `EmbeddedChannel` can be consumed using `throwIfErrorCaught`.
///
/// - Returns: The `LeftOverState` of the `EmbeddedChannel`. If all the inbound and outbound events have been
/// consumed (using `readInbound` / `readOutbound`) and there are no pending outbound events (unflushed
/// writes) this will be `.clean`. If there are any unconsumed inbound, outbound, or pending outbound
/// events, the `EmbeddedChannel` will returns those as `.leftOvers(inbound:outbound:pendingOutbound:)`.
public func finish() throws -> LeftOverState {
self.embeddedEventLoop.checkCorrectThread()
return try self.finish(acceptAlreadyClosed: false)
}
private var _pipeline: ChannelPipeline!
/// - see: `Channel.allocator`
public var allocator: ByteBufferAllocator = ByteBufferAllocator()
/// - see: `Channel.eventLoop`
public var eventLoop: EventLoop {
self.embeddedEventLoop.checkCorrectThread()
return self.embeddedEventLoop
}
/// Returns the `EmbeddedEventLoop` that this `EmbeddedChannel` uses. This will return the same instance as
/// `EmbeddedChannel.eventLoop` but as the concrete `EmbeddedEventLoop` rather than as `EventLoop` existential.
public var embeddedEventLoop: EmbeddedEventLoop = EmbeddedEventLoop()
/// - see: `Channel.localAddress`
public var localAddress: SocketAddress? {
get {
self.embeddedEventLoop.checkCorrectThread()
return self.channelcore.localAddress
}
set {
self.embeddedEventLoop.checkCorrectThread()
self.channelcore.localAddress = newValue
}
}
/// - see: `Channel.remoteAddress`
public var remoteAddress: SocketAddress? {
get {
self.embeddedEventLoop.checkCorrectThread()
return self.channelcore.remoteAddress
}
set {
self.embeddedEventLoop.checkCorrectThread()
self.channelcore.remoteAddress = newValue
}
}
/// `nil` because `EmbeddedChannel`s don't have parents.
public let parent: Channel? = nil
/// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s outbound buffer. If the
/// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there
/// are no elements in the outbound buffer, `nil` will be returned.
///
/// Data hits the `EmbeddedChannel`'s outbound buffer when data was written using `write`, then `flush`ed, and
/// then travelled the `ChannelPipeline` all the way too the front. For data to hit the outbound buffer, the very
/// first `ChannelHandler` must have written and flushed it either explicitly (by calling
/// `ChannelHandlerContext.write` and `flush`) or implicitly by not implementing `write`/`flush`.
///
/// - Note: Outbound events travel the `ChannelPipeline` _back to front_.
/// - Note: `EmbeddedChannel.writeOutbound` will `write` data through the `ChannelPipeline`, starting with last
/// `ChannelHandler`.
@inlinable
public func readOutbound<T>(as type: T.Type = T.self) throws -> T? {
self.embeddedEventLoop.checkCorrectThread()
return try _readFromBuffer(buffer: &channelcore.outboundBuffer)
}
/// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s inbound buffer. If the
/// first element was of a different type than requested, `EmbeddedChannel.WrongTypeError` will be thrown, if there
/// are no elements in the outbound buffer, `nil` will be returned.
///
/// Data hits the `EmbeddedChannel`'s inbound buffer when data was send through the pipeline using `fireChannelRead`
/// and then travelled the `ChannelPipeline` all the way too the back. For data to hit the inbound buffer, the
/// last `ChannelHandler` must have send the event either explicitly (by calling
/// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
///
/// - Note: `EmbeddedChannel.writeInbound` will fire data through the `ChannelPipeline` using `fireChannelRead`.
@inlinable
public func readInbound<T>(as type: T.Type = T.self) throws -> T? {
self.embeddedEventLoop.checkCorrectThread()
return try _readFromBuffer(buffer: &channelcore.inboundBuffer)
}
/// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
///
/// The immediate effect being that the first `ChannelInboundHandler` will get its `channelRead` method called
/// with the data you provide.
///
/// - Parameters:
/// - data: The data to fire through the pipeline.
/// - Returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline`
// all the way.
@inlinable
@discardableResult public func writeInbound<T>(_ data: T) throws -> BufferState {
self.embeddedEventLoop.checkCorrectThread()
self.pipeline.syncOperations.fireChannelRead(NIOAny(data))
self.pipeline.syncOperations.fireChannelReadComplete()
try self.throwIfErrorCaught()
return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.inboundBuffer))
}
/// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`.
///
/// The immediate effect being that the first `ChannelOutboundHandler` will get its `write` method called
/// with the data you provide. Note that the first `ChannelOutboundHandler` in the pipeline is the _last_ handler
/// because outbound events travel the pipeline from back to front.
///
/// - Parameters:
/// - data: The data to fire through the pipeline.
/// - Returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline`
// all the way.
@inlinable
@discardableResult public func writeOutbound<T>(_ data: T) throws -> BufferState {
self.embeddedEventLoop.checkCorrectThread()
try self.writeAndFlush(data).wait()
return self.channelcore.outboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.outboundBuffer))
}
/// This method will throw the error that is stored in the `EmbeddedChannel` if any.
///
/// The `EmbeddedChannel` will store an error some error travels the `ChannelPipeline` all the way past its end.
public func throwIfErrorCaught() throws {
self.embeddedEventLoop.checkCorrectThread()
if let error = channelcore.error {
self.channelcore.error = nil
throw error
}
}
@inlinable
func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
self.embeddedEventLoop.checkCorrectThread()
if buffer.isEmpty {
return nil
}
let elem = buffer.removeFirst()
guard let t = self._channelCore.tryUnwrapData(elem, as: T.self) else {
throw WrongTypeError(
expected: T.self,
actual: type(of: self._channelCore.tryUnwrapData(elem, as: Any.self)!)
)
}
return t
}
/// Create a new instance.
///
/// During creation it will automatically also register itself on the `EmbeddedEventLoop`.
///
/// - Parameters:
/// - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register or `nil` if none should be added.
/// - loop: The `EmbeddedEventLoop` to use.
public convenience init(handler: ChannelHandler? = nil, loop: EmbeddedEventLoop = EmbeddedEventLoop()) {
let handlers = handler.map { [$0] } ?? []
self.init(handlers: handlers, loop: loop)
self.embeddedEventLoop.checkCorrectThread()
}
/// Create a new instance.
///