Implement subscription drain functionality #109

Conversation
Walkthrough

Adds subscription draining state and APIs, centralizes pending-messages accounting in the subscription module, updates the connection and dispatcher to use the centralized helpers and to drop messages for draining subscriptions, and introduces tests covering drain behaviors.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks: ✅ 3 passed
@coderabbitai review

✅ Actions performed: Review triggered.
Add comprehensive subscription drain support that allows graceful subscription shutdown while processing remaining messages.

Key features:
- Event-driven completion using ResetEvent for efficient waiting
- Automatic detection when the last message is processed
- Blocks new messages once draining starts
- Works with both sync and async subscriptions
- Thread-safe implementation using atomic counters

API:
- sub.drain() - Start draining process
- sub.isDraining() / sub.isDrainComplete() - Status checks
- sub.waitForDrainCompletion(timeout_ms) - Block until complete

Implementation details:
- Leverages existing pending_msgs/pending_bytes counters
- Helper functions incrementPending/decrementPending are module-private
- ResetEvent is automatically set when pending count reaches zero
- Connection rejects new messages for draining subscriptions

Comprehensive test coverage includes immediate completion, pending message processing, async callback handling, message blocking, timeout scenarios, and error cases.
Force-pushed from 3e2f7e4 to a33b5ec
Actionable comments posted: 1
🧹 Nitpick comments (4)
src/connection.zig (1)
1121-1124: Nit: fix log format specifier. Use an explicit integer formatter for sid.
Apply:
- log.err("Async subscription {} has no assigned dispatcher", .{message.sid}); + log.err("Async subscription {d} has no assigned dispatcher", .{message.sid});tests/drain_test.zig (3)
43-45: Avoid sleep-based flakiness; wait until pending arrives. Replace the fixed sleep with a bounded wait on the pending count.
```diff
-    // Give messages time to arrive
-    std.time.sleep(10 * std.time.ns_per_ms);
+    // Wait (up to 1s) for both messages to be counted as pending
+    var waited: u64 = 0;
+    while (sub.pending_msgs.load(.acquire) < 2 and waited < 1000) : (waited += 5) {
+        std.time.sleep(5 * std.time.ns_per_ms);
+    }
```
46-48: Tests rely on internal pending counters; consider public-API-only assertions. The PR summary says internals are hidden, yet the tests use `sub.pending_msgs` directly. If that field is not part of the public surface, these tests will be brittle.
Options:
- Keep using pending counters but mark them public/stable in Subscription for tests.
- Or switch to API-based checks: consume with nextMsg(), assert isDraining()/isDrainComplete(), and optionally count messages processed to infer pending=0 (see the sketch after this comment).
Also applies to: 61-65, 71-75, 169-171, 137-138
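A sketch of the API-only variant (a hypothetical test body; the connect/subscribe/publish setup is an assumption about the library surface, while the drain calls match this PR's API):

```zig
const std = @import("std");
const nats = @import("nats");

test "drain: API-only assertions (sketch)" {
    // Hypothetical setup; connect/subscribe signatures are assumptions.
    var nc = try nats.connect(.{ .url = "nats://127.0.0.1:4222" });
    defer nc.deinit();

    var sub = try nc.subscribe("drain.test");
    defer sub.deinit();

    try nc.publish("drain.test", "before drain");

    // Consume via the public API instead of peeking at pending_msgs.
    var msg = try sub.nextMsg(1000);
    defer msg.deinit();

    try sub.drain();
    try std.testing.expect(sub.isDraining());

    // Completion is observable without reading internal counters.
    try sub.waitForDrainCompletion(1000);
    try std.testing.expect(sub.isDrainComplete());
}
```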
160-179: Optional: assert that post-drain publishes are truly undispatchable. After consuming the original message, try fetching another with a short timeout to prove drops.
```diff
     // Consume the original message
     var msg = try sub.nextMsg(1000);
     defer msg.deinit();
     try std.testing.expect(std.mem.eql(u8, msg.data, "before drain"));

     // Should be complete
     try std.testing.expect(sub.isDrainComplete());
+
+    // And no extra messages should be retrievable
+    const maybe = sub.nextMsg(50);
+    try std.testing.expectError(error.Timeout, maybe);
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- src/connection.zig (3 hunks)
- src/dispatcher.zig (3 hunks)
- src/subscription.zig (4 hunks)
- tests/all_tests.zig (1 hunk)
- tests/drain_test.zig (1 hunk)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using `zig fmt` before commit
Files:
tests/all_tests.zig, tests/drain_test.zig, src/dispatcher.zig, src/connection.zig, src/subscription.zig
🧠 Learnings (4)
📚 Learning: 2025-08-29T10:22:22.247Z
Learnt from: CR
PR: lalinsky/nats.zig#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-29T10:22:22.247Z
Learning: Run `zig build test` to confirm your changes are working
Applied to files:
tests/all_tests.zig
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/all_tests.zig, tests/drain_test.zig
📚 Learning: 2025-09-09T19:51:02.156Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#108
File: src/dispatcher.zig:60-65
Timestamp: 2025-09-09T19:51:02.156Z
Learning: In the NATS Zig codebase dispatcher.zig, when decrementing pending_bytes (atomic u64) with message_data_len (usize) using fetchSub, explicit casting is not required as Zig handles the implicit conversion automatically and the code builds successfully.
Applied to files:
src/dispatcher.zig, src/connection.zig
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/dispatcher.zig, src/connection.zig
🔇 Additional comments (15)
src/subscription.zig (5)
61-63: LGTM! Clean drain state management. The drain state fields are well-designed: an atomic bool for the draining flag and a ResetEvent for efficient completion waiting without polling.
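For illustration, the drain state could look roughly like this (a sketch with assumed field names, not the actual source):

```zig
const std = @import("std");

// Sketch of drain-related subscription state; names are assumptions.
pub const Subscription = struct {
    // Flipped once drain() starts; read lock-free on the hot path.
    draining: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
    // Set when the pending count reaches zero, so waiters block on
    // the event instead of polling the counters.
    drain_complete: std.Thread.ResetEvent = .{},
    // Existing pending accounting, reused by the drain logic.
    pending_msgs: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
    pending_bytes: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
};
```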
125-139: Good implementation of the drain method. The drain logic correctly handles both immediate completion (when no messages are pending) and deferred completion. The UNSUB is sent immediately to stop new messages from arriving.
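A rough sketch of that flow, using the field names from the sketch above (`sendUnsub` is a hypothetical helper, not a confirmed name):

```zig
// Sketch of drain(): send UNSUB immediately, then either complete
// right away or let the pending-counter path signal completion.
pub fn drain(self: *Subscription) !void {
    // First caller wins; repeated calls are no-ops.
    if (self.draining.swap(true, .acq_rel)) return;

    // Stop the server from delivering new messages (helper name assumed).
    try self.conn.sendUnsub(self.sid);

    // Nothing pending: drain is already complete.
    if (self.pending_msgs.load(.acquire) == 0) {
        self.drain_complete.set();
    }
}
```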
149-164: Solid timeout implementation with proper error handling. The implementation properly handles the indefinite wait (timeout_ms == 0) and converts milliseconds to nanoseconds for the timed wait. Good error mapping for the NotDraining and Timeout cases.
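Those semantics could be sketched as follows (error names per the review; `std.Thread.ResetEvent.timedWait` takes nanoseconds):

```zig
// Sketch of waitForDrainCompletion(): timeout_ms == 0 waits forever,
// otherwise the wait is bounded and mapped to error.Timeout.
pub fn waitForDrainCompletion(self: *Subscription, timeout_ms: u64) !void {
    if (!self.draining.load(.acquire)) return error.NotDraining;

    if (timeout_ms == 0) {
        // Indefinite wait for the completion event.
        self.drain_complete.wait();
        return;
    }

    // Convert milliseconds to nanoseconds for the timed wait.
    self.drain_complete.timedWait(timeout_ms * std.time.ns_per_ms) catch
        return error.Timeout;
}
```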
174-174: Correct integration with pending counter management. Using the centralized `decrementPending` helper maintains consistency with the drain completion logic.
218-232: Thread-safe pending counter management with proper drain completion signaling. The implementation correctly uses atomic operations with appropriate memory ordering. The drain completion check (remaining_msgs == 1) is correct: it triggers when decrementing from 1 to 0. The logging provides good visibility into drain operations.
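A sketch of that 1 → 0 transition (helper shape per the PR summary; `fetchSub` returns the value held before the subtraction):

```zig
// Sketch of the module-private decrement helper described above.
fn decrementPending(sub: *Subscription, msg_bytes: usize) void {
    _ = sub.pending_bytes.fetchSub(msg_bytes, .acq_rel);
    const remaining_msgs = sub.pending_msgs.fetchSub(1, .acq_rel);
    // remaining_msgs == 1 means this call took the counter from 1 to 0,
    // i.e. the last pending message was just processed.
    if (remaining_msgs == 1 and sub.draining.load(.acquire)) {
        sub.drain_complete.set();
    }
}
```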
src/dispatcher.zig (3)
17-18: Clean module aliasing for the subscription module. Using a module alias keeps the code readable while providing access to the new helper functions.
64-64: Consistent use of centralized pending counter management. Good to see the dispatcher using the same `decrementPending` helper for dropped messages during cleanup.
147-147: Proper decrement after handler completion. The pending counter is correctly decremented after the handler completes, whether it succeeds or fails. This ensures accurate tracking for drain completion.
tests/all_tests.zig (1)
11-11: Test suite properly updated. Adding the drain tests to the test suite ensures the new functionality gets tested along with everything else.
src/connection.zig (4)
1098-1103: Correct place to drop messages for draining subs. The early return before touching pending counters avoids churn and races. Looks good.
1105-1105: Centralized pending increment is fine. Calling `incrementPending` once at receipt keeps semantics consistent across sync/async. No issues.
1114-1117: Good: undo pending on enqueue failure. This keeps the drain-completion signal accurate.
1134-1142: Good: decrement on sync enqueue errors. All error paths restore the counters; drain completion won't get stuck.
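Taken together, the receipt path these comments describe could be sketched like this (a fragment with assumed names; `Message` and `enqueue` stand in for the codebase's real types, and the helpers live in the subscription module per the review above):

```zig
const subscription = @import("subscription.zig");

// Sketch of the connection's delivery path for a matched subscription.
fn deliverToSubscription(sub: *subscription.Subscription, message: *Message) !void {
    // 1) Draining subscriptions drop new messages before any pending
    //    accounting happens, avoiding counter churn and races.
    if (sub.draining.load(.acquire)) return;

    // 2) Count the message exactly once at receipt.
    subscription.incrementPending(sub, message.data.len);

    // 3) If the hand-off fails, roll the accounting back so the
    //    drain-completion signal stays accurate.
    sub.enqueue(message) catch |err| {
        subscription.decrementPending(sub, message.data.len);
        return err;
    };
}
```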
tests/drain_test.zig (2)
21-26: LGTM: immediate drain completes fast. Asserts match the API.
205-207: LGTM: NotDraining error is validated. Matches the API contract.
… state manipulation

- Replace direct sub.draining.store() with proper test scenario
- Publish message, start drain without consuming, then expect timeout
- Addresses CodeRabbit review feedback about bypassing invariants

Co-authored-by: Lukáš Lalinský <[email protected]>
Replace sleep-based timing with bounded wait on pending count to eliminate test flakiness. Add assertion to verify dropped messages are truly undispatchable after drain completion.
Summary
Implements comprehensive subscription drain support that allows graceful subscription shutdown while processing remaining messages.
Key Features

- Event-driven completion using ResetEvent for efficient waiting
- Automatic detection when the last message is processed
- Blocks new messages once draining starts
- Works with both sync and async subscriptions
- Thread-safe implementation using atomic counters

API
- `sub.drain()` - Start the draining process
- `sub.isDraining()` / `sub.isDrainComplete()` - Status checks
- `sub.waitForDrainCompletion(timeout_ms)` - Block until completion with timeout
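A minimal usage sketch of this API (the connect/subscribe/publish calls are assumptions about the library surface, not verbatim; the drain calls are the ones listed above):

```zig
const std = @import("std");
const nats = @import("nats");

pub fn main() !void {
    // Hypothetical setup; exact connect/subscribe signatures are assumptions.
    var nc = try nats.connect(.{ .url = "nats://127.0.0.1:4222" });
    defer nc.deinit();

    var sub = try nc.subscribe("orders.*");
    defer sub.deinit();

    // ... receive and process messages for a while ...

    // Start draining: UNSUB goes out immediately, new deliveries are
    // dropped, and already-pending messages can still be consumed.
    try sub.drain();
    std.debug.assert(sub.isDraining());

    // Block until every pending message is processed (up to 5s);
    // timeout_ms == 0 would wait indefinitely.
    try sub.waitForDrainCompletion(5000);
    std.debug.assert(sub.isDrainComplete());
}
```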
Implementation Details

- Reuses the `pending_msgs`/`pending_bytes` counters from the base branch
- `incrementPending`/`decrementPending` are module-private standalone functions

Test Coverage
Comprehensive test suite with 7 test cases covering immediate completion, pending message processing, async callback handling, message blocking, timeout scenarios, and error cases.
Architecture Benefits
Based on the pending-messages-tracking branch, which provides the foundational atomic counters for tracking message state through the entire pipeline.