-
Notifications
You must be signed in to change notification settings - Fork 663
/
Copy pathBaseStreamSocketChannel.swift
296 lines (265 loc) · 11.3 KB
/
BaseStreamSocketChannel.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>, @unchecked Sendable {
// NOTE(review): nothing in this class assigns a non-nil value to `connectTimeoutScheduled`; presumably a subclass
// schedules the timeout while connecting — confirm against the subclasses.
// NOTE(review): the type is spelled `Optional<...>` rather than `...?`, presumably to avoid the implicit `= nil`
// default initialisation that the sugared spelling gets (it is assigned explicitly in `init`) — confirm before
// "simplifying" it.
/// Handle to a scheduled connect-timeout task, if any. Set to `nil` in `init`; cancelled and cleared by
/// `close0(error:mode:promise:)` when the channel is closed with mode `.all`.
internal var connectTimeoutScheduled: Optional<Scheduled<Void>>
/// Backing storage for the `AllowRemoteHalfClosureOption` channel option (see `setOption0` / `getOption0`).
private var allowRemoteHalfClosure: Bool = false
/// `true` once the read side has been shut down: set by `shutdownSocket(mode:)` for `.input`, or directly by
/// `close0` when input is closed because of `ChannelError.eof`.
private var inputShutdown: Bool = false
/// `true` once the write side has been shut down: set by `shutdownSocket(mode:)` for `.output`.
private var outputShutdown: Bool = false
/// Manager for the writes buffered on this channel. Also holds the write-spin count, the write-buffer water marks
/// and the buffered byte count exposed through the channel options.
private let pendingWrites: PendingStreamWritesManager
/// Creates a stream channel wrapping `socket`.
///
/// - Parameters:
///   - socket: The socket this channel operates on; forwarded to the superclass.
///   - parent: The parent channel, if any; forwarded to the superclass.
///   - eventLoop: The event loop the channel is bound to. Its `bufferPool` backs the pending-writes manager.
///   - recvAllocator: The receive buffer allocator; forwarded to the superclass.
/// - Throws: Any error thrown by the superclass initialiser.
init(
    socket: Socket,
    parent: Channel?,
    eventLoop: SelectableEventLoop,
    recvAllocator: RecvByteBufferAllocator
) throws {
    // Both stored properties without a default value have to be set before delegating upwards.
    self.connectTimeoutScheduled = nil
    self.pendingWrites = PendingStreamWritesManager(bufferPool: eventLoop.bufferPool)

    // This class always passes `supportReconnect: false` to the superclass.
    try super.init(
        socket: socket,
        parent: parent,
        eventLoop: eventLoop,
        recvAllocator: recvAllocator,
        supportReconnect: false
    )
}
/// Debug-only sanity check that no writes are still buffered when the channel is deallocated.
deinit {
// We should never have any pending writes left, as otherwise we may leak callbacks.
// `assert` is only evaluated in debug builds, so this check costs nothing in release builds.
assert(self.pendingWrites.isEmpty)
}
// MARK: BaseSocketChannel's must-override API that might be further refined by subclasses
/// Sets a channel option. Must be called on the event loop.
///
/// The half-closure, write-spin and write-buffer water-mark options are handled here; every other option is
/// forwarded to the superclass.
///
/// - Parameters:
///   - option: The option to set.
///   - value: The new value. It is force-cast to the concrete type the option expects, so a mismatching type traps.
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is not open, or whatever the superclass throws for
///   options not handled here.
override func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }

    switch option {
    case is ChannelOptions.Types.AllowRemoteHalfClosureOption:
        self.allowRemoteHalfClosure = value as! Bool
    case is ChannelOptions.Types.WriteSpinOption:
        self.pendingWrites.writeSpinCount = value as! UInt
    case is ChannelOptions.Types.WriteBufferWaterMarkOption:
        self.pendingWrites.waterMark = value as! ChannelOptions.Types.WriteBufferWaterMark
    default:
        // Not a stream-specific option: let the superclass deal with it.
        try super.setOption0(option, value: value)
    }
}
/// Reads a channel option. Must be called on the event loop.
///
/// The half-closure, write-spin, write-buffer water-mark and buffered-writable-bytes options are answered here;
/// every other option is forwarded to the superclass.
///
/// - Parameter option: The option to read.
/// - Returns: The current value, force-cast to `Option.Value`.
/// - Throws: `ChannelError._ioOnClosedChannel` if the channel is not open, or whatever the superclass throws for
///   options not handled here.
override func getOption0<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
    self.eventLoop.assertInEventLoop()

    if !self.isOpen {
        throw ChannelError._ioOnClosedChannel
    }

    switch option {
    case is ChannelOptions.Types.AllowRemoteHalfClosureOption:
        let halfClosureAllowed = self.allowRemoteHalfClosure
        return halfClosureAllowed as! Option.Value
    case is ChannelOptions.Types.WriteSpinOption:
        let spinCount = self.pendingWrites.writeSpinCount
        return spinCount as! Option.Value
    case is ChannelOptions.Types.WriteBufferWaterMarkOption:
        let waterMark = self.pendingWrites.waterMark
        return waterMark as! Option.Value
    case is ChannelOptions.Types.BufferedWritableBytesOption:
        // The manager's byte count is converted to `Int` before it is handed out.
        let bufferedBytes = Int(self.pendingWrites.bufferedBytes)
        return bufferedBytes as! Option.Value
    default:
        // Not a stream-specific option: let the superclass deal with it.
        return try super.getOption0(option)
    }
}
/// Shuts down one direction of the socket and records that in the matching `...Shutdown` flag.
///
/// This is a hook so that subclasses (e.g. PipeChannel) can customise how the shutdown is performed.
///
/// - Parameter mode: `.output` shuts down the write side, `.input` the read side; `.all` is a no-op here.
/// - Throws: Any error thrown by the socket's `shutdown(how:)`. The flag is left untouched in that case.
func shutdownSocket(mode: CloseMode) throws {
    switch mode {
    case .all:
        // Nothing to do in this hook for a full close.
        return
    case .input:
        try self.socket.shutdown(how: .RD)
        self.inputShutdown = true
    case .output:
        try self.socket.shutdown(how: .WR)
        self.outputShutdown = true
    }
}
// MARK: BaseSocketChannel's must-override API that cannot be further refined by subclasses
/// Whether the channel is currently writable, as reported by the pending-writes manager.
///
/// This is `Channel` API, so it must be thread-safe; note that it does not assert being on the event loop.
// NOTE(review): thread-safety relies on `pendingWrites.isWritable` itself being safe to read off-loop — confirm.
final override public var isWritable: Bool {
    get {
        return self.pendingWrites.isWritable
    }
}
/// Whether the channel is open, as reported by the superclass. Must be read on the event loop.
///
/// In debug builds this also checks that the pending-writes manager agrees with the superclass.
final override var isOpen: Bool {
    get {
        self.eventLoop.assertInEventLoop()
        // The channel's and the write manager's notion of "open" must never diverge.
        assert(
            super.isOpen == self.pendingWrites.isOpen
        )
        return super.isOpen
    }
}
/// Reads from the socket, at most `maxMessagesPerRead` times, firing `channelRead` for every non-empty read.
///
/// Must be called on the event loop.
///
/// - Returns: `.some` if at least one `channelRead` was fired during this call, `.none` otherwise.
/// - Throws: `ChannelError._eof` if, before a read attempt, the channel is no longer open or its input is shut down,
///   or if a read yields zero bytes while the input has not been shut down locally. Errors from obtaining the
///   buffer or from the socket read are rethrown.
final override func readFromSocket() throws -> ReadResult {
    self.eventLoop.assertInEventLoop()

    var outcome = ReadResult.none

    // NOTE(review): a `1...n` range traps when `n` is 0 — presumably `maxMessagesPerRead` is always >= 1; confirm.
    for _ in 1...self.maxMessagesPerRead {
        if !self.isOpen || self.inputShutdown {
            throw ChannelError._eof
        }

        // Obtain a receive buffer from the pool and let the socket read straight into its writable region.
        let (buffer, readResult) = try self.recvBufferPool.buffer(allocator: self.allocator) { target in
            try target.withMutableWritePointer { destination in
                try self.socket.read(pointer: destination)
            }
        }

        switch readResult {
        case .wouldBlock(let bytesRead):
            // Nothing more to read right now.
            assert(bytesRead == 0)
            return outcome

        case .processed(let bytesRead):
            guard bytesRead > 0 else {
                guard self.inputShutdown else {
                    // A zero-byte read that we did not cause ourselves: end-of-file.
                    throw ChannelError._eof
                }
                // We received an EOF because we called shutdown on the fd ourselves: unregister from the
                // Selector and return.
                self.readPending = false
                self.unregisterForReadable()
                return outcome
            }

            self.recvBufferPool.record(actualReadBytes: bytesRead)
            self.readPending = false

            assert(self.isActive)
            self.pipeline.syncOperations.fireChannelRead(NIOAny(buffer))
            outcome = .some

            if buffer.writableBytes > 0 {
                // The read did not fill the whole buffer, so stop reading and wait to be notified again;
                // chances are the next read(...) would return nothing or only very little data. Returning here
                // also lets fireChannelReadComplete() run, which may give the user the chance to flush out all
                // pending writes.
                return outcome
            }
        }
    }

    return outcome
}
/// Asks the pending-writes manager to write what it holds, supplying the socket operations it may pick from.
///
/// - Returns: The overall result reported by the pending-writes manager.
/// - Throws: Any error thrown by the manager or by the socket operations.
final override func writeToSocket() throws -> OverallWriteResult {
    return try self.pendingWrites.triggerAppropriateWriteOperations(
        scalarBufferWriteOperation: { bytes in
            // Plain write of a single buffer; an empty buffer is reported as written without touching the socket.
            if bytes.count > 0 {
                return try self.socket.write(pointer: bytes)
            }
            return .processed(0)
        },
        vectorBufferWriteOperation: { iovecs in
            // Gathering write of several buffers at once.
            return try self.socket.writev(iovecs: iovecs)
        },
        scalarFileWriteOperation: { fd, start, end in
            // File region write; the count is the distance between the two indices.
            return try self.socket.sendFile(fd: fd, offset: start, count: end - start)
        }
    )
}
/// Closes the channel fully (`.all`) or half-closes one direction (`.input` / `.output`).
///
/// - Parameters:
///   - error: The reason for the close. For `.input`, `ChannelError.eof` means the socket is not shut down again.
///     For `.all` and for escalated closes it is passed on to the superclass.
///   - mode: Which part of the channel to close.
///   - promise: Completed with the outcome. For `.output` with writes still pending it is handed to
///     `pendingWrites` and completed later.
///
/// Half-closing a direction that is already shut down fails the promise with `_outputClosed` / `_inputClosed`.
/// Half-closing one direction while the other one is already shut down escalates to a full close.
/// Any error thrown inside the `switch` (in practice by `shutdownSocket` in the `.input` case) fails the promise.
final override func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
do {
switch mode {
case .output:
if self.outputShutdown {
// Output was already closed: report that instead of closing twice.
promise?.fail(ChannelError._outputClosed)
return
}
if self.inputShutdown {
// Input is already shut down, so closing output too means nothing is left: escalate to full closure.
self.close0(error: error, mode: .all, promise: promise)
return
}
self.unregisterForWritable()
// Ask the write manager to close; the result says whether the socket can be shut down right away.
let writesCloseResult = self.pendingWrites.close(promise)
switch writesCloseResult {
case .pending:
() // promise is stored in `pendingWrites` state for completing on a later call to `closeComplete`
case .readyForClose:
// Shut down the socket only once the pending writes are dealt with.
do {
try self.shutdownSocket(mode: mode)
self.pendingWrites.closeComplete()
} catch let err {
// Shutdown failed: hand the error to the write manager together with the completion.
self.pendingWrites.closeComplete(err)
}
// NOTE(review): this event is fired whether or not `shutdownSocket` threw — confirm that is intended.
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
case .closed:
promise?.succeed(())
case .open:
// The manager should never report `.open` after `close`; fail the promise and trap in debug builds.
promise?.fail(ChannelError.inappropriateOperationForState)
assertionFailure("Close resulted in an open state, this should never happen")
}
case .input:
if self.inputShutdown {
// Input was already closed: report that instead of closing twice.
promise?.fail(ChannelError._inputClosed)
return
}
if self.outputShutdown {
// Output is already shut down, so closing input too means nothing is left: escalate to full closure.
self.close0(error: error, mode: .all, promise: promise)
return
}
switch error {
case ChannelError.eof:
// No need to explicitly call socket.shutdown(...) as we received an EOF and the call would only cause
// ENOTCONN. Just record that the input side is done.
self.inputShutdown = true
break
default:
// May throw; the outer `catch` then fails the promise and the steps below are skipped.
try self.shutdownSocket(mode: mode)
}
self.unregisterForReadable()
promise?.succeed(())
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
case .all:
// Cancel a still-scheduled connect timeout (if any) before handing the full close to the superclass.
if let timeout = self.connectTimeoutScheduled {
self.connectTimeoutScheduled = nil
timeout.cancel()
}
super.close0(error: error, mode: mode, promise: promise)
}
} catch let err {
promise?.fail(err)
}
}
/// Reports whether the pending-writes manager has a flush pending.
///
/// - Returns: The manager's `isFlushPending` value.
final override func hasFlushedPendingWrites() -> Bool {
    return self.pendingWrites.isFlushPending
}
/// Marks a flush checkpoint on the pending-writes manager.
final override func markFlushPoint() {
// Even if writable() will be called later by the EventLoop, we still need to mark the flush checkpoint so we
// are sure all the flushed messages are actually written once writable() is called.
self.pendingWrites.markFlushCheckpoint()
}
/// Fails all pending writes with `error` and tells the pending-writes manager to close (`close: true`).
///
/// - Parameter error: The error handed to `failAll` for the pending writes.
final override func cancelWritesOnClose(error: Error) {
self.pendingWrites.failAll(error: error, close: true)
}
/// Forwards to the superclass' `readIfNeeded0()` unless the input side has been shut down.
///
/// - Returns: `false` if input is shut down (the superclass is not consulted), otherwise the superclass' result.
@discardableResult
final override func readIfNeeded0() -> Bool {
    guard !self.inputShutdown else {
        return false
    }
    return super.readIfNeeded0()
}
/// Forwards to the superclass' `read0()` unless the input side has been shut down, in which case it does nothing.
final override public func read0() {
    if !self.inputShutdown {
        super.read0()
    }
}
/// Hands a write to the pending-writes manager.
///
/// - Parameters:
///   - data: The payload; it is unwrapped as `IOData`.
///   - promise: Failed with `ChannelError._outputClosed` if output is already shut down; otherwise passed on to the
///     pending-writes manager together with the data.
final override func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
    guard !self.outputShutdown else {
        promise?.fail(ChannelError._outputClosed)
        return
    }

    let ioData = self.unwrapData(data, as: IOData.self)
    let writabilityUnchanged = self.pendingWrites.add(data: ioData, promise: promise)
    if writabilityUnchanged {
        return
    }
    // `add` returned `false`: tell the pipeline that writability changed.
    self.pipeline.syncOperations.fireChannelWritabilityChanged()
}
}