-
Notifications
You must be signed in to change notification settings - Fork 663
BaseStreamSocketChannel half-close allows outstanding writes to complete #3148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a9d9404
94f9573
2603614
f762b29
716fc47
0e6ee40
68397e0
1f0cf18
081a205
32b232a
e439537
92fa820
d41ef1b
e60b390
828a113
ddbb053
4c4bdd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -194,13 +194,29 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket> | |
self.close0(error: error, mode: .all, promise: promise) | ||
return | ||
} | ||
try self.shutdownSocket(mode: mode) | ||
// Fail all pending writes and so ensure all pending promises are notified | ||
self.pendingWrites.failAll(error: error, close: false) | ||
self.unregisterForWritable() | ||
promise?.succeed(()) | ||
|
||
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed) | ||
let writesCloseResult = self.pendingWrites.closeOutbound(promise) | ||
switch writesCloseResult { | ||
Lukasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
case .pending: | ||
() // promise is stored in `pendingWrites` state for completing on later call to `closeComplete` | ||
case .readyForClose(let closePromise): | ||
// Shutdown the socket only when the pending writes are dealt with | ||
do { | ||
try self.shutdownSocket(mode: mode) | ||
self.pendingWrites.closeComplete() | ||
} catch let err { | ||
Lukasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.pendingWrites.closeComplete(err) | ||
} | ||
Lukasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
closePromise?.succeed(()) | ||
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed) | ||
case .closed(let closePromise): | ||
closePromise?.succeed(()) | ||
case .open: | ||
promise?.fail(ChannelError.inappropriateOperationForState) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This path appears to violate the "pending writes manager takes ownership of the promise" construction. |
||
assertionFailure("Close resulted in an open state, this should never happen") | ||
} | ||
Lukasa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
case .input: | ||
if self.inputShutdown { | ||
promise?.fail(ChannelError._inputClosed) | ||
|
@@ -224,6 +240,7 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket> | |
promise?.succeed(()) | ||
|
||
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed) | ||
|
||
case .all: | ||
if let timeout = self.connectTimeoutScheduled { | ||
self.connectTimeoutScheduled = nil | ||
|
@@ -247,7 +264,17 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket> | |
} | ||
|
||
final override func cancelWritesOnClose(error: Error) { | ||
self.pendingWrites.failAll(error: error, close: true) | ||
let closeResult = self.pendingWrites.failAll(error: error, close: true) | ||
switch closeResult { | ||
case .closed(let eventLoopPromise): | ||
if let eventLoopPromise { | ||
eventLoopPromise.fail(error) | ||
} | ||
case .open, .pending, .readyForClose: | ||
preconditionFailure("failAll with close should never return \(closeResult!)") | ||
case .none: | ||
preconditionFailure("failAll with close should return a close result") | ||
} | ||
} | ||
|
||
@discardableResult | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,9 @@ private struct PendingStreamWrite { | |
var promise: Optional<EventLoopPromise<Void>> | ||
} | ||
|
||
/// Write result is `.couldNotWriteEverything` but we have no more writes to perform. | ||
public struct NIOReportedIncompleteWritesWhenNoMoreToPerform: Error {} | ||
|
||
/// Does the setup required to issue a writev. | ||
/// | ||
/// - Parameters: | ||
|
@@ -97,9 +100,9 @@ internal enum OneWriteOperationResult { | |
/// The result of trying to write all the outstanding flushed data. That naturally includes all `ByteBuffer`s and | ||
/// `FileRegions` and the individual writes have potentially been retried (see `WriteSpinOption`). | ||
internal struct OverallWriteResult { | ||
enum WriteOutcome { | ||
enum WriteOutcome: Equatable { | ||
/// Wrote all the data that was flushed. When receiving this result, we can unsubscribe from 'writable' notification. | ||
case writtenCompletely | ||
case writtenCompletely(CloseResult) | ||
|
||
/// Could not write everything. Before attempting further writes the eventing system should send a 'writable' notification. | ||
case couldNotWriteEverything | ||
|
@@ -152,7 +155,7 @@ private struct PendingStreamWritesState { | |
self.subtractOutstanding(bytes: bytes) | ||
} | ||
|
||
/// Initialise a new, empty `PendingWritesState`. | ||
/// Initialize a new, empty `PendingWritesState`. | ||
public init() {} | ||
|
||
/// Check if there are no outstanding writes. | ||
|
@@ -310,6 +313,8 @@ final class PendingStreamWritesManager: PendingWritesManager { | |
|
||
private(set) var isOpen = true | ||
|
||
private(set) var outboundCloseState: CloseState = .open | ||
|
||
/// Mark the flush checkpoint. | ||
func markFlushCheckpoint() { | ||
self.state.markFlushCheckpoint() | ||
|
@@ -337,7 +342,7 @@ final class PendingStreamWritesManager: PendingWritesManager { | |
/// - result: If the `Channel` is still writable after adding the write of `data`. | ||
func add(data: IOData, promise: EventLoopPromise<Void>?) -> Bool { | ||
assert(self.isOpen) | ||
self.state.append(.init(data: data, promise: promise)) | ||
self.state.append(PendingStreamWrite(data: data, promise: promise)) | ||
|
||
if self.state.bytes > waterMark.high | ||
&& channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged | ||
|
@@ -367,7 +372,7 @@ final class PendingStreamWritesManager: PendingWritesManager { | |
vectorBufferWriteOperation: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>, | ||
scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult<Int> | ||
) throws -> OverallWriteResult { | ||
try self.triggerWriteOperations { writeMechanism in | ||
var result = try self.triggerWriteOperations { writeMechanism in | ||
switch writeMechanism { | ||
case .scalarBufferWrite: | ||
return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) }) | ||
|
@@ -380,6 +385,32 @@ final class PendingStreamWritesManager: PendingWritesManager { | |
return .writtenCompletely | ||
} | ||
} | ||
|
||
let closeResult: CloseResult | ||
// If we have no more writes check if we have a pending close | ||
if self.isEmpty { | ||
switch result.writeResult { | ||
case .writtenCompletely: | ||
switch self.outboundCloseState { | ||
case .open: | ||
closeResult = .open | ||
case .pending(let closePromise): | ||
self.outboundCloseState = .readyForClose(closePromise) | ||
closeResult = .readyForClose(closePromise) | ||
case .readyForClose(let closePromise): | ||
assertionFailure("Transitioned from readyForClose to readyForClose, shouldn't we have closed?") | ||
closeResult = .readyForClose(closePromise) | ||
case .closed: | ||
closeResult = .closed(nil) | ||
} | ||
|
||
case .couldNotWriteEverything: | ||
assertionFailure("Write result is .couldNotWriteEverything but we have no more writes to perform.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is going to be an assertion failure we should have some fallback logic. Probably that should be throwing an error. |
||
throw NIOReportedIncompleteWritesWhenNoMoreToPerform() | ||
} | ||
result.writeResult = .writtenCompletely(closeResult) | ||
} | ||
return result | ||
} | ||
|
||
/// To be called after a write operation (usually selected and run by `triggerAppropriateWriteOperation`) has | ||
|
@@ -463,16 +494,93 @@ final class PendingStreamWritesManager: PendingWritesManager { | |
return self.didWrite(itemCount: result.itemCount, result: result.writeResult) | ||
} | ||
|
||
/// Fail all the outstanding writes. This is useful if for example the `Channel` is closed. | ||
func failAll(error: Error, close: Bool) { | ||
/// Fail all the outstanding writes. | ||
func failAll(error: Error, close: Bool) -> CloseResult? { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This "boolean parameter with optional return contingent on the value of the bool" smells like there are two methods here. It may be worth factoring them apart. A safe way to do this, if we want to keep this patch understandable, is to make a separate refactoring PR that only refactors this method into two. (Sidebar: the two methods are actually implemented as one calling the other: the one with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
let closeResult: CloseResult? | ||
if close { | ||
assert(self.isOpen) | ||
self.isOpen = false | ||
switch self.outboundCloseState { | ||
case .open, .closed: | ||
self.outboundCloseState = .closed | ||
closeResult = .closed(nil) | ||
case .pending(let closePromise), .readyForClose(let closePromise): | ||
self.outboundCloseState = .closed | ||
closeResult = .closed(closePromise) | ||
} | ||
} else { | ||
closeResult = nil | ||
} | ||
|
||
self.state.removeAll()?.fail(error) | ||
|
||
assert(self.state.isEmpty) | ||
return closeResult | ||
} | ||
|
||
/// Signal the intention to close. Takes a promise which MUST be completed via a call to `closeComplete` | ||
/// | ||
/// - Parameters: | ||
/// - promise: Optionally an `EventLoopPromise` that will be succeeded once all outstanding writes have been dealt with | ||
func closeOutbound(_ promise: EventLoopPromise<Void>?) -> CloseResult { | ||
assert(self.isOpen) | ||
|
||
let closeResult: CloseResult | ||
if self.isEmpty { | ||
switch self.outboundCloseState { | ||
case .open: | ||
self.outboundCloseState = .readyForClose(promise) | ||
closeResult = .readyForClose(promise) | ||
case .readyForClose(var closePromise), .pending(var closePromise): | ||
closePromise.setOrCascade(to: promise) | ||
self.outboundCloseState = .readyForClose(closePromise) | ||
closeResult = .readyForClose(closePromise) | ||
case .closed: | ||
closeResult = .closed(promise) | ||
} | ||
|
||
} else { | ||
switch self.outboundCloseState { | ||
case .open: | ||
self.outboundCloseState = .pending(promise) | ||
closeResult = .pending | ||
case .pending(var closePromise): | ||
closePromise.setOrCascade(to: promise) | ||
self.outboundCloseState = .pending(closePromise) | ||
closeResult = .pending | ||
case .readyForClose: | ||
preconditionFailure( | ||
"We are in .readyForClose state but we still have pending writes. This should never happen." | ||
) | ||
case .closed: | ||
preconditionFailure( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a touch nervous about these precondition failures. Can we come up with a logical behaviour here? Probably it's "fail the promise" and "assertionFailure to get crashes in debug". |
||
"We are in .closed state but we still have pending writes. This should never happen." | ||
) | ||
} | ||
|
||
} | ||
|
||
return closeResult | ||
} | ||
|
||
/// Signal that the `Channel` is closed. | ||
func closeComplete(_ error: Error? = nil) { | ||
assert(self.isOpen) | ||
assert(self.isEmpty) | ||
|
||
switch self.outboundCloseState { | ||
case .readyForClose(let closePromise): | ||
self.outboundCloseState = .closed | ||
Lukasa marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, let's return the promise here rather than have this type complete it. It's a touch easier for the channel to manage things by completing things itself, rather than having this type do it, because the channel can ensure the state is squared away before making the outcall. |
||
if let error { | ||
closePromise?.fail(error) | ||
} else { | ||
closePromise?.succeed(()) | ||
} | ||
case .closed: | ||
() // nothing to do | ||
case .open, .pending: | ||
preconditionFailure("close complete called on channel in unexpected state: \(self.outboundCloseState)") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make this a thrown error instead of a crash? |
||
} | ||
} | ||
|
||
/// Initialize with a pre-allocated array of IO vectors and storage references. We pass in these pre-allocated | ||
|
@@ -487,6 +595,20 @@ final class PendingStreamWritesManager: PendingWritesManager { | |
} | ||
} | ||
|
||
internal enum CloseState { | ||
case open | ||
case pending(EventLoopPromise<Void>?) | ||
case readyForClose(EventLoopPromise<Void>?) | ||
case closed | ||
} | ||
|
||
internal enum CloseResult: Equatable { | ||
case open | ||
case pending | ||
case readyForClose(EventLoopPromise<Void>?) | ||
case closed(EventLoopPromise<Void>?) | ||
} | ||
|
||
internal enum WriteMechanism { | ||
case scalarBufferWrite | ||
case vectorBufferWrite | ||
|
@@ -496,6 +618,8 @@ internal enum WriteMechanism { | |
|
||
internal protocol PendingWritesManager: AnyObject { | ||
var isOpen: Bool { get } | ||
var isEmpty: Bool { get } | ||
var outboundCloseState: CloseState { get } | ||
var isFlushPending: Bool { get } | ||
var writeSpinCount: UInt { get } | ||
var currentBestWriteMechanism: WriteMechanism { get } | ||
|
@@ -522,7 +646,14 @@ extension PendingWritesManager { | |
var oneResult: OneWriteOperationResult | ||
repeat { | ||
guard self.isOpen && self.isFlushPending else { | ||
result.writeResult = .writtenCompletely | ||
let closeResult: CloseResult = | ||
switch self.outboundCloseState { | ||
case .open: .open | ||
case .pending: .pending | ||
case .readyForClose(let closePromise): .readyForClose(closePromise) | ||
case .closed: .closed(nil) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's outline this. Looks like an initializer to me. |
||
result.writeResult = .writtenCompletely(closeResult) | ||
break writeSpinLoop | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, ok: there's a test missing here. The
unregisterForWritable
above isn't appropriate. I believe the following situation is possible:In this case, we'll have flushed, unwritten data, but we will no longer be registered for writable. As long as no other flush or close comes along, we'll stay unregistered for writable, and so we will make no attempt to empty the buffer. That will cause us to be wedged open.
I recommend, before fixing the bug, you write a test that correctly reproduces this wedge, and use that to validate the bug is actually fixed. This is going to be a challenging test to write. It can be written using both regular sockets, but it can also be written using the SAL, which may be easier. Let me know if you'd like to pair on a test for this.