Implement autounsubscribe functionality for subscriptions #113
Conversation
Walkthrough
Adds per-subscription auto-unsubscribe: subscriptions gain atomic max/delivered counters and an API to set limits; the dispatcher and subscription logic track deliveries and remove subscriptions once the limit is reached; the connection handles UNSUB with an optional max and supports internal removal; tests cover the behavior and reconnection.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 5
🧹 Nitpick comments (13)
src/subscription.zig (3)
208-215: Question: API semantics vs. server semantics (confirm intention). NATS treats UNSUB's count as a total since subscription start; if current delivered ≥ max, the UNSUB is still valid and simply takes effect immediately. Your API returns MaxAlreadyReached instead. If this is intentional, consider documenting it on autoUnsubscribe() to avoid surprises for users migrating from other clients. Otherwise, drop the error and allow an immediate unsubscribe. (docs.nats.io)
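If matching the server behavior is the goal instead, the shape of the change is small (a sketch only; the connection-level unsubscribe call is assumed from elsewhere in this review, not copied from the PR):
// Sketch: treat "budget already spent" as an immediate unsubscribe,
// which is how the server interprets UNSUB <sid> <max>, instead of erroring.
if (self.delivered_msgs.load(.acquire) >= max) {
    try self.nc.unsubscribe(self); // assumed API; adjust to the real signature
    return;
}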
208-215: Optional: prevent unbounded pending growth after the limit is reached. If messages were already queued when the limit is reached, the pre-check returns Timeout and those messages sit in the queue until destroy. Consider draining and dropping any remaining queued messages when the limit is reached (or in the connection's removeSubscriptionInternal) to release memory and decrement pending counters. If you want, I can wire a safe queue-drain path that decrements pending counters and deinits messages upon removal.
182-207: Add a short doc comment clarifying the "total vs remaining" behavior. Future you (and users) will thank you.
Example:
/// autoUnsubscribe sets a total delivery budget since subscription start.
/// The server UNSUB count is total, not remaining. If current delivered >= max,
/// this call returns MaxAlreadyReached (immediate unsubscribe intent).
pub fn autoUnsubscribe(self: *Subscription, max: u64) AutoUnsubscribeError!void { ... }
src/connection.zig (5)
748-760: Prefer MAX_CONTROL_LINE_SIZE and avoid catch unreachable in the UNSUB builder. Use the existing control-line size constant and bufPrint to eliminate the magic 256 and the unreachable paths.
- pub fn unsubscribeInternal(self: *Self, sid: u64, max: ?u64) !void {
-     var buffer: [256]u8 = undefined;
-     var stream = std.io.fixedBufferStream(&buffer);
-     var writer = stream.writer();
-
-     if (max) |m| {
-         writer.print("UNSUB {d} {d}\r\n", .{ sid, m }) catch unreachable; // Will always fit
-     } else {
-         writer.print("UNSUB {d}\r\n", .{sid}) catch unreachable; // Will always fit
-     }
-
-     try self.write_buffer.append(stream.getWritten());
- }
+ pub fn unsubscribeInternal(self: *Self, sid: u64, max: ?u64) !void {
+     var buf: [MAX_CONTROL_LINE_SIZE]u8 = undefined;
+     const line = if (max) |m|
+         try std.fmt.bufPrint(&buf, "UNSUB {d} {d}\r\n", .{ sid, m })
+     else
+         try std.fmt.bufPrint(&buf, "UNSUB {d}\r\n", .{ sid });
+     try self.write_buffer.append(line);
+ }
777-785: Send UNSUB outside subs_mutex to shorten the critical section. You can call unsubscribeInternal after releasing subs_mutex (the map is already updated), reducing contention and avoiding I/O inside the lock.
Example:
// inside unsubscribe()
_ = self.subscriptions.remove(sub.sid);
// unlock subs_mutex here, then:
self.unsubscribeInternal(sub.sid, null) catch |err| {
    log.err("Failed to send UNSUB for sid {d}: {}", .{ sub.sid, err });
};
787-799: Log the subject on removal for better traceability. Including the subject makes ops debugging easier.
- if (self.subscriptions.fetchRemove(sid)) |kv| {
-     log.debug("Removed subscription {d} from connection", .{sid});
+ if (self.subscriptions.fetchRemove(sid)) |kv| {
+     log.debug("Removed subscription {d} ({s}) from connection", .{ sid, kv.value.subject });
      // Release connection's reference to the subscription
      kv.value.release();
  }
1168-1171: Unknown sid path: consider skipping redundant UNSUB attempts. If we get a message for an unknown sid, sending another UNSUB is typically redundant. Dropping the message (as you already do) might be sufficient; alternatively, gate this with a one-shot per sid to avoid repeated UNSUBs on pathological inputs.
1612-1641: Reapply the autounsubscribe budget on reconnect. resendSubscriptions() reissues SUBs but doesn't reapply any remaining max delivery budget. If a sub had max_msgs and wasn't yet satisfied before the disconnect, the server will forget the prior UNSUB sid max. Suggest: after each SUB, check sub.max_msgs vs sub.delivered_msgs and send UNSUB sid remaining when remaining > 0. This keeps server-side enforcement aligned post-reconnect and reduces over-delivery. Happy to wire this in here or via a subscription-level hook (e.g., sub.onResubscribed(conn)).
tests/autounsubscribe_test.zig (5)
10-15: Flush after setting autounsubscribe to remove ordering races. Ensures SUB and UNSUB(max) are processed before the PUBs.
  const sub = try conn.subscribeSync("auto.test");
  defer sub.deinit();
  // Set autounsubscribe limit to 3 messages
  try sub.autoUnsubscribe(3);
+ try conn.flush();
77-86: Replace the fixed sleep with a bounded wait loop to avoid flakes. Polling with a short sleep until count >= 2 (or timeout) is more stable on busy CI.
- // Wait a bit for message processing
- std.time.sleep(100 * std.time.ns_per_ms);
-
- // Should have received exactly 2 messages
- ctx.mutex.lock();
- const count = messages_received.items.len;
- ctx.mutex.unlock();
-
- try std.testing.expectEqual(@as(usize, 2), count);
+ const deadline_ms = std.time.milliTimestamp() + 1000;
+ var count: usize = 0;
+ while (std.time.milliTimestamp() < deadline_ms) {
+     ctx.mutex.lock();
+     count = messages_received.items.len;
+     ctx.mutex.unlock();
+     if (count >= 2) break;
+     std.time.sleep(10 * std.time.ns_per_ms);
+ }
+ try std.testing.expectEqual(@as(usize, 2), count);
118-135: Stronger memory ordering for cross-thread reads. Use .acquire when asserting on atomics potentially updated by another thread.
- try std.testing.expectEqual(@as(u64, 0), sub.delivered_msgs.load(.monotonic));
+ try std.testing.expectEqual(@as(u64, 0), sub.delivered_msgs.load(.acquire));
…
- try std.testing.expectEqual(@as(u64, 1), sub.delivered_msgs.load(.monotonic));
+ try std.testing.expectEqual(@as(u64, 1), sub.delivered_msgs.load(.acquire));
…
- try std.testing.expectEqual(@as(u64, 2), sub.delivered_msgs.load(.monotonic));
+ try std.testing.expectEqual(@as(u64, 2), sub.delivered_msgs.load(.acquire));
6-36: Optional: flush after subscribe to fully eliminate timing dependencies. Not strictly required given the FIFO write buffer, but it makes the test intent explicit.
  const sub = try conn.subscribeSync("auto.test");
  defer sub.deinit();
+ try conn.flush();
88-107: Add a reconnect scenario to cover server budget loss. Add a test that sets autoUnsubscribe(2), delivers 1, forces a reconnect, publishes 2 more, and asserts only 1 more is delivered (server-side budget re-applied), or zero more if you enforce purely client-side removal. This will validate the reconnect behavior discussed in the code review. I can draft this test once you confirm the intended semantics across reconnect.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
src/connection.zig (3 hunks)
src/dispatcher.zig (2 hunks)
src/subscription.zig (3 hunks)
tests/all_tests.zig (1 hunks)
tests/autounsubscribe_test.zig (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/all_tests.zig
tests/autounsubscribe_test.zig
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/dispatcher.zig
tests/autounsubscribe_test.zig
📚 Learning: 2025-09-09T19:51:02.156Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#108
File: src/dispatcher.zig:60-65
Timestamp: 2025-09-09T19:51:02.156Z
Learning: In the NATS Zig codebase dispatcher.zig, when decrementing pending_bytes (atomic u64) with message_data_len (usize) using fetchSub, explicit casting is not required as Zig handles the implicit conversion automatically and the code builds successfully.
Applied to files:
src/dispatcher.zig
tests/autounsubscribe_test.zig
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (3)
tests/all_tests.zig (1)
10-10: Add import: looks good. Autounsubscribe tests now run with the suite. No concerns.
src/dispatcher.zig (1)
133-135: Delivered counter increment placement is correct. Incrementing before invoking the handler matches "messages sent" semantics and ensures the autounsubscribe check can trigger deterministically afterward.
src/subscription.zig (1)
144-147: Drain UNSUB error handling is fine. Catching and logging the failure while continuing drain bookkeeping is reasonable. No change requested.
// Check autounsubscribe limit after successful message delivery
if (subscription.max_msgs) |max| {
    if (delivered >= max) {
        log.debug("Subscription {} reached autounsubscribe limit ({}), removing", .{ subscription.sid, max });
        subscription.nc.removeSubscriptionInternal(subscription.sid);
    }
}
Data race on max limit (max_msgs) — make it atomic and load with memory ordering.
subscription.max_msgs is a plain nullable field written by user code (autoUnsubscribe) and read here by the dispatcher thread. That’s a concurrent read/write with no synchronization. Convert the limit to an atomic u64 with 0 as “no limit”, and load it here with .acquire. Then compare against delivered. Apply along with the field/type change in subscription.zig (see my comment there).
Apply this diff after changing the field name/type to max_total: std.atomic.Value(u64):
- if (subscription.max_msgs) |max| {
- if (delivered >= max) {
- log.debug("Subscription {} reached autounsubscribe limit ({}), removing", .{ subscription.sid, max });
- subscription.nc.removeSubscriptionInternal(subscription.sid);
- }
- }
+ const max = subscription.max_total.load(.acquire);
+ if (max != 0 and delivered >= max) {
+ log.debug("Subscription {} reached autounsubscribe limit ({}), removing", .{ subscription.sid, max });
+ subscription.nc.removeSubscriptionInternal(subscription.sid);
+ }
Note: Using 0 as a sentinel avoids nullable reads and removes the race window.
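For readers unfamiliar with the sentinel pattern, here is a tiny, library-free illustration of 0-as-"no limit" on an atomic counter (field names are placeholders, not the PR's):
const std = @import("std");

test "zero sentinel means no limit" {
    var max_total = std.atomic.Value(u64).init(0); // 0 = unlimited
    try std.testing.expectEqual(@as(u64, 0), max_total.load(.acquire));

    max_total.store(5, .release); // e.g. autoUnsubscribe(5)
    const max = max_total.load(.acquire);
    const delivered: u64 = 5;
    try std.testing.expect(max != 0 and delivered >= max); // limit reached
}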
🤖 Prompt for AI Agents
In src/dispatcher.zig around lines 149 to 156, the code reads the user-writable
nullable subscription.max_msgs without synchronization causing a data race;
after you change the field in subscription.zig to max_total:
std.atomic.Value(u64) (0 = no limit), replace the nullable read with an atomic
acquire load, store the loaded u64 into a local (e.g., max =
subscription.max_total.load(.Acquire)), skip the check when max == 0, and
otherwise compare delivered >= max and call
subscription.nc.removeSubscriptionInternal(subscription.sid) when the limit is
reached; ensure you use .Acquire ordering for the load to synchronize with
writes from autoUnsubscribe.
Actionable comments posted: 1
🧹 Nitpick comments (2)
src/subscription.zig (2)
61-64: Atomic autounsubscribe state: LGTM. Using an atomic u64 with a 0 sentinel is the right fix. Consider renaming to max_total for clarity (total since sub start).
- max_msgs: std.atomic.Value(u64) = std.atomic.Value(u64).init(0), // 0 means no limit
+ // total messages since subscription start; 0 = no limit
+ max_total: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
182-206: Document autoUnsubscribe repeat semantics. Update the doc comment to state that each invocation sends an UNSUB with the new max, resetting the server-side cap to the most recent total, and that calls with max == 0 or max <= delivered_msgs will return InvalidMax or MaxAlreadyReached, respectively.
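A possible wording (a suggestion only, not the PR's current comment):
/// Sets the total number of messages to deliver before the subscription is
/// removed automatically. The count is total since subscription start, not
/// remaining; calling this again sends a new UNSUB and the latest total wins.
/// Returns error.InvalidMax when max == 0 and error.MaxAlreadyReached when
/// delivered_msgs >= max.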
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
src/connection.zig (2 hunks)
src/dispatcher.zig (2 hunks)
src/subscription.zig (3 hunks)
tests/autounsubscribe_test.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- tests/autounsubscribe_test.zig
- src/connection.zig
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/dispatcher.zig
src/subscription.zig
📚 Learning: 2025-09-09T19:51:02.156Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#108
File: src/dispatcher.zig:60-65
Timestamp: 2025-09-09T19:51:02.156Z
Learning: In the NATS Zig codebase dispatcher.zig, when decrementing pending_bytes (atomic u64) with message_data_len (usize) using fetchSub, explicit casting is not required as Zig handles the implicit conversion automatically and the code builds successfully.
Applied to files:
src/dispatcher.zig
src/subscription.zig
🔇 Additional comments (4)
src/dispatcher.zig (1)
149-154: removeSubscriptionInternal is already idempotent.
The function locks subs_mutex and uses fetchRemove, so a second (or concurrent) call with the same sid simply does nothing and never calls release() again. No changes needed.
src/subscription.zig (3)
144-146: Drain UNSUB error handling: LGTM. Sending UNSUB (no max) and logging on failure is appropriate for drain.
208-214: Pre-check before pop: LGTM. The atomic load plus 0 sentinel keeps sync consumers from reading past the cap.
221-230: Post-pop accounting: LGTM; mirrors the dispatcher. Incrementing with fetchAdd and removing when >= max is fine for the sync path.
// Increment delivered counter for autounsubscribe
const delivered = subscription.delivered_msgs.fetchAdd(1, .acq_rel) + 1;
🛠️ Refactor suggestion
Prevent over-delivery: guard before invoking the handler.
If multiple messages are already queued for this sid, the current check (after handler) can still deliver >max to the app. Guard immediately after increment and drop messages when delivered > max.
// Increment delivered counter for autounsubscribe
- const delivered = subscription.delivered_msgs.fetchAdd(1, .acq_rel) + 1;
+ const delivered = subscription.delivered_msgs.fetchAdd(1, .acq_rel) + 1;
+ const max_for_drop = subscription.max_msgs.load(.acquire);
+ if (max_for_drop > 0 and delivered > max_for_drop) {
+ log.debug("Dropping message beyond autounsubscribe limit for sid {}", .{subscription.sid});
+ // Message ownership hasn't been transferred; we must clean up.
+ message.deinit();
+ subscription_mod.decrementPending(subscription, message_data_len);
+ return;
+ }
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/dispatcher.zig around lines 133-135, the delivered counter is incremented
but the max_msgs guard runs after the handler, allowing over-delivery; move the
max check immediately after the fetchAdd so that if subscription.max_msgs != 0
and delivered > subscription.max_msgs you drop the message and do not call the
handler. Ensure you free or release the message resources and continue the
dispatch loop (or return) without invoking the handler; do not perform handler
invocation when the post-increment delivered value exceeds max_msgs.
Add automatic unsubscription capability that removes subscriptions after receiving a specified maximum number of messages. This feature is essential for request-response patterns and resource management.
Key features:
- Atomic message delivery counting with proper memory ordering
- Protocol-compliant UNSUB commands with message limits
- Comprehensive error handling with rollback on failure
- Support for both sync and async subscription types
- Thread-safe subscription removal from connection state
The implementation follows NATS protocol standards and includes extensive test coverage for all functionality and edge cases.
Change max_msgs from optional u64 to atomic u64 (0 = no limit) to ensure thread-safe access from both dispatcher and subscription threads. Also set max_msgs only after successfully sending UNSUB to the server, to avoid inconsistent state on send failures.
Key changes:
- max_msgs: ?u64 -> std.atomic.Value(u64) with 0 meaning no limit
- Use atomic load/store operations with proper memory ordering
- Set the limit after the UNSUB command succeeds, not before
- Update dispatcher.zig to use atomic operations
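A sketch of the ordering this commit describes (send UNSUB first, record the limit only on success). Names follow the review diffs above; treat the exact signatures and the error set as assumptions rather than the repository's actual code:
pub fn autoUnsubscribe(self: *Subscription, max: u64) AutoUnsubscribeError!void {
    if (max == 0) return error.InvalidMax;
    if (self.delivered_msgs.load(.acquire) >= max) return error.MaxAlreadyReached;

    // Tell the server first; if the write fails, no local state changes.
    try self.nc.unsubscribeInternal(self.sid, max);

    // Record the limit only after the UNSUB has been queued successfully.
    self.max_msgs.store(max, .release);
}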
Address code review suggestions to improve robustness and code quality:
- Move UNSUB sending outside subscription mutex locks to reduce critical section time
- Add the subject to removal log messages for better debugging traceability
- Add a documentation comment explaining autounsubscribe behavior
- Add flush() calls in tests after setting autounsubscribe to prevent timing races
- Replace sleep with a bounded wait loop in the async test to avoid flakes
- Use .acquire memory ordering for cross-thread atomic reads in tests
These changes improve thread safety, test reliability, and code maintainability while keeping all functionality working correctly.
Force-pushed from 3fc8089 to c3c691e.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/dispatcher.zig (1)
149-154: Ensure messages aren't over-delivered beyond the limit. The autounsubscribe check happens after the handler executes, meaning that if multiple messages are already queued for this subscription, the handler could receive more than max messages before the subscription is removed. Consider checking the limit before invoking the handler to prevent over-delivery. Apply this diff to check the limit earlier:
  // Increment delivered counter for autounsubscribe
  const delivered = subscription.delivered_msgs.fetchAdd(1, .acq_rel) + 1;
+
+ // Check if we've already exceeded the limit
+ const max = subscription.max_msgs.load(.acquire);
+ if (max > 0 and delivered > max) {
+     log.debug("Dropping message {} beyond autounsubscribe limit {} for sid {}", .{ delivered, max, subscription.sid });
+     message.deinit();
+     subscription_mod.decrementPending(subscription, message_data_len);
+     return;
+ }
  // Call the subscription's handler in this dispatcher thread context
Then you can still keep the existing check after the handler to handle the delivered == max case for subscription removal.
🧹 Nitpick comments (3)
tests/autounsubscribe_test.zig (3)
35-37: Bump the post-limit nextMsg() timeout to reduce CI flakiness. 100 ms can be tight under load; use 500 ms for stability.
Apply this diff:
- try std.testing.expectError(error.Timeout, sub.nextMsg(100));
+ try std.testing.expectError(error.Timeout, sub.nextMsg(500));
79-91: Use a monotonic timer instead of milliTimestamp to avoid clock jumps. std.time.Timer is monotonic; it avoids flakiness on clock adjustments.
Apply this diff:
- const deadline_ms = std.time.milliTimestamp() + 1000;
- var count: usize = 0;
- while (std.time.milliTimestamp() < deadline_ms) {
+ var timer = try std.time.Timer.start();
+ var count: usize = 0;
+ while (timer.read() < 1 * std.time.ns_per_s) {
      ctx.mutex.lock();
      count = messages_received.items.len;
      ctx.mutex.unlock();
      if (count >= 2) break;
      std.time.sleep(10 * std.time.ns_per_ms);
  }
142-146: Same timeout nit here: make it 500 ms. Keeps the test stable on slower runners.
Apply this diff:
- try std.testing.expectError(error.Timeout, sub.nextMsg(100));
+ try std.testing.expectError(error.Timeout, sub.nextMsg(500));
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
src/connection.zig (2 hunks)
src/dispatcher.zig (2 hunks)
src/subscription.zig (3 hunks)
tests/all_tests.zig (1 hunks)
tests/autounsubscribe_test.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/all_tests.zig
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-09-09T19:51:02.156Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#108
File: src/dispatcher.zig:60-65
Timestamp: 2025-09-09T19:51:02.156Z
Learning: In the NATS Zig codebase dispatcher.zig, when decrementing pending_bytes (atomic u64) with message_data_len (usize) using fetchSub, explicit casting is not required as Zig handles the implicit conversion automatically and the code builds successfully.
Applied to files:
src/subscription.zig
src/dispatcher.zig
tests/autounsubscribe_test.zig
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/subscription.zig
src/dispatcher.zig
tests/autounsubscribe_test.zig
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/autounsubscribe_test.zig
🔇 Additional comments (14)
src/dispatcher.zig (1)
133-135: Track delivery count correctly for autounsubscribe. The increment looks correct with acq_rel ordering. The atomic increment ensures we get a unique delivered count for each message even with concurrent dispatches.
src/connection.zig (4)
766-778: LGTM! Clean implementation of UNSUB with an optional max. The function properly handles both cases - UNSUB with and without a message limit. The buffer is appropriately sized and errors propagate correctly.
780-806: Good separation of concerns for unsubscribe vs remove. The refactoring properly separates the protocol operation (sending UNSUB) from the internal bookkeeping (removing from the subscriptions table). The error handling with logging provides good visibility.
808-819: Clean implementation of internal subscription removal. The mutex protection and reference counting look correct. Good debug logging for visibility.
1192-1194: Proper handling of unknown subscription IDs. Sending UNSUB for unknown sids is the right approach - it tells the server to stop sending messages for that subscription. The error logging helps with debugging.
src/subscription.zig (5)
61-63: Good use of atomics for concurrent access. Using atomic values with 0 as the sentinel for "no limit" avoids nullable types and race conditions. The initialization is clean.
181-205: Well-structured autounsubscribe implementation. Good validation, proper error handling, and correct ordering - sending UNSUB first before setting the limit atomically. The error set is comprehensive.
208-212: Pre-check prevents unnecessary blocking. Checking the limit before attempting to pop a message is efficient - it avoids blocking on the queue when we know no more messages should be delivered.
220-228: Consistent delivered count tracking. The atomic increment with acq_rel matches the dispatcher's approach. The post-delivery check for the limit and subscription removal is correct.
149-153: Error handling for UNSUB during drain is appropriate. Even if UNSUB fails, continuing with the drain is the right choice since the draining flag will cause messages to be dropped anyway.
tests/autounsubscribe_test.zig (4)
10-16: Good sequencing: subscribe → autoUnsubscribe → flush. This ordering avoids races (the server sees SUB and UNSUB with the limit before any PUBs). Looks solid.
93-112: Error-path coverage looks correct. InvalidMax for 0 and MaxAlreadyReached after consuming 1 both read well against the API contract.
124-141: Delivered counter assertions are on point. Loading with .acquire after nextMsg() pairs well with an internal release/seq_cst increment. This gives meaningful coverage of the atomic counter.
51-60: Ensure message pointers remain valid beyond the async callback.
The test's handleMessage appends raw *Message pointers, but Dispatcher.processMessage calls dispatch_msg.deinit() (which releases or recycles the underlying message) as soon as the callback returns. That makes the stored pointers point to freed or reused memory. Instead, clone or deep-copy the message payload and headers before storing them in messages.
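A minimal, library-free illustration of the suggested fix: copy the bytes you want to keep instead of storing a pointer that the dispatcher will recycle after the callback returns (helper and test names are made up for the example):
const std = @import("std");

fn keepCopy(allocator: std.mem.Allocator, list: *std.ArrayList([]u8), data: []const u8) !void {
    const copy = try allocator.dupe(u8, data); // own the bytes
    errdefer allocator.free(copy);
    try list.append(copy);
}

test "store an owned copy, not a borrowed pointer" {
    var list = std.ArrayList([]u8).init(std.testing.allocator);
    defer {
        for (list.items) |item| std.testing.allocator.free(item);
        list.deinit();
    }
    try keepCopy(std.testing.allocator, &list, "hello");
    try std.testing.expectEqualStrings("hello", list.items[0]);
}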
When a connection reconnects, subscription autounsubscribe limits are now properly restored by calculating the remaining message count (max - delivered) and sending the appropriate UNSUB command with the adjusted limit to the server. This matches the behavior of the nats.c and nats.go clients and ensures that subscribers receive exactly the number of messages specified in autoUnsubscribe(), regardless of connection interruptions.
Changes:
- Enhanced resendSubscriptions() to restore autounsubscribe limits
- Added a comprehensive test for reconnection behavior
- Fixed potential resource leaks from unlimited message delivery after reconnect
Remove unnecessary UNSUB command during reconnection for subscriptions that have already reached their autounsubscribe limit. These subscriptions should simply be skipped rather than sending UNSUB for non-existent sids.
Actionable comments posted: 2
🧹 Nitpick comments (5)
src/connection.zig (4)
766-778: Avoid catch unreachable on fixed-buffer writes; propagate errors instead.
writer.print(...) catch unreachable can hide real issues if the control line ever grows. Propagate the error; 256 bytes is ample today, but not a contract.
-     if (max) |m| {
-         writer.print("UNSUB {d} {d}\r\n", .{ sid, m }) catch unreachable; // Will always fit
-     } else {
-         writer.print("UNSUB {d}\r\n", .{sid}) catch unreachable; // Will always fit
-     }
+     if (max) |m| {
+         try writer.print("UNSUB {d} {d}\r\n", .{ sid, m });
+     } else {
+         try writer.print("UNSUB {d}\r\n", .{sid});
+     }
781-793: Unsubscribe table removal: drop the outer self.mutex to reduce contention. Subscription map ops are protected by subs_mutex. Holding self.mutex here widens the critical section and increases the deadlock surface without clear benefit.
- {
-     self.mutex.lock();
-     defer self.mutex.unlock();
-
-     self.subs_mutex.lock();
-     defer self.subs_mutex.unlock();
+ {
+     self.subs_mutex.lock();
+     defer self.subs_mutex.unlock();
808-820: Guard against future reordering: snapshot the subject before release. You log the subject and then release. It's fine now, but trivial refactors can move the release above the log. Snapshot the subject to avoid accidental UAF later.
- if (self.subscriptions.fetchRemove(sid)) |kv| {
-     log.debug("Removed subscription {d} ({s}) from connection", .{ sid, kv.value.subject });
-     // Release connection's reference to the subscription
-     kv.value.release();
+ if (self.subscriptions.fetchRemove(sid)) |kv| {
+     const subj = kv.value.subject;
+     log.debug("Removed subscription {d} ({s}) from connection", .{ sid, subj });
+     // Release connection's reference to the subscription
+     kv.value.release();
1191-1195: Unknown SID UNSUB: downgrade the log level to avoid noise. This path can be hit repeatedly during transient states. Errors here aren't actionable and spam the logs.
- self.unsubscribeInternal(message.sid, null) catch |err| {
-     log.err("Failed to send UNSUB for unknown sid {d}: {}", .{ message.sid, err });
- };
+ self.unsubscribeInternal(message.sid, null) catch |err| {
+     log.debug("UNSUB for unknown sid {d} failed: {}", .{ message.sid, err });
+ };
tests/autounsubscribe_test.zig (1)
80-88: Async wait loop is tight; minor flake risk. 1s is usually fine, but CI variance happens. Consider a slightly longer deadline (e.g., 2s) to reduce false negatives.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/connection.zig (3 hunks)
tests/autounsubscribe_test.zig (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/autounsubscribe_test.zig
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
tests/autounsubscribe_test.zig
🔇 Additional comments (1)
tests/autounsubscribe_test.zig (1)
183-186: Confirm the server-side limit is applied before publishing. You do autoUnsubscribe(5) then flush(), which should ensure the server received the UNSUB with the max. Good. Keep this pattern; it prevents over-delivery into the local queue.
src/connection.zig
Outdated
  // Check autounsubscribe state
  const max = sub.max_msgs.load(.acquire);
  const delivered = sub.delivered_msgs.load(.acquire);

  var adjusted_max: ?u64 = null;
  if (max > 0) {
      if (delivered < max) {
          adjusted_max = max - delivered; // Remaining messages
      } else {
          // Already reached limit - don't re-subscribe at all
          log.debug("Subscription {d} ({s}) already reached limit, skipping during reconnect", .{ sub.sid, sub.subject });
          continue;
      }
  }

  // Send SUB command
  if (sub.queue) |queue| {
      try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue, sub.sid });
  } else {
      try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
  }

- log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
+ // Send UNSUB with remaining limit if needed
+ if (adjusted_max) |remaining| {
+     try buffer.writer().print("UNSUB {d} {d}\r\n", .{ sub.sid, remaining });
+     log.debug("Re-subscribed to {s} with sid {d} and autounsubscribe limit {d} (delivered: {d})", .{ sub.subject, sub.sid, remaining, delivered });
+ } else {
+     log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
+ }
🛠️ Refactor suggestion
Subscriptions that already reached max on reconnect aren’t removed (stale map entry).
When delivered >= max you skip re-subscribe but leave the entry in self.subscriptions. That leaks the connection’s reference and keeps a dead subscription around.
Apply this refactor to collect SIDs to remove and clean them up after unlocking:
fn resendSubscriptions(self: *Self) !void {
log.debug("Re-establishing subscriptions", .{});
- self.subs_mutex.lock();
- defer self.subs_mutex.unlock();
+ // Track SIDs that shouldn't be re-subscribed and must be removed
+ var to_remove = std.ArrayList(u64).init(self.allocator);
+ defer to_remove.deinit();
+
+ self.subs_mutex.lock();
+ defer self.subs_mutex.unlock();
const allocator = self.scratch.allocator();
defer self.resetScratch();
var buffer = ArrayList(u8).init(allocator);
var iter = self.subscriptions.iterator();
while (iter.next()) |entry| {
const sub = entry.value_ptr.*;
// Check autounsubscribe state
const max = sub.max_msgs.load(.acquire);
const delivered = sub.delivered_msgs.load(.acquire);
var adjusted_max: ?u64 = null;
if (max > 0) {
if (delivered < max) {
adjusted_max = max - delivered; // Remaining messages
} else {
- // Already reached limit - don't re-subscribe at all
- log.debug("Subscription {d} ({s}) already reached limit, skipping during reconnect", .{ sub.sid, sub.subject });
- continue;
+ // Already reached limit - don't re-subscribe; remove after unlock
+ log.debug("Subscription {d} ({s}) already reached limit; will remove during reconnect", .{ sub.sid, sub.subject });
+ try to_remove.append(sub.sid);
+ continue;
}
}
// Send SUB command
if (sub.queue) |queue| {
try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue, sub.sid });
} else {
try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
}
// Send UNSUB with remaining limit if needed
if (adjusted_max) |remaining| {
try buffer.writer().print("UNSUB {d} {d}\r\n", .{ sub.sid, remaining });
log.debug("Re-subscribed to {s} with sid {d} and autounsubscribe limit {d} (delivered: {d})", .{ sub.subject, sub.sid, remaining, delivered });
} else {
log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
}
}
// Send all subscription commands via write buffer
if (buffer.items.len > 0) {
try self.write_buffer.append(buffer.items);
}
+
+ // Now remove stale subs outside the subs_mutex
+ for (to_remove.items) |sid| {
+ self.removeSubscriptionInternal(sid);
+ }
+ }
Also applies to: 1700-1704
var timer = try std.time.Timer.start();
while (ReconnectTracker.reconnected_called == 0) {
    if (timer.read() >= 5000 * std.time.ns_per_ms) {
        return error.ReconnectionTimeout;
    }
    std.time.sleep(10 * std.time.ns_per_ms);
}
Data race on ReconnectTracker.reconnected_called read.
You mutate under a mutex but read without locking. Use the mutex or an atomic to avoid racy reads.
- var timer = try std.time.Timer.start();
- while (ReconnectTracker.reconnected_called == 0) {
+ var timer = try std.time.Timer.start();
+ while (true) {
+ ReconnectTracker.mutex.lock();
+ const rc = ReconnectTracker.reconnected_called;
+ ReconnectTracker.mutex.unlock();
+ if (rc > 0) break;
if (timer.read() >= 5000 * std.time.ns_per_ms) {
return error.ReconnectionTimeout;
}
std.time.sleep(10 * std.time.ns_per_ms);
🤖 Prompt for AI Agents
In tests/autounsubscribe_test.zig around lines 208-214, the test reads
ReconnectTracker.reconnected_called without synchronization causing a data race
(writes happen under a mutex). Fix by either reading the value while holding the
same mutex used for mutations or change reconnected_called to an atomic integer
and use atomic load with appropriate ordering; ensure the sleep/loop logic uses
the synchronized/atomic read so the race is eliminated.
@claude when ignoring subscription on resend, also remove it from the internal map
Claude finished @lalinsky's task. ✅ Completed: Remove subscriptions from internal map during resend
Changes Made
I've successfully implemented the fix to remove stale subscriptions from the internal map during resend operations in resendSubscriptions(). Key changes:
Technical details:
The implementation follows the exact pattern suggested in the code review comments and resolves the stale map entry issue without breaking existing functionality.
When subscriptions have already reached their autounsubscribe limit during reconnection, collect their SIDs and remove them from the internal map after unlocking the mutex. This prevents memory leaks from keeping references to subscriptions that will never be used again.
Co-authored-by: Lukáš Lalinský <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/connection.zig (1)
1657-1715: Re-subscribe logic handles autounsubscribe correctly and cleans up stale entries.
- Computing remaining = max - delivered and emitting UNSUB {sid} {remaining} is protocol-compliant.
- Collecting SIDs to remove and purging them after releasing subs_mutex fixes the stale map entry on reconnect. Nice.
Optional:
- Add an inline note to prevent future misuse (this relies on the caller holding the main mutex, which doReconnect does):
  log.debug("Re-establishing subscriptions", .{});
+ // Note: called from doReconnect() with self.mutex held; safe to use self.scratch here.
- If write_buffer.append(buffer.items) can block, consider moving the append outside subs_mutex (you'd need to allocate the command batch with a non-scratch allocator or defer the arena reset accordingly).
To double-check atomic ordering across the codebase (that increments use release/acq_rel to match the .load(.acquire) here), run:
#!/bin/bash
# Verify delivered/max counters use appropriate atomic orderings
rg -n -C2 'delivered_msgs|max_msgs' --type zig
🧹 Nitpick comments (2)
src/connection.zig (2)
766-778: UNSUB formatting and buffering look correct. Optional: if ConcurrentWriteBuffer.append can error when frozen during reconnects, consider documenting that callers should ignore failures (as you already do) or route UNSUBs through the reconnect pending buffer for consistency with publish.
1192-1194: Reduce log noise on unknown-sid UNSUB failures. During transient states this can get chatty. Suggest dropping to debug to avoid log spam.
- log.err("Failed to send UNSUB for unknown sid {d}: {}", .{ message.sid, err });
+ log.debug("Failed to send UNSUB for unknown sid {d}: {}", .{ message.sid, err });
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/connection.zig (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
🔇 Additional comments (2)
src/connection.zig (2)
781-793: Good order: remove from the map before sending UNSUB. Lock order (main mutex → subs_mutex) matches the rest of the file and avoids races with processMsg. Post-remove error handling is sensible.
Also applies to: 798-801
808-819: Internal removal helper is solid. Using fetchRemove() under subs_mutex and releasing the connection's ref aligns with the codebase's refcounting model (per learnings: multiple deinit/release sites are safe).
Add automatic unsubscription capability that removes subscriptions after receiving a specified maximum number of messages. This feature is essential for request-response patterns and resource management.
Key features:
The implementation follows NATS protocol standards and includes extensive test coverage for all functionality and edge cases.
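For readers landing on this PR, a rough usage sketch based on the API exercised by the tests above (subscribeSync, autoUnsubscribe, flush, nextMsg); the module name and exact signatures are assumptions, not taken from the repository:
const nats = @import("nats"); // module name assumed

// Illustrative only: receive at most three messages, then the subscription
// is removed automatically and further nextMsg() calls time out.
fn receiveAtMostThree(conn: *nats.Connection) !void {
    const sub = try conn.subscribeSync("updates.today");
    defer sub.deinit();
    try sub.autoUnsubscribe(3); // total budget of 3 messages since subscribe
    try conn.flush(); // make sure the server sees SUB + UNSUB 3 before any PUB

    var received: usize = 0;
    while (received < 3) : (received += 1) {
        const msg = try sub.nextMsg(1000); // 1s timeout, as in the tests
        _ = msg; // handle the message here (ownership rules follow the library)
    }
}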