BaseStreamSocketChannel half-close allows outstanding writes to complete #3148

Open: rnro wants to merge 17 commits into main

Contributor

@rnro commented Mar 18, 2025

Motivation:

At the moment half-closes are actioned immediately and fail all outstanding writes. We should refuse new writes but allow these writes to complete before completing the close.

Modifications:

Modify the PendingWritesManager internal buffer to hold an enum of either writes or close events. We use this to store the close and only action it when the preceding writes have been handled.

Result:

Outbound close should no longer fail outstanding writes.

Should resolve #3139
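As a rough illustration of the modification described above, the Swift sketch below models a FIFO buffer whose entries are either writes or an outbound-close event, so the close is only reached after every write queued before it. `PendingEvent` and `PendingEventQueue` are hypothetical names for this sketch, not the API of SwiftNIO's `PendingStreamWritesManager`; promises and partial writes are deliberately left out.

```swift
// Hypothetical model: one FIFO holding either writes or an outbound-close event.
// The close is only reached once every write queued before it has been handled.
enum PendingEvent: Equatable {
    case write([UInt8])
    case closeOutbound
}

struct PendingEventQueue {
    private var events: [PendingEvent] = []
    private(set) var closeQueued = false

    /// Enqueues a write. Returns false (refusing the write) once a close is queued.
    mutating func enqueueWrite(_ bytes: [UInt8]) -> Bool {
        guard !closeQueued else { return false }
        events.append(.write(bytes))
        return true
    }

    /// Queues the close behind any outstanding writes; repeated calls are ignored here.
    mutating func enqueueCloseOutbound() {
        guard !closeQueued else { return }
        closeQueued = true
        events.append(.closeOutbound)
    }

    /// Pops the next event in FIFO order, or nil when nothing is left.
    mutating func next() -> PendingEvent? {
        events.isEmpty ? nil : events.removeFirst()
    }
}
```

New writes are refused as soon as the close is queued, while writes that were already queued still come out ahead of it.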

@rnro added the 🔨 semver/patch label (No public API change.) Mar 18, 2025
@rnro force-pushed the queue_outbound_close branch from ef380ee to a9d9404 March 18, 2025 16:32
@rnro requested a review from Lukasa March 21, 2025 15:29
let outboundCloseState = self.pendingWrites.close(promise)
switch outboundCloseState {
case .open:
preconditionFailure("Close resulted in an open state, this should never happen")
Contributor

Let's avoid this precondition failure. Instead, we should produce an error just before line 219, and go ahead and close outbound anyway. We can then drop this to assertionFailure so that it crashes in testing.

self.pendingWrites.outboundCloseState = .closed
case .closed:
() // nothing to do
}

self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
Contributor

This user event now triggers unconditionally, and at the wrong time. It should trigger only after the shutdown. Also, don't forget to unregister for writable.

case .readyForClose:
()
case .pending, .closed:
preconditionFailure("close called on channel in unexpected state: \(self.outboundCloseState)")
Contributor

This is a touch nerve-wracking. I can't see any reason this is actually impossible: I think if the user called close(mode: .output) twice in a row before the first close had happened, they'd trap here. The same is true for the closed state.

More broadly, we have to tolerate the user doing this more than once. It's an error if they do, but that's ok.

Contributor Author

I've attempted to account for being called more than once by cascading promises.
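A minimal sketch of tolerating repeated `close(mode: .output)` calls by cascading promises, assuming that a later promise should complete however the first one does. `TestPromise` is a hypothetical stand-in for `EventLoopPromise<Void>`, and its `cascade(to:)` is only loosely modelled on SwiftNIO's cascading helpers. The `.closed` branch succeeds the promise, mirroring one revision of the diff in this PR.

```swift
/// Hypothetical stand-in for `EventLoopPromise<Void>`: completes at most once.
final class TestPromise {
    enum Outcome: Equatable { case pending, succeeded, failed }
    private(set) var outcome: Outcome = .pending
    private var cascaded: [TestPromise] = []

    /// Completes `other` in the same way whenever this promise completes.
    func cascade(to other: TestPromise) { cascaded.append(other) }

    func succeed() {
        guard outcome == .pending else { return }
        outcome = .succeeded
        cascaded.forEach { $0.succeed() }
    }

    func fail(_ error: Error) {
        guard outcome == .pending else { return }
        outcome = .failed
        cascaded.forEach { $0.fail(error) }
    }
}

enum OutboundCloseState {
    case open
    case pending(TestPromise?)  // close requested, earlier writes still outstanding
    case closed
}

struct OutboundCloser {
    private(set) var state: OutboundCloseState = .open

    /// Records a close request; repeated requests are folded into the first one.
    mutating func closeOutbound(_ promise: TestPromise?) {
        switch state {
        case .open:
            state = .pending(promise)
        case .pending(let existing):
            if let existing = existing, let promise = promise {
                existing.cascade(to: promise)          // both complete together
            } else {
                state = .pending(existing ?? promise)  // keep whichever promise we have
            }
        case .closed:
            promise?.succeed()  // already closed: nothing left to wait for
        }
    }

    /// Called once every write queued before the close has been handled.
    mutating func closeComplete() {
        guard case .pending(let promise) = state else { return }
        state = .closed     // update state first...
        promise?.succeed()  // ...then make the outcall
    }
}
```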

case .open:
self.outboundCloseState = .readyForClose(promise)
case .readyForClose:
()
Contributor

This code path leaks the promise.

Generally speaking, at this point we've taken ownership of the promise. It's our responsibility to do something with it, which we must do on all code paths.

Contributor Author

I've changed the API of this object so that the promise is never surfaced after it has been passed in. The promise must be completed by a call to `closeComplete`.

@@ -310,6 +321,8 @@ final class PendingStreamWritesManager: PendingWritesManager {

private(set) var isOpen = true

internal var outboundCloseState: CloseState = .open
Contributor

I think we need a very good reason to have this be anything but private(set).
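A small sketch of what `private(set)` buys here: code outside the type (tests, for example) can still read the close state, but only the manager's own methods can change it. `PendingWritesManagerSketch` and its method names are hypothetical.

```swift
enum CloseState: Equatable { case open, readyForClose, closed }

/// Hypothetical manager: the close state is readable anywhere but only mutated here.
final class PendingWritesManagerSketch {
    private(set) var outboundCloseState: CloseState = .open

    func requestOutboundClose() {
        if outboundCloseState == .open { outboundCloseState = .readyForClose }
    }

    func outboundCloseComplete() {
        if outboundCloseState == .readyForClose { outboundCloseState = .closed }
    }
}

// Outside the class, an assignment such as
//     manager.outboundCloseState = .closed
// is rejected by the compiler because the setter is private.
```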

case .open, .pending:
()
case .readyForClose(let eventLoopPromise):
// TODO: it doesn't seem right that I have to pass an error in here
Contributor

It is.

}

case .couldNotWriteEverything:
()
Contributor

This one also feels like it is worth an assertionFailure.

@rnro requested a review from glbrntt March 31, 2025 14:30
Comment on lines 107 to 116
static func == (lhs: Self, rhs: Self) -> Bool {
switch (lhs, rhs) {
case (.writtenCompletely, .writtenCompletely):
return true
case (.couldNotWriteEverything, .couldNotWriteEverything):
return true
default:
return false
}
}
Contributor

I get a bit nervous about ignoring state when comparing. Do we actually need the equality check? Also, it might make more sense to do this on the CloseState and compare the identity of its underlying futures where applicable.

Contributor Author

It turns out we don't need it anymore so I've just deleted it.

Contributor Author

🤦‍♂️ it's needed in tests. Looking again.

Contributor Author

I think this has revealed that I was exposing the promise unnecessarily, because we just passed it right into pendingWrites.close, which already held it. I've changed the method to return the CloseResult instead.
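If an equality check on the close state is still wanted in tests, the reviewer's alternative can be sketched as below: equality that does not ignore associated data, and that compares promises by the identity of their underlying futures. `FutureBox` and `TestPromise` are hypothetical stand-ins (SwiftNIO's `EventLoopPromise` exposes its future as `futureResult`), and the real `CloseState` cases may differ.

```swift
final class FutureBox {}  // stand-in for a reference-typed future

/// Hypothetical stand-in for `EventLoopPromise<Void>`: a value wrapping a shared future.
struct TestPromise {
    let futureResult = FutureBox()
}

enum CloseState: Equatable {
    case open
    case pending(TestPromise?)
    case closed

    static func == (lhs: CloseState, rhs: CloseState) -> Bool {
        switch (lhs, rhs) {
        case (.open, .open), (.closed, .closed):
            return true
        case (.pending(let l), .pending(let r)):
            // Same underlying future (or both nil) means the same promise.
            return l?.futureResult === r?.futureResult
        default:
            return false
        }
    }
}
```

Because copies of the promise value share one future, a copied promise compares equal while an unrelated promise does not.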

func failAll(error: Error, close: Bool) {
if close {
assert(self.isOpen)
self.isOpen = false
self.state.removeAll()?.fail(error)
Contributor

General rule of thumb: update your state before making outcalls. Failing the promise here might lead to calling back into this type again and we haven't fully reconciled our state yet. We should grab the promise and fail it afterwards.

Contributor Author

@rnro commented Apr 2, 2025

Thanks for the general guidance 🙂
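The "update state before making outcalls" rule can be sketched as follows: the manager finishes reconciling its own state, takes a local copy of the handlers to run, and only then calls them, so a re-entrant call from a handler sees a consistent, already-closed manager. `WritesManager` and its failure-handler closures are hypothetical stand-ins for a manager and the callbacks attached to its failed promises.

```swift
struct SketchError: Error {}

/// Hypothetical manager whose failure handlers may call back into it.
final class WritesManager {
    private(set) var isOpen = true
    private var failureHandlers: [(WritesManager, Error) -> Void] = []

    func enqueueWrite(onFailure: @escaping (WritesManager, Error) -> Void) {
        failureHandlers.append(onFailure)
    }

    func failAllAndClose(error: Error) {
        guard isOpen else { return }  // tolerate repeated (including re-entrant) calls
        // 1. Reconcile state first.
        isOpen = false
        let handlers = failureHandlers
        failureHandlers.removeAll()
        // 2. Only then make the outcalls.
        for handler in handlers {
            handler(self, error)
        }
    }
}
```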

Comment on lines 512 to 513
closePromise?.fail(error)
self.outboundCloseState = .closed
Contributor

Same here: update the state, then complete the promise.

func failAll(error: Error, close: Bool) {
if close {
assert(self.isOpen)
self.isOpen = false
self.state.removeAll()?.fail(error)
Contributor

Not sure if it's just the diff, but it looks like this function previously also failed all the promises when `!close`; that doesn't seem to happen anymore.

Contributor Author

Good spot

closePromise?.succeed(())
}

self.outboundCloseState = .closed
Contributor

The state should be updated before completing the promise.

@rnro requested a review from glbrntt April 2, 2025 09:02
@rnro force-pushed the queue_outbound_close branch from c7f73ff to e439537 April 2, 2025 09:23
Contributor

@glbrntt left a comment

This LGTM now but please wait for Cory's review as well.

@rnro requested a review from Lukasa April 10, 2025 05:18
Contributor

@Lukasa left a comment

Cool, this is looking closer, I think I just have some suggestions around promise hygiene and naming.

///
/// - Parameters:
/// - promise: Optionally an `EventLoopPromise` that will be succeeded once all outstanding writes have been dealt with
func close(_ promise: EventLoopPromise<Void>?) -> CloseResult {
Contributor

This should probably be called closeOutbound.

}

case .couldNotWriteEverything:
assertionFailure("Write result is .couldNotWriteEverything but we have no more writes to perform.")
Contributor

If this is going to be an assertion failure we should have some fallback logic. Probably that should be throwing an error.

self.outboundCloseState = .closed
case .pending(let closePromise), .readyForClose(let closePromise):
self.outboundCloseState = .closed
closePromise?.fail(error)
Contributor

Hrm, should we return this instead of closing it at this stage?

Contributor Author

Can you explain the thinking here? My understanding from our previous discussions was that once the higher-level caller handed off the promise, it would be better for the PendingWritesManager to be responsible for completing it, so passing it in was a one-way street. This is why I separated out CloseResult as its own type, so we have clarity on where the promise is and isn't exposed.

Are you proposing that this method returns EventLoopPromise<Void>? where the promise is non-nil if the caller needs to do something with it?

Contributor Author

Ah, I've just scrolled up and seen #3148 (comment) - that makes sense.

closePromise.setOrCascade(to: promise)
self.outboundCloseState = .readyForClose(closePromise)
case .closed:
promise?.succeed(())
Contributor

As noted above, it's probably better to let the caller complete the promise.

Contributor Author

In this case are you proposing that this method returns (CloseResult, EventLoopPromise<Void>?) where the promise is non-nil if the caller needs to do something with it?

Contributor

Or put the promise in the close result as associated data.
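One way to read the suggestion of putting the promise in the close result as associated data: when the close can be actioned, the manager hands the promise back inside `CloseResult`, and the channel completes it only after squaring away its own state. The case names, `TestPromise`, and `ChannelSketch` are hypothetical; the actual `CloseResult` in this PR may be shaped differently.

```swift
/// Hypothetical stand-in for `EventLoopPromise<Void>`.
final class TestPromise {
    enum Outcome: Equatable { case pending, succeeded, failed }
    private(set) var outcome: Outcome = .pending
    func succeed() { if outcome == .pending { outcome = .succeeded } }
    func fail(_ error: Error) { if outcome == .pending { outcome = .failed } }
}

enum CloseResult {
    /// Writes are still outstanding; the manager keeps the promise for now.
    case pending
    /// The close can be actioned; the promise is handed back for the caller to complete.
    case closed(TestPromise?)
}

/// Hypothetical caller: it owns any promise it receives back in a `CloseResult`.
final class ChannelSketch {
    private(set) var outputClosed = false

    func handle(_ result: CloseResult) {
        switch result {
        case .pending:
            break  // completed later, once the earlier writes are handled
        case .closed(let promise):
            outputClosed = true  // square away channel state first...
            promise?.succeed()   // ...then complete the promise
        }
    }
}
```

Compared with returning a `(CloseResult, EventLoopPromise<Void>?)` tuple, associated data makes it impossible to receive a promise alongside a result that should not carry one.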

@rnro requested a review from Lukasa April 11, 2025 13:40

self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
let writesCloseResult = self.pendingWrites.closeOutbound(promise)
Contributor

Ah, ok: there's a test missing here. The unregisterForWritable above isn't appropriate. I believe the following situation is possible:

  1. Many writes are done, such that we are no longer writable at the socket layer.
  2. The user issues a final flush.
  3. The user calls close(mode: .output)

In this case, we'll have flushed, unwritten data, but we will no longer be registered for writable. As long as no other flush or close comes along, we'll stay unregistered for writable, and so we will make no attempt to empty the buffer. That will cause us to be wedged open.

I recommend that, before fixing the bug, you write a test that reproduces this wedge, and use it to validate that the bug is actually fixed. This is going to be a challenging test to write. It can be written using regular sockets, but it can also be written using the SAL, which may be easier. Let me know if you'd like to pair on a test for this.
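Before writing the real socket or SAL test, it may help to pin down the failure mode in a toy model. In the sketch below, writability notifications only reach a channel that is registered for them, so unregistering when `close(mode: .output)` is queued behind flushed but unwritten data means the buffer never drains and the close never happens. `ChannelModel` and its members are hypothetical and say nothing about SwiftNIO's real selector handling; this is not a substitute for the test described above.

```swift
/// Hypothetical, heavily simplified model of the wedge scenario.
struct ChannelModel {
    private(set) var unwrittenFlushedBytes: Int
    private(set) var registeredForWritable = true  // waiting for the socket to drain
    private(set) var closeQueued = false
    private(set) var outputClosed = false
    let unregisterOnCloseOutbound: Bool            // the behaviour under suspicion

    init(unwrittenFlushedBytes: Int, unregisterOnCloseOutbound: Bool) {
        self.unwrittenFlushedBytes = unwrittenFlushedBytes
        self.unregisterOnCloseOutbound = unregisterOnCloseOutbound
    }

    /// The user calls close(mode: .output) while flushed data is still unwritten.
    mutating func closeOutbound() {
        closeQueued = true
        if unregisterOnCloseOutbound { registeredForWritable = false }
    }

    /// The kernel buffer drains; only channels registered for writable hear about it.
    mutating func socketBecameWritable(capacity: Int) {
        guard registeredForWritable else { return }  // no notification: nothing drains
        unwrittenFlushedBytes -= min(capacity, unwrittenFlushedBytes)
        if unwrittenFlushedBytes == 0 && closeQueued {
            outputClosed = true            // the queued half-close is finally actioned
            registeredForWritable = false  // nothing left to write
        }
    }
}
```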

case .closed(let closePromise):
closePromise?.succeed(())
case .open:
promise?.fail(ChannelError.inappropriateOperationForState)
Contributor

This path appears to violate the "pending writes manager takes ownership of the promise" construction.

/// Fail all the outstanding writes. This is useful if for example the `Channel` is closed.
func failAll(error: Error, close: Bool) {
/// Fail all the outstanding writes.
func failAll(error: Error, close: Bool) -> CloseResult? {
Contributor

This "boolean parameter with optional return contingent on the value of the bool" smells like there are two methods here. It may be worth factoring them apart. A safe way to do this, if we want to keep this patch understandable, is to make a separate refactoring PR that only refactors this method into two.

(Sidebar: the two methods are actually implemented as one calling the other: the one with close: true does some stuff, then it calls the one with close: false.)
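A sketch of the suggested split, assuming the close-specific work (marking the manager closed and handing back any pending close promise) can be layered on top of a plain "fail the writes" method, as the sidebar describes. The names `failAllWrites(error:)` and `failAllWritesAndClose(error:)`, the class, and the `TestPromise` stand-in are all hypothetical.

```swift
struct SketchError: Error {}

/// Hypothetical stand-in for `EventLoopPromise<Void>`.
final class TestPromise {
    enum Outcome: Equatable { case pending, succeeded, failed }
    private(set) var outcome: Outcome = .pending
    func succeed() { if outcome == .pending { outcome = .succeeded } }
    func fail(_ error: Error) { if outcome == .pending { outcome = .failed } }
}

final class PendingWritesSketch {
    private(set) var isOpen = true
    private var writePromises: [TestPromise] = []
    private var closePromise: TestPromise?  // a single close, for brevity

    func enqueueWrite(_ promise: TestPromise) { writePromises.append(promise) }
    func queueCloseOutbound(_ promise: TestPromise) { closePromise = promise }

    /// Fails every outstanding write; no close bookkeeping, nothing returned.
    func failAllWrites(error: Error) {
        let promises = writePromises
        writePromises.removeAll()            // update state first...
        promises.forEach { $0.fail(error) }  // ...then make the outcalls
    }

    /// Marks the manager closed, fails the writes, and returns any pending close
    /// promise so the caller decides how to complete it.
    func failAllWritesAndClose(error: Error) -> TestPromise? {
        assert(isOpen, "failAllWritesAndClose called twice")
        isOpen = false
        let pendingClose = closePromise
        closePromise = nil
        failAllWrites(error: error)
        return pendingClose
    }
}
```

Each method now has a single, unconditional return type, which removes the "optional result depending on a Bool" coupling.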

"We are in .readyForClose state but we still have pending writes. This should never happen."
)
case .closed:
preconditionFailure(
Contributor

I'm a touch nervous about these precondition failures. Can we come up with a logical behaviour here? Probably it's "fail the promise" and "assertionFailure to get crashes in debug".

case .closed:
() // nothing to do
case .open, .pending:
preconditionFailure("close complete called on channel in unexpected state: \(self.outboundCloseState)")
Contributor

Can we make this a thrown error instead of a crash?
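A sketch of reporting the unexpected state as a thrown error rather than trapping; the caller can then fail the close promise (and, if wanted, add an `assertionFailure` so debug builds still crash). `CloseStateMachine` and `UnexpectedCloseStateError` are hypothetical names, and the states mirror the ones in the diff.

```swift
enum OutboundCloseState: Equatable { case open, pending, readyForClose, closed }

/// Hypothetical error describing a close completion in a state that should not allow it.
struct UnexpectedCloseStateError: Error {
    let state: OutboundCloseState
}

struct CloseStateMachine {
    private(set) var state: OutboundCloseState

    init(state: OutboundCloseState) { self.state = state }

    /// Completes the outbound close, throwing instead of crashing on an unexpected state.
    mutating func closeComplete() throws {
        switch state {
        case .readyForClose:
            state = .closed
        case .closed:
            break  // already closed: nothing to do
        case .open, .pending:
            throw UnexpectedCloseStateError(state: state)
        }
    }
}
```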


switch self.outboundCloseState {
case .readyForClose(let closePromise):
self.outboundCloseState = .closed
Contributor

Ok, let's return the promise here rather than have this type complete it. It's a touch easier for the channel to manage things by completing things itself, rather than having this type do it, because the channel can ensure the state is squared away before making the outcall.

case .pending: .pending
case .readyForClose(let closePromise): .readyForClose(closePromise)
case .closed: .closed(nil)
}
Contributor

Let's outline this. Looks like an initializer to me.
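A sketch of outlining that mapping into an initializer on the result type. The enum shapes follow the fragment above; how `.open` should map is not shown there, so treating it as having no result (a failable `init?`) is an assumption, as is the `TestPromise` stand-in.

```swift
/// Hypothetical stand-in for `EventLoopPromise<Void>`.
final class TestPromise {}

enum CloseState {
    case open
    case pending
    case readyForClose(TestPromise?)
    case closed
}

enum CloseResult {
    case pending
    case readyForClose(TestPromise?)
    case closed(TestPromise?)
}

extension CloseResult {
    /// Maps the manager's internal state to the result handed to the caller.
    /// Returns nil for `.open` (an assumption made for this sketch).
    init?(_ state: CloseState) {
        switch state {
        case .open:
            return nil
        case .pending:
            self = .pending
        case .readyForClose(let promise):
            self = .readyForClose(promise)
        case .closed:
            self = .closed(nil)
        }
    }
}
```

Call sites then shrink to `CloseResult(self.outboundCloseState)`, and the mapping lives next to the type it produces.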

Successfully merging this pull request may close these issues.

isOutboundHalfClosureEnabled closes channel before all writes are flushed
3 participants