BaseStreamSocketChannel half-close allows outstanding writes to complete #3148

Open
wants to merge 17 commits into
base: main
Changes from 6 commits
11 changes: 10 additions & 1 deletion Sources/NIOPosix/BaseSocketChannel.swift
@@ -595,8 +595,17 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
switch writeResult.writeResult {
case .couldNotWriteEverything:
newWriteRegistrationState = .register
case .writtenCompletely:
case .writtenCompletely(let closeState):
newWriteRegistrationState = .unregister
switch closeState {
case .open, .pending:
()
case .readyForClose(let eventLoopPromise):
// TODO: it doesn't seem right that I have to pass an error in here
Contributor

It is.

self.close0(error: ChannelError.outputClosed, mode: .output, promise: eventLoopPromise)
case .closed:
() // we can be flushed before becoming active
}
}

if !self.isOpen || !self.hasFlushedPendingWrites() {
24 changes: 20 additions & 4 deletions Sources/NIOPosix/BaseStreamSocketChannel.swift
@@ -194,11 +194,27 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
self.close0(error: error, mode: .all, promise: promise)
return
}
try self.shutdownSocket(mode: mode)
// Fail all pending writes and so ensure all pending promises are notified
self.pendingWrites.failAll(error: error, close: false)
self.unregisterForWritable()
promise?.succeed(())

let outboundCloseState = self.pendingWrites.close(promise)
switch outboundCloseState {
case .open:
preconditionFailure("Close resulted in an open state, this should never happen")
Contributor

Let's avoid this precondition failure. Instead, we should produce an error just before line 219, and go ahead and close outbound anyway. We can then drop this to assertionFailure so that it crashes in testing.

case .pending:
() // nothing to do
case .readyForClose(let closePromise):
assert(promise == closePromise)
// Shutdown the socket only when the pending writes are dealt with
do {
try self.shutdownSocket(mode: mode)
closePromise?.succeed(())
} catch let err {
closePromise?.fail(err)
}
self.pendingWrites.outboundCloseState = .closed
case .closed:
() // nothing to do
}

self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
Contributor

This user event now triggers unconditionally, and at the wrong time. It should trigger only after the shutdown. Also, don't forget to unregister for writable.
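
A minimal sketch of the ordering requested here, assuming a hypothetical `StubChannel` that only records side effects (none of these names are the real NIOPosix types): the output-closed event is fired only once the socket shutdown has actually happened, and the writable registration is dropped at the same point.

```swift
// Hypothetical stand-in for the channel; it only records the order of side effects.
enum Step {
    case shutdownSocket
    case unregisterForWritable
    case fireOutputClosed
}

final class StubChannel {
    private(set) var steps: [Step] = []
    private var hasPendingWrites: Bool
    private var closeRequested = false
    private var outputClosed = false

    init(hasPendingWrites: Bool) {
        self.hasPendingWrites = hasPendingWrites
    }

    /// Request a half-close of the output side.
    func closeOutput() {
        closeRequested = true
        if !hasPendingWrites {
            finishOutputClose()
        }
        // Otherwise the close stays pending and nothing is fired yet.
    }

    /// Called once the last flushed write has gone out.
    func writesDrained() {
        hasPendingWrites = false
        finishOutputClose()
    }

    private func finishOutputClose() {
        // Only act on a requested close, and only once.
        guard closeRequested, !outputClosed else { return }
        outputClosed = true
        steps.append(.shutdownSocket)         // 1. shut down the write side
        steps.append(.unregisterForWritable)  // 2. stop asking for writable events
        steps.append(.fireOutputClosed)       // 3. only now notify the pipeline
    }
}
```

With pending writes, `closeOutput()` records nothing; the three steps appear, in that order, only after `writesDrained()`.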

case .input:
7 changes: 7 additions & 0 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
@@ -419,6 +419,13 @@ final class PendingDatagramWritesManager: PendingWritesManager {
internal var publishedWritability = true
internal var writeSpinCount: UInt = 16
private(set) var isOpen = true
var outboundCloseState: CloseState {
if self.isOpen {
.open
} else {
.closed
}
}

/// Initialize with a pre-allocated array of message headers and storage references. We pass in these pre-allocated
/// objects to save allocations. They can safely be re-used for all `Channel`s on a given `EventLoop` as an
81 changes: 74 additions & 7 deletions Sources/NIOPosix/PendingWritesManager.swift
@@ -97,12 +97,23 @@ internal enum OneWriteOperationResult {
/// The result of trying to write all the outstanding flushed data. That naturally includes all `ByteBuffer`s and
/// `FileRegions` and the individual writes have potentially been retried (see `WriteSpinOption`).
internal struct OverallWriteResult {
enum WriteOutcome {
enum WriteOutcome: Equatable {
/// Wrote all the data that was flushed. When receiving this result, we can unsubscribe from 'writable' notification.
case writtenCompletely
case writtenCompletely(CloseState)

/// Could not write everything. Before attempting further writes the eventing system should send a 'writable' notification.
case couldNotWriteEverything

static func == (lhs: Self, rhs: Self) -> Bool {
switch (lhs, rhs) {
case (.writtenCompletely, .writtenCompletely):
return true
case (.couldNotWriteEverything, .couldNotWriteEverything):
return true
default:
return false
}
}
Contributor

I get a bit nervous about ignoring state when comparing. Do we actually need the equality check? Also, it might make more sense to do this on the CloseState and compare the identity of its underlying futures where applicable.

Contributor Author

It turns out we don't need it anymore so I've just deleted it.

Contributor Author

🤦‍♂️ it's needed in tests. Looking again.

Contributor Author

I think this showed that I was surfacing the promise when it wasn't needed: we just passed it straight into `pendingWrites.close`, which already held it. I've changed the method to return the CloseResult instead.
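
For reference, a sketch of the alternative raised earlier in this thread (equality on the close state that compares payload identity instead of ignoring it). `StubPromise` is a hypothetical class standing in for the promise; with the real NIO types the comparison would presumably be made on each promise's `futureResult`, since `EventLoopFuture` is a class.

```swift
// Hypothetical stand-in for the promise; a class, so identity (===) is meaningful.
final class StubPromise {}

enum CloseState: Equatable {
    case open
    case pending(StubPromise?)
    case readyForClose(StubPromise?)
    case closed

    static func == (lhs: CloseState, rhs: CloseState) -> Bool {
        switch (lhs, rhs) {
        case (.open, .open), (.closed, .closed):
            return true
        case (.pending(let l), .pending(let r)),
             (.readyForClose(let l), .readyForClose(let r)):
            // Matching cases are not enough: the payloads must be the
            // same object, or both nil.
            return l === r
        default:
            return false
        }
    }
}
```

Two `.pending` values holding different promises compare unequal here, so a test cannot pass by accident when the wrong promise is carried through.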

}

internal var writeResult: WriteOutcome
@@ -152,7 +163,7 @@ private struct PendingStreamWritesState {
self.subtractOutstanding(bytes: bytes)
}

/// Initialise a new, empty `PendingWritesState`.
/// Initialize a new, empty `PendingWritesState`.
public init() {}

/// Check if there are no outstanding writes.
@@ -310,6 +321,8 @@ final class PendingStreamWritesManager: PendingWritesManager {

private(set) var isOpen = true

internal var outboundCloseState: CloseState = .open
Contributor

I think we need a very good reason to have this be anything but private(set).


/// Mark the flush checkpoint.
func markFlushCheckpoint() {
self.state.markFlushCheckpoint()
@@ -337,7 +350,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
/// - result: If the `Channel` is still writable after adding the write of `data`.
func add(data: IOData, promise: EventLoopPromise<Void>?) -> Bool {
assert(self.isOpen)
self.state.append(.init(data: data, promise: promise))
self.state.append(PendingStreamWrite(data: data, promise: promise))

if self.state.bytes > waterMark.high
&& channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged
@@ -367,7 +380,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
vectorBufferWriteOperation: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>,
scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult<Int>
) throws -> OverallWriteResult {
try self.triggerWriteOperations { writeMechanism in
var result = try self.triggerWriteOperations { writeMechanism in
switch writeMechanism {
case .scalarBufferWrite:
return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) })
@@ -380,6 +393,28 @@ final class PendingStreamWritesManager: PendingWritesManager {
return .writtenCompletely
}
}

// If we have no more writes check if we have a pending close
if self.isEmpty {
switch result.writeResult {
case .writtenCompletely:
switch self.outboundCloseState {
case .open:
()
case .pending(let eventLoopPromise):
self.outboundCloseState = .readyForClose(eventLoopPromise)
case .readyForClose:
assertionFailure("Transitioned from readyForClose to readyForClose, shouldn't we have closed?")
case .closed:
()
}

case .couldNotWriteEverything:
()
Contributor

This one also feels like it is worth an assertionFailure.

}
result.writeResult = .writtenCompletely(self.outboundCloseState)
}
return result
}

/// To be called after a write operation (usually selected and run by `triggerAppropriateWriteOperation`) has
@@ -463,11 +498,34 @@ final class PendingStreamWritesManager: PendingWritesManager {
return self.didWrite(itemCount: result.itemCount, result: result.writeResult)
}

/// Fail all the outstanding writes. This is useful if for example the `Channel` is closed.
/// Signal that the `Channel` is closed.
///
/// - Parameters:
/// - promise: Optionally an `EventLoopPromise` that will be succeeded once all outstanding writes have been dealt with
func close(_ promise: EventLoopPromise<Void>?) -> CloseState {
assert(self.isOpen)

if self.isEmpty {
switch self.outboundCloseState {
case .open:
self.outboundCloseState = .readyForClose(promise)
case .readyForClose:
()
Contributor

This code path leaks the promise.

Generally speaking, at this point we've taken ownership of the promise. It's our responsibility to do something with it, which we must do on all code paths.

Contributor Author

I've changed the API of this object so that the promise is never surfaced after it has been passed in. The promise must be completed by a call to `closeComplete`.

case .pending, .closed:
preconditionFailure("close called on channel in unexpected state: \(self.outboundCloseState)")
Contributor

This is a touch nerve-wracking. I can't see any reason this is actually impossible: I think if the user called `close(mode: .output)` twice in a row without the close having happened, they'd trap here. The same is true for the closed state.

More broadly, we have to tolerate the user doing this more than once. It's an error if they do, but that's ok.

Contributor

I've attempted to account for being called more than once by cascading promises.
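
A rough sketch of what tolerating repeated closes could look like, under stated assumptions: `StubPromise`, `OutboundCloseTracker`, and `OutputAlreadyClosed` are hypothetical names, and the `closeComplete` signature is guessed from the comment above, not taken from the PR. Instead of cascading one promise into another, this version collects every waiter and completes them together, which has the same observable effect. No call traps, and every promise handed in is completed on every path.

```swift
// Hypothetical error; the real channel would choose its own error type.
struct OutputAlreadyClosed: Error {}

// Hypothetical stand-in for the promise type.
final class StubPromise {
    private(set) var result: Result<Void, Error>?

    func complete(_ newResult: Result<Void, Error>) {
        if result == nil { result = newResult }
    }

    var succeeded: Bool {
        if case .some(.success) = result { return true }
        return false
    }

    var failed: Bool {
        if case .some(.failure) = result { return true }
        return false
    }
}

final class OutboundCloseTracker {
    private enum State {
        case open
        case pending([StubPromise])
        case closed
    }

    private var state: State = .open

    /// Request an output close. Repeated calls are tolerated.
    func close(_ promise: StubPromise?) {
        switch state {
        case .open:
            state = .pending(promise.map { [$0] } ?? [])
        case .pending(var waiters):
            // A second close before the first finished: remember this
            // promise too, so it completes with the same outcome.
            if let promise = promise { waiters.append(promise) }
            state = .pending(waiters)
        case .closed:
            // Too late: report an error rather than trapping or leaking.
            promise?.complete(.failure(OutputAlreadyClosed()))
        }
    }

    /// Called by the owner once the socket shutdown has been attempted.
    func closeComplete(_ result: Result<Void, Error>) {
        guard case .pending(let waiters) = state else { return }
        state = .closed  // update state before making any outcall
        for waiter in waiters { waiter.complete(result) }
    }
}
```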

}
} else {
self.outboundCloseState = .pending(promise)
}
return self.outboundCloseState
}

/// Fail all the outstanding writes.
func failAll(error: Error, close: Bool) {
if close {
assert(self.isOpen)
self.isOpen = false
self.outboundCloseState = .closed
Contributor

The state should be updated before completing the promise.

Contributor

Ok, let's return the promise here rather than have this type complete it. It's a touch easier for the channel to manage things by completing things itself, rather than having this type do it, because the channel can ensure the state is squared away before making the outcall.
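
One way to picture this suggestion, as a sketch with hypothetical stand-in types (`StubPromise`, `StubWritesManager`, `StubChannel`; none of them are the real NIOPosix classes): the writes manager only updates its own state and hands the promise back, and the channel completes it after its own state is settled.

```swift
// Hypothetical stand-in for the promise; runs a callback when completed.
final class StubPromise {
    private let onComplete: () -> Void
    private(set) var isCompleted = false

    init(onComplete: @escaping () -> Void = {}) {
        self.onComplete = onComplete
    }

    func succeed() {
        guard !isCompleted else { return }
        isCompleted = true
        onComplete()
    }
}

final class StubWritesManager {
    private(set) var isOpen = true
    private var closePromise: StubPromise?

    func requestClose(_ promise: StubPromise?) {
        closePromise = promise
    }

    /// Marks the manager closed and returns the close promise
    /// instead of completing it here.
    func markClosed() -> StubPromise? {
        isOpen = false
        let promise = closePromise
        closePromise = nil
        return promise
    }
}

final class StubChannel {
    let writes = StubWritesManager()
    private(set) var outputClosed = false

    func closeOutput(_ promise: StubPromise?) {
        writes.requestClose(promise)
    }

    func finishOutputClose() {
        let promise = writes.markClosed()  // manager state updated first
        outputClosed = true                // then the channel's own state
        promise?.succeed()                 // the outcall comes last
    }
}
```

Any callback attached to the promise then observes a fully closed output side, never a half-updated one.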

}

self.state.removeAll()?.fail(error)
@@ -487,6 +545,13 @@ final class PendingStreamWritesManager: PendingWritesManager {
}
}

internal enum CloseState {
case open
case pending(EventLoopPromise<Void>?)
case readyForClose(EventLoopPromise<Void>?)
case closed
}

internal enum WriteMechanism {
case scalarBufferWrite
case vectorBufferWrite
@@ -496,6 +561,8 @@ internal enum WriteMechanism {

internal protocol PendingWritesManager: AnyObject {
var isOpen: Bool { get }
var isEmpty: Bool { get }
var outboundCloseState: CloseState { get }
var isFlushPending: Bool { get }
var writeSpinCount: UInt { get }
var currentBestWriteMechanism: WriteMechanism { get }
@@ -522,7 +589,7 @@ extension PendingWritesManager {
var oneResult: OneWriteOperationResult
repeat {
guard self.isOpen && self.isFlushPending else {
result.writeResult = .writtenCompletely
result.writeResult = .writtenCompletely(self.outboundCloseState)
break writeSpinLoop
}
