Draft
Conversation
Merged
…e messages are not dropped during dispose.
Streamlines the dispose logic by adding `DrainAsync` to handle UNSUB, PING/PONG, and channel completion to avoid message loss during shutdown. Applied changes to `NatsJSFetch` and `NatsJSConsume` for proper cleanup and consistent behavior.
Apply DrainAsync to NatsJSOrderedConsume so fetch, consume, and ordered-consume share the same UNSUB -> PING/PONG -> TryComplete dispose path. Replace the flaky in-test stress repro with a sandbox console app (SubCleanDispose) that exercises all four subscription types and checks for sequence gaps.
Disposing the connection mid-consume tore down CommandWriter before the loop could ack buffered messages, throwing ObjectDisposedException and leaving messages stuck NumAckPending until AckWait expired. Reorder NatsConnection.DisposeAsync so subscription drain and writer flush run before the socket closes, and add a reader-active flag so DrainAsync waits for the user iterator's finally to run. Drop the SubCleanDispose sandbox; the new test in ConsumerConsumeTest guards the regression.
The previous commit changed dispose semantics in two ways: subs are now drained before the socket closes, and consumer dispose waits for in-flight consume loops to finish acking buffered messages. Both are strictly more correct, but the wait can block dispose for up to the configured timeout, which is observable. Add NatsOpts.DrainSubscriptionsOnDispose (bool, default false) for the dispose order, and NatsOpts.ConsumerDrainOnDisposeTimeout (TimeSpan?, default null) for the wait. Defaults preserve current behavior; set both to opt into the graceful path.
Polling server-side AckFloor to time the dispose was racing the pull consumer's threshold top-up: the reader could process a second batch before the server stat caught up, so the strict consumed == pullBatch check fired Expected 20 / Actual 30 on Linux runners. Use a client-side TaskCompletionSource to signal once the reader has acked bailAt messages, and assert what actually matters: NumAckPending is zero and AckFloor matches the client-side count, regardless of how many batches the consumer pulled.
Fetch removes itself from the SubscriptionManager via EndSubscription on natural completion (NoMsgs, RequestsPending, MaxBytes, Timeout), so SubscriptionManager.DisposeAsync no longer sees it and the per-sub DrainAsync path that waits for the user reader never fired. Ordered consumer's outer loop tried to RecreateConsumer after the inner cc finished draining, hitting the disposed CommandWriter. Track active reader subs in a per-connection drain participants set, populated from MarkReaderActive / cleared from MarkReaderInactive, and run their DrainAsync from NatsConnection.DisposeAsync between SubscriptionManager.DisposeAsync and CommandWriter.DisposeAsync. Move WaitForReaderDrainAsync outside DrainAsync's _unsubscribed early return so it always runs even when EndSubscription already sent UNSUB. Have the ordered consumer's outer loop bail when the connection is already disposed instead of attempting to recreate. Adds Fetch_connection_dispose_drains_buffered_messages and OrderedConsume_connection_dispose_drains_buffered_messages.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
JetStream consumer dispose was dropping in-flight messages: the per-sub path completed the channel before all wire messages had landed, and connection dispose killed the socket before subs got a chance to drain at all, leaving NumAckPending pinned until AckWait. NatsSubBase gets a DrainAsync (UNSUB -> PING/PONG -> TryComplete) so per-sub dispose flushes the wire cleanly. The connection-dispose reorder and the wait for active consume loops to finish acking buffered messages are gated behind two new opt-in NatsOpts (DrainSubscriptionsOnDispose, ConsumerDrainOnDisposeTimeout) since they change dispose timing. Defaults preserve current behavior.