-
Notifications
You must be signed in to change notification settings - Fork 667
Ensure early dispose is delivered to late subscriber #2756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Oleh Dokuka <[email protected]>
if (log.isDebugEnabled()) { | ||
log.debug(format(channel, "{}: subscribing inbound receiver"), this); | ||
} | ||
if (inboundDone && getPending() == 0) { | ||
if ((inboundDone && getPending() == 0) || isCancelled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since cancelation is possible when there is a subscriber, in that case isCancelled would mean dispose happened earlier
volatile int once; | ||
static final AtomicIntegerFieldUpdater<FluxReceive> ONCE = | ||
AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once"); | ||
boolean subscribedOnce; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
simplified that part to be simple bool value since there is no concurrency on that field
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
with that fix I dont see the rsocket test hanging because of an undelivered completion signal on the inbound side) (need that to wait for all bufs to be delivered and consumed) |
Signed-off-by: Oleh Dokuka <[email protected]>
The test is dropped as it relies that the EmbeddedEventLoop can be accessed by multiple threads. This is not allowed any more in Netty 5 netty/netty#12871
This PR fixes the issue that appears when late inbound subscriber races with connection dispose