Skip to content

BaseStreamSocketChannel half-close allows outstanding writes to complete #3148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Sources/NIOPosix/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,16 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
switch writeResult.writeResult {
case .couldNotWriteEverything:
newWriteRegistrationState = .register
case .writtenCompletely:
case .writtenCompletely(let closeState):
newWriteRegistrationState = .unregister
switch closeState {
case .open, .pending:
()
case .readyForClose:
self.close0(error: ChannelError.outputClosed, mode: .output, promise: nil)
case .closed:
() // we can be flushed before becoming active
}
}

if !self.isOpen || !self.hasFlushedPendingWrites() {
Expand Down
26 changes: 21 additions & 5 deletions Sources/NIOPosix/BaseStreamSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,28 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
self.close0(error: error, mode: .all, promise: promise)
return
}
try self.shutdownSocket(mode: mode)
// Fail all pending writes and so ensure all pending promises are notified
self.pendingWrites.failAll(error: error, close: false)
self.unregisterForWritable()
promise?.succeed(())

self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
let writesCloseResult = self.pendingWrites.close(promise)
switch writesCloseResult {
case .pending:
() // promise is stored in `pendingWrites` state for completing on later call to `closeComplete`
case .readyForClose:
// Shutdown the socket only when the pending writes are dealt with
do {
try self.shutdownSocket(mode: mode)
self.pendingWrites.closeComplete()
} catch let err {
self.pendingWrites.closeComplete(err)
}
self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
case .closed:
promise?.succeed(())
case .open:
promise?.fail(ChannelError.inappropriateOperationForState)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This path appears to violate the "pending writes manager takes ownership of the promise" construction.

assertionFailure("Close resulted in an open state, this should never happen")
}

case .input:
if self.inputShutdown {
promise?.fail(ChannelError._inputClosed)
Expand All @@ -224,6 +239,7 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
promise?.succeed(())

self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)

case .all:
if let timeout = self.connectTimeoutScheduled {
self.connectTimeoutScheduled = nil
Expand Down
7 changes: 7 additions & 0 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ final class PendingDatagramWritesManager: PendingWritesManager {
internal var publishedWritability = true
internal var writeSpinCount: UInt = 16
private(set) var isOpen = true
var outboundCloseState: CloseState {
if self.isOpen {
.open
} else {
.closed
}
}

/// Initialize with a pre-allocated array of message headers and storage references. We pass in these pre-allocated
/// objects to save allocations. They can be safely be re-used for all `Channel`s on a given `EventLoop` as an
Expand Down
134 changes: 127 additions & 7 deletions Sources/NIOPosix/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ internal enum OneWriteOperationResult {
/// The result of trying to write all the outstanding flushed data. That naturally includes all `ByteBuffer`s and
/// `FileRegions` and the individual writes have potentially been retried (see `WriteSpinOption`).
internal struct OverallWriteResult {
enum WriteOutcome {
enum WriteOutcome: Equatable {
/// Wrote all the data that was flushed. When receiving this result, we can unsubscribe from 'writable' notification.
case writtenCompletely
case writtenCompletely(CloseResult)

/// Could not write everything. Before attempting further writes the eventing system should send a 'writable' notification.
case couldNotWriteEverything
Expand Down Expand Up @@ -152,7 +152,7 @@ private struct PendingStreamWritesState {
self.subtractOutstanding(bytes: bytes)
}

/// Initialise a new, empty `PendingWritesState`.
/// Initialize a new, empty `PendingWritesState`.
public init() {}

/// Check if there are no outstanding writes.
Expand Down Expand Up @@ -310,6 +310,8 @@ final class PendingStreamWritesManager: PendingWritesManager {

private(set) var isOpen = true

private(set) var outboundCloseState: CloseState = .open

/// Mark the flush checkpoint.
func markFlushCheckpoint() {
self.state.markFlushCheckpoint()
Expand Down Expand Up @@ -337,7 +339,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
/// - result: If the `Channel` is still writable after adding the write of `data`.
func add(data: IOData, promise: EventLoopPromise<Void>?) -> Bool {
assert(self.isOpen)
self.state.append(.init(data: data, promise: promise))
self.state.append(PendingStreamWrite(data: data, promise: promise))

if self.state.bytes > waterMark.high
&& channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged
Expand Down Expand Up @@ -367,7 +369,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
vectorBufferWriteOperation: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>,
scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult<Int>
) throws -> OverallWriteResult {
try self.triggerWriteOperations { writeMechanism in
var result = try self.triggerWriteOperations { writeMechanism in
switch writeMechanism {
case .scalarBufferWrite:
return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) })
Expand All @@ -380,6 +382,28 @@ final class PendingStreamWritesManager: PendingWritesManager {
return .writtenCompletely
}
}

// If we have no more writes check if we have a pending close
if self.isEmpty {
switch result.writeResult {
case .writtenCompletely:
switch self.outboundCloseState {
case .open:
()
case .pending(let closePromise):
self.outboundCloseState = .readyForClose(closePromise)
case .readyForClose:
assertionFailure("Transitioned from readyForClose to readyForClose, shouldn't we have closed?")
case .closed:
()
}

case .couldNotWriteEverything:
assertionFailure("Write result is .couldNotWriteEverything but we have no more writes to perform.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is going to be an assertion failure we should have some fallback logic. Probably that should be throwing an error.

}
result.writeResult = .writtenCompletely(.init(self.outboundCloseState))
}
return result
}

/// To be called after a write operation (usually selected and run by `triggerAppropriateWriteOperation`) has
Expand Down Expand Up @@ -463,18 +487,85 @@ final class PendingStreamWritesManager: PendingWritesManager {
return self.didWrite(itemCount: result.itemCount, result: result.writeResult)
}

/// Fail all the outstanding writes. This is useful if for example the `Channel` is closed.
/// Fail all the outstanding writes.
func failAll(error: Error, close: Bool) {
if close {
assert(self.isOpen)
self.isOpen = false
switch self.outboundCloseState {
case .open, .closed:
self.outboundCloseState = .closed
case .pending(let closePromise), .readyForClose(let closePromise):
self.outboundCloseState = .closed
closePromise?.fail(error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, should we return this instead of closing it at this stage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the thinking here? My understanding from our previous discussions was that it would be better that once the higher-level caller handed-off the promise it was the responsibility of the PendingWritesManager to complete the promise, so passing it in was a one-way street. This is why I separated-out CloseResult as its own type, so we have clarity on where the promise is exposed and not exposed.

Are you proposing that this method returns EventLoopPromise<Void>? where the promise is non-nil if the caller needs to do something with it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I've just scrolled up and seen #3148 (comment) - that makes sense.

}
}

self.state.removeAll()?.fail(error)

assert(self.state.isEmpty)
}

/// Signal the intention to close. Takes a promise which MUST be completed via a call to `closeComplete`
///
/// - Parameters:
/// - promise: Optionally an `EventLoopPromise` that will be succeeded once all outstanding writes have been dealt with
func close(_ promise: EventLoopPromise<Void>?) -> CloseResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be called closeOutbound.

assert(self.isOpen)

if self.isEmpty {
switch self.outboundCloseState {
case .open:
self.outboundCloseState = .readyForClose(promise)
case .readyForClose(var closePromise), .pending(var closePromise):
closePromise.setOrCascade(to: promise)
self.outboundCloseState = .readyForClose(closePromise)
case .closed:
promise?.succeed(())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As noted above, it's probably better to let the caller complete the promise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case are you proposing that this method returns (CloseResult, EventLoopPromise<Void>?) where the promise is non-nil if the caller needs to do something with it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or put the promise in the close result as associated data.

}

} else {
switch self.outboundCloseState {
case .open:
self.outboundCloseState = .pending(promise)
case .pending(var closePromise):
closePromise.setOrCascade(to: promise)
self.outboundCloseState = .pending(closePromise)
case .readyForClose:
preconditionFailure(
"We are in .readyForClose state but we still have pending writes. This should never happen."
)
case .closed:
preconditionFailure(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a touch nervous about these precondition failures. Can we come up with a logical behaviour here? Probably it's "fail the promise" and "assertionFailure to get crashes in debug".

"We are in .closed state but we still have pending writes. This should never happen."
)
}

}

return .init(self.outboundCloseState)
}

/// Signal that the `Channel` is closed.
func closeComplete(_ error: Error? = nil) {
assert(self.isOpen)
assert(self.isEmpty)

switch self.outboundCloseState {
case .readyForClose(let closePromise):
self.outboundCloseState = .closed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's return the promise here rather than have this type complete it. It's a touch easier for the channel to manage things by completing things itself, rather than having this type do it, because the channel can ensure the state is squared away before making the outcall.

if let error {
closePromise?.fail(error)
} else {
closePromise?.succeed(())
}
case .closed:
() // nothing to do
case .open, .pending:
preconditionFailure("close complete called on channel in unexpected state: \(self.outboundCloseState)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a thrown error instead of a crash?

}
}

/// Initialize with a pre-allocated array of IO vectors and storage references. We pass in these pre-allocated
/// objects to save allocations. They can be safely be re-used for all `Channel`s on a given `EventLoop` as an
/// `EventLoop` always runs on one and the same thread. That means that there can't be any writes of more than
Expand All @@ -487,6 +578,33 @@ final class PendingStreamWritesManager: PendingWritesManager {
}
}

internal enum CloseState {
case open
case pending(EventLoopPromise<Void>?)
case readyForClose(EventLoopPromise<Void>?)
case closed
}

internal enum CloseResult {
case open
case pending
case readyForClose
case closed

init(_ state: CloseState) {
switch state {
case .open:
self = .open
case .pending:
self = .pending
case .readyForClose:
self = .readyForClose
case .closed:
self = .closed
}
}
}

internal enum WriteMechanism {
case scalarBufferWrite
case vectorBufferWrite
Expand All @@ -496,6 +614,8 @@ internal enum WriteMechanism {

internal protocol PendingWritesManager: AnyObject {
var isOpen: Bool { get }
var isEmpty: Bool { get }
var outboundCloseState: CloseState { get }
var isFlushPending: Bool { get }
var writeSpinCount: UInt { get }
var currentBestWriteMechanism: WriteMechanism { get }
Expand All @@ -522,7 +642,7 @@ extension PendingWritesManager {
var oneResult: OneWriteOperationResult
repeat {
guard self.isOpen && self.isFlushPending else {
result.writeResult = .writtenCompletely
result.writeResult = .writtenCompletely(.init(self.outboundCloseState))
break writeSpinLoop
}

Expand Down
Loading
Loading