@lalinsky
Owner

Add automatic unsubscription capability that removes subscriptions after receiving a specified maximum number of messages. This feature is essential for request-response patterns and resource management.

Key features:

  • Atomic message delivery counting with proper memory ordering
  • Protocol-compliant UNSUB commands with message limits
  • Comprehensive error handling with rollback on failure
  • Support for both sync and async subscription types
  • Thread-safe subscription removal from connection state

The implementation follows NATS protocol standards and includes extensive test coverage for all functionality and edge cases.

@coderabbitai
Contributor

coderabbitai bot commented Sep 10, 2025

Walkthrough

Adds per-subscription auto-unsubscribe: subscriptions gain atomic max/delivered counters and an API to set limits; dispatcher and subscription logic track deliveries and remove subscriptions when limits are reached; connection handles UNSUB with optional max and supports internal removal; tests added for behavior and reconnection.

Changes

| Cohort / File(s) | Summary of changes |
|---|---|
| Connection unsubscribe & removal (src/connection.zig) | unsubscribeInternal(self, sid, max: ?u64) !void emits UNSUB {sid} or UNSUB {sid} {max} and returns errors; call sites updated to handle/log errors. Adds pub fn removeSubscriptionInternal(self, sid: u64) void to remove/release subscriptions with proper locking. Resend/reconnect logic updated to honor per-subscription remaining counts. |
| Dispatcher delivery counting & auto-remove (src/dispatcher.zig) | Increments subscription.delivered_msgs (atomic fetchAdd .acq_rel) on delivery; after handler dispatch, if max_msgs > 0 and delivered >= max, logs and calls subscription.nc.removeSubscriptionInternal(subscription.sid) before normal pending decrements. |
| Subscription auto-unsubscribe API & state (src/subscription.zig) | Adds atomic fields max_msgs and delivered_msgs; adds pub const AutoUnsubscribeError = error{ MaxAlreadyReached, InvalidMax, SubscriptionClosed, SendFailed } and pub fn autoUnsubscribe(self, max: u64) AutoUnsubscribeError!void. nextMsg and drain() updated to enforce/increment limits and remove subscription when reached. |
| Tests registration (tests/all_tests.zig) | Imports tests/autounsubscribe_test.zig so new tests run. |
| Auto-unsubscribe tests (tests/autounsubscribe_test.zig) | New test file with five tests: sync basic, async basic, error conditions (InvalidMax, MaxAlreadyReached), delivered counter verification, and reconnection behavior to validate resubscribe + remaining UNSUB semantics. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title Check | ✅ Passed | The title clearly identifies the primary change, adding autounsubscribe functionality for subscriptions, and matches the code delta that adds max_msgs/delivered_msgs, updated unsubscribe behavior, and tests. It is concise, specific to the feature, and contains no distracting noise. This makes it easy for a reviewer scanning history to understand the main intent of the PR. |
| Description Check | ✅ Passed | The description succinctly summarizes the feature and lists key behaviors (atomic counting, protocol-compliant UNSUB with limits, error handling, sync/async support, and thread-safe removal), which aligns with the changes and added tests. It gives reviewers the necessary context about intent and scope without being off-topic or irrelevant. Given the provided changeset, the description is relevant and informative enough to pass this lenient check. |
| Docstring Coverage | ✅ Passed | No functions found in the changes. Docstring coverage check skipped. |

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (13)
src/subscription.zig (3)

208-215: Question: API semantics vs. server semantics — confirm intention.

NATS treats UNSUB’s count as a total since subscription start; if current delivered ≥ max, the UNSUB is still valid and simply takes effect immediately. Your API returns MaxAlreadyReached instead. If this is intentional, consider documenting it on autoUnsubscribe() to avoid surprises for users migrating from other clients. Otherwise, drop the error and allow an immediate unsubscribe. (docs.nats.io)


208-215: Optional: prevent unbounded pending growth after limit reached.

If messages were already queued when the limit is reached, pre-check returns Timeout and those messages sit in the queue until destroy. Consider draining and dropping any remaining queued messages when the limit is reached (or in the connection’s removeSubscriptionInternal) to release memory and decrement pending counters.

If you want, I can wire a safe queue-drain path that decrements pending counters and deinits messages upon removal.


182-207: Add a short doc comment clarifying the “total vs remaining” behavior.

Future you (and users) will thank you.

Example:

/// autoUnsubscribe sets a total delivery budget since subscription start.
/// The server UNSUB count is total, not remaining. If current delivered >= max,
/// this call returns MaxAlreadyReached (immediate unsubscribe intent).
pub fn autoUnsubscribe(self: *Subscription, max: u64) AutoUnsubscribeError!void { ... }
src/connection.zig (5)

748-760: Prefer MAX_CONTROL_LINE_SIZE and avoid catch unreachable in UNSUB builder

Use the existing control-line size constant and bufPrint to eliminate the magic 256 and unreachable paths.

-    pub fn unsubscribeInternal(self: *Self, sid: u64, max: ?u64) !void {
-        var buffer: [256]u8 = undefined;
-        var stream = std.io.fixedBufferStream(&buffer);
-        var writer = stream.writer();
-
-        if (max) |m| {
-            writer.print("UNSUB {d} {d}\r\n", .{ sid, m }) catch unreachable; // Will always fit
-        } else {
-            writer.print("UNSUB {d}\r\n", .{sid}) catch unreachable; // Will always fit
-        }
-
-        try self.write_buffer.append(stream.getWritten());
-    }
+    pub fn unsubscribeInternal(self: *Self, sid: u64, max: ?u64) !void {
+        var buf: [MAX_CONTROL_LINE_SIZE]u8 = undefined;
+        const line = if (max) |m|
+            try std.fmt.bufPrint(&buf, "UNSUB {d} {d}\r\n", .{ sid, m })
+        else
+            try std.fmt.bufPrint(&buf, "UNSUB {d}\r\n", .{ sid });
+        try self.write_buffer.append(line);
+    }
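
The bufPrint approach suggested above can be tried in isolation. The following is a minimal sketch, not the PR's code: `formatUnsub` is a hypothetical free function, and the caller supplies the buffer (the review suggests sizing it with the existing MAX_CONTROL_LINE_SIZE constant, which is not reproduced here).

```zig
const std = @import("std");

/// Hypothetical helper: formats an UNSUB control line into `buf` and returns
/// the written slice. Returns error.NoSpaceLeft if `buf` is too small, so no
/// `catch unreachable` is needed.
fn formatUnsub(buf: []u8, sid: u64, max: ?u64) ![]u8 {
    if (max) |m| {
        // UNSUB with a message limit: "UNSUB <sid> <max>\r\n"
        return std.fmt.bufPrint(buf, "UNSUB {d} {d}\r\n", .{ sid, m });
    }
    // Plain UNSUB: "UNSUB <sid>\r\n"
    return std.fmt.bufPrint(buf, "UNSUB {d}\r\n", .{sid});
}
```

Because the error is propagated with the normal error union, a too-small buffer surfaces as a regular failure at the call site instead of undefined behavior.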

777-785: Send UNSUB outside subs_mutex to shorten the critical section

You can call unsubscribeInternal after releasing subs_mutex (map already updated), reducing contention and avoiding I/O inside the lock.

Example:

// inside unsubscribe()
_ = self.subscriptions.remove(sub.sid);
// unlock subs_mutex here, then:
self.unsubscribeInternal(sub.sid, null) catch |err| {
    log.err("Failed to send UNSUB for sid {d}: {}", .{ sub.sid, err });
};

787-799: Log subject on removal for better traceability

Including the subject makes ops debugging easier.

-        if (self.subscriptions.fetchRemove(sid)) |kv| {
-            log.debug("Removed subscription {d} from connection", .{sid});
+        if (self.subscriptions.fetchRemove(sid)) |kv| {
+            log.debug("Removed subscription {d} ({s}) from connection", .{ sid, kv.value.subject });
             // Release connection's reference to the subscription
             kv.value.release();
         }

1168-1171: Unknown sid path: consider skipping redundant UNSUB attempts

If we get a message for an unknown sid, sending another UNSUB is typically redundant. Dropping the message (as you already do) might be sufficient; alternatively, gate this with a one-shot per sid to avoid repeated UNSUBs on pathological inputs.


1612-1641: Reapply autounsubscribe budget on reconnect

resendSubscriptions() reissues SUBs but doesn’t reapply any remaining max delivery budget. If a sub had max_msgs and wasn’t yet satisfied before disconnect, the server will forget the prior UNSUB sid max. Suggest: after each SUB, check sub.max_msgs vs sub.delivered_msgs and send UNSUB sid remaining when remaining > 0. This keeps server-side enforcement aligned post-reconnect and reduces over-delivery.

Happy to wire this in here or via a subscription-level hook (e.g., sub.onResubscribed(conn)).
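
A minimal sketch of the per-subscription decision on reconnect, assuming the 0-means-no-limit convention discussed elsewhere in this review. `ResubscribeAction` and `resubscribeAction` are hypothetical names for illustration and are not part of the codebase.

```zig
const std = @import("std");

/// Hypothetical: what to do for one subscription when resubscribing.
const ResubscribeAction = union(enum) {
    /// No limit set: send SUB only.
    sub_only,
    /// Limit not yet reached: send SUB, then UNSUB <sid> <remaining>.
    sub_with_remaining: u64,
    /// Limit already reached: nothing left to deliver.
    skip,
};

/// `max` is the total delivery budget (0 = no limit);
/// `delivered` is the number of messages delivered so far.
fn resubscribeAction(max: u64, delivered: u64) ResubscribeAction {
    if (max == 0) return .sub_only;
    if (delivered >= max) return .skip;
    return .{ .sub_with_remaining = max - delivered };
}

test "remaining budget after one of two messages was delivered" {
    switch (resubscribeAction(2, 1)) {
        .sub_with_remaining => |remaining| try std.testing.expectEqual(@as(u64, 1), remaining),
        else => return error.TestUnexpectedResult,
    }
}
```

The remaining count is sent because a freshly issued SUB starts the server-side count from zero, so the original total would allow over-delivery.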

tests/autounsubscribe_test.zig (5)

10-15: Flush after setting autounsubscribe to remove ordering races

Ensures SUB and UNSUB(max) are processed before PUBs.

     const sub = try conn.subscribeSync("auto.test");
     defer sub.deinit();

     // Set autounsubscribe limit to 3 messages
     try sub.autoUnsubscribe(3);
+    try conn.flush();

77-86: Replace fixed sleep with a bounded wait loop to avoid flakes

Polling with a short sleep until count >= 2 (or timeout) is more stable on busy CI.

-    // Wait a bit for message processing
-    std.time.sleep(100 * std.time.ns_per_ms);
-
-    // Should have received exactly 2 messages
-    ctx.mutex.lock();
-    const count = messages_received.items.len;
-    ctx.mutex.unlock();
-
-    try std.testing.expectEqual(@as(usize, 2), count);
+    const deadline_ms = std.time.milliTimestamp() + 1000;
+    var count: usize = 0;
+    while (std.time.milliTimestamp() < deadline_ms) {
+        ctx.mutex.lock();
+        count = messages_received.items.len;
+        ctx.mutex.unlock();
+        if (count >= 2) break;
+        std.time.sleep(10 * std.time.ns_per_ms);
+    }
+    try std.testing.expectEqual(@as(usize, 2), count);

118-135: Stronger memory ordering for cross-thread reads

Use .acquire when asserting on atomics potentially updated by another thread.

-    try std.testing.expectEqual(@as(u64, 0), sub.delivered_msgs.load(.monotonic));
+    try std.testing.expectEqual(@as(u64, 0), sub.delivered_msgs.load(.acquire));
-    try std.testing.expectEqual(@as(u64, 1), sub.delivered_msgs.load(.monotonic));
+    try std.testing.expectEqual(@as(u64, 1), sub.delivered_msgs.load(.acquire));
-    try std.testing.expectEqual(@as(u64, 2), sub.delivered_msgs.load(.monotonic));
+    try std.testing.expectEqual(@as(u64, 2), sub.delivered_msgs.load(.acquire));

6-36: Optional: flush after subscribe to fully eliminate timing dependencies

Not strictly required given FIFO write buffer, but it makes the test intent explicit.

     const sub = try conn.subscribeSync("auto.test");
     defer sub.deinit();
+    try conn.flush();

88-107: Add a reconnect scenario to cover server budget loss

Add a test that sets autoUnsubscribe(2), delivers 1, forces a reconnect, publishes 2 more, and asserts only 1 more is delivered (server-side budget re-applied) or zero more if you enforce purely client-side removal. This will validate the reconnect behavior discussed in the code review.

I can draft this test once you confirm the intended semantics across reconnect.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0b36954 and afc2168.

📒 Files selected for processing (5)
  • src/connection.zig (3 hunks)
  • src/dispatcher.zig (2 hunks)
  • src/subscription.zig (3 hunks)
  • tests/all_tests.zig (1 hunks)
  • tests/autounsubscribe_test.zig (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • tests/all_tests.zig
  • tests/autounsubscribe_test.zig
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.

Applied to files:

  • src/dispatcher.zig
  • tests/autounsubscribe_test.zig
📚 Learning: 2025-09-09T19:51:02.156Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#108
File: src/dispatcher.zig:60-65
Timestamp: 2025-09-09T19:51:02.156Z
Learning: In the NATS Zig codebase dispatcher.zig, when decrementing pending_bytes (atomic u64) with message_data_len (usize) using fetchSub, explicit casting is not required as Zig handles the implicit conversion automatically and the code builds successfully.

Applied to files:

  • src/dispatcher.zig
  • tests/autounsubscribe_test.zig
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (3)
tests/all_tests.zig (1)

10-10: Add import: looks good.

Autounsubscribe tests now run with the suite. No concerns.

src/dispatcher.zig (1)

133-135: Delivered counter increment placement is correct.

Incrementing before invoking the handler matches “messages sent” semantics and ensures the autounsubscribe check can trigger deterministically afterward.

src/subscription.zig (1)

144-147: Drain UNSUB error handling is fine.

Catching and logging the failure while continuing drain bookkeeping is reasonable. No change requested.

Comment on lines 149 to 155
        // Check autounsubscribe limit after successful message delivery
        if (subscription.max_msgs) |max| {
            if (delivered >= max) {
                log.debug("Subscription {} reached autounsubscribe limit ({}), removing", .{ subscription.sid, max });
                subscription.nc.removeSubscriptionInternal(subscription.sid);
            }
        }

Contributor

⚠️ Potential issue

Data race on max limit (max_msgs) — make it atomic and load with memory ordering.

subscription.max_msgs is a plain nullable field written by user code (autoUnsubscribe) and read here by the dispatcher thread. That’s a concurrent read/write with no synchronization. Convert the limit to an atomic u64 with 0 as “no limit”, and load it here with .acquire. Then compare against delivered. Apply along with the field/type change in subscription.zig (see my comment there).

Apply this diff after changing the field name/type to max_total: std.atomic.Value(u64):

-        if (subscription.max_msgs) |max| {
-            if (delivered >= max) {
-                log.debug("Subscription {} reached autounsubscribe limit ({}), removing", .{ subscription.sid, max });
-                subscription.nc.removeSubscriptionInternal(subscription.sid);
-            }
-        }
+        const max = subscription.max_total.load(.acquire);
+        if (max != 0 and delivered >= max) {
+            log.debug("Subscription {} reached autounsubscribe limit ({}), removing", .{ subscription.sid, max });
+            subscription.nc.removeSubscriptionInternal(subscription.sid);
+        }

Note: Using 0 as sentinel avoids nullable reads and removes the race window.
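
To make the sentinel idea concrete, here is a minimal self-contained sketch. `Limits` is a hypothetical stand-in for the subscription's autounsubscribe state, not the actual Subscription struct; the field names follow the PR, and the memory orderings mirror the ones discussed in this review.

```zig
const std = @import("std");

/// Hypothetical stand-in for the autounsubscribe fields of a subscription.
const Limits = struct {
    /// Total delivery budget; 0 means no limit.
    max_msgs: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),
    /// Messages delivered so far.
    delivered_msgs: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),

    /// Publishes a new limit; pairs with the acquire load in recordDelivery().
    fn setMax(self: *Limits, max: u64) void {
        self.max_msgs.store(max, .release);
    }

    /// Counts one delivery and reports whether the limit has been reached.
    fn recordDelivery(self: *Limits) bool {
        const delivered = self.delivered_msgs.fetchAdd(1, .acq_rel) + 1;
        const max = self.max_msgs.load(.acquire);
        return max != 0 and delivered >= max;
    }
};
```

With a single u64, the dispatcher reads the limit in one atomic load, and a writer thread can publish a new limit without any lock.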

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
         // Check autounsubscribe limit after successful message delivery
-        if (subscription.max_msgs) |max| {
-            if (delivered >= max) {
-                log.debug("Subscription {} reached autounsubscribe limit ({}), removing", .{ subscription.sid, max });
-                subscription.nc.removeSubscriptionInternal(subscription.sid);
-            }
-        }
+        const max = subscription.max_total.load(.acquire);
+        if (max != 0 and delivered >= max) {
+            log.debug("Subscription {} reached autounsubscribe limit ({}), removing", .{ subscription.sid, max });
+            subscription.nc.removeSubscriptionInternal(subscription.sid);
+        }
🤖 Prompt for AI Agents
In src/dispatcher.zig around lines 149 to 156, the code reads the user-writable
nullable subscription.max_msgs without synchronization, causing a data race;
after you change the field in subscription.zig to max_total:
std.atomic.Value(u64) (0 = no limit), replace the nullable read with an atomic
acquire load, store the loaded u64 into a local (e.g., max =
subscription.max_total.load(.acquire)), skip the check when max == 0, and
otherwise compare delivered >= max and call
subscription.nc.removeSubscriptionInternal(subscription.sid) when the limit is
reached; ensure you use .acquire ordering for the load to synchronize with
writes from autoUnsubscribe.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/subscription.zig (2)

61-64: Atomic autounsubscribe state: LGTM.

Using atomic u64 with 0 sentinel is the right fix. Consider renaming to max_total for clarity (total since sub start).

-    max_msgs: std.atomic.Value(u64) = std.atomic.Value(u64).init(0), // 0 means no limit
+    // total messages since subscription start; 0 = no limit
+    max_total: std.atomic.Value(u64) = std.atomic.Value(u64).init(0),

182-206: Document autoUnsubscribe repeat semantics

Update the doc comment to state that each invocation sends an UNSUB with the new max, resetting the server-side cap to the most recent total, and that calls with max == 0 or max <= delivered_msgs will return InvalidMax or MaxAlreadyReached, respectively.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between afc2168 and 3fc8089.

📒 Files selected for processing (4)
  • src/connection.zig (2 hunks)
  • src/dispatcher.zig (2 hunks)
  • src/subscription.zig (3 hunks)
  • tests/autounsubscribe_test.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/autounsubscribe_test.zig
  • src/connection.zig
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.

Applied to files:

  • src/dispatcher.zig
  • src/subscription.zig
📚 Learning: 2025-09-09T19:51:02.156Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#108
File: src/dispatcher.zig:60-65
Timestamp: 2025-09-09T19:51:02.156Z
Learning: In the NATS Zig codebase dispatcher.zig, when decrementing pending_bytes (atomic u64) with message_data_len (usize) using fetchSub, explicit casting is not required as Zig handles the implicit conversion automatically and the code builds successfully.

Applied to files:

  • src/dispatcher.zig
  • src/subscription.zig
🔇 Additional comments (4)
src/dispatcher.zig (1)

149-154: removeSubscriptionInternal is already idempotent
The function locks subs_mutex and uses fetchRemove, so a second (or concurrent) call with the same sid simply does nothing and never calls release() again. No changes needed.

src/subscription.zig (3)

144-146: Drain UNSUB error handling: LGTM.

Sending UNSUB (no max) and logging on failure is appropriate for drain.


208-214: Pre-check before pop: LGTM.

Atomic load + 0-sentinel keeps sync consumers from reading past the cap.


221-230: Post-pop accounting: LGTM; mirrors dispatcher.

Increment with fetchAdd and remove when >= max is fine for the sync path.

Comment on lines +133 to +135
// Increment delivered counter for autounsubscribe
const delivered = subscription.delivered_msgs.fetchAdd(1, .acq_rel) + 1;

Contributor

🛠️ Refactor suggestion

Prevent over-delivery: guard before invoking the handler.

If multiple messages are already queued for this sid, the current check (after handler) can still deliver >max to the app. Guard immediately after increment and drop messages when delivered > max.

         // Increment delivered counter for autounsubscribe
-        const delivered = subscription.delivered_msgs.fetchAdd(1, .acq_rel) + 1;
+        const delivered = subscription.delivered_msgs.fetchAdd(1, .acq_rel) + 1;
+        const max_for_drop = subscription.max_msgs.load(.acquire);
+        if (max_for_drop > 0 and delivered > max_for_drop) {
+            log.debug("Dropping message beyond autounsubscribe limit for sid {}", .{subscription.sid});
+            // Message ownership hasn't been transferred; we must clean up.
+            message.deinit();
+            subscription_mod.decrementPending(subscription, message_data_len);
+            return;
+        }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/dispatcher.zig around lines 133-135, the delivered counter is incremented
but the max_msgs guard runs after the handler, allowing over-delivery; move the
max check immediately after the fetchAdd so that if subscription.max_msgs != 0
and delivered > subscription.max_msgs you drop the message and do not call the
handler. Ensure you free or release the message resources and continue the
dispatch loop (or return) without invoking the handler; do not perform handler
invocation when the post-increment delivered value exceeds max_msgs.

Add automatic unsubscription capability that removes subscriptions
after receiving a specified maximum number of messages. This feature
is essential for request-response patterns and resource management.

Key features:
- Atomic message delivery counting with proper memory ordering
- Protocol-compliant UNSUB commands with message limits
- Comprehensive error handling with rollback on failure
- Support for both sync and async subscription types
- Thread-safe subscription removal from connection state

The implementation follows NATS protocol standards and includes
extensive test coverage for all functionality and edge cases.

Change max_msgs from optional u64 to atomic u64 (0 = no limit) to ensure
thread-safe access from both dispatcher and subscription threads. Also
set max_msgs only after successfully sending UNSUB to server to avoid
inconsistent state on send failures.

Key changes:
- max_msgs: ?u64 -> std.atomic.Value(u64) with 0 meaning no limit
- Use atomic load/store operations with proper memory ordering
- Set limit after UNSUB command succeeds, not before
- Update dispatcher.zig to use atomic operations

Address code review suggestions to improve robustness and code quality:

- Move UNSUB sending outside subscription mutex locks to reduce critical section time
- Add subject to removal log messages for better debugging traceability
- Add documentation comment explaining autounsubscribe behavior
- Add flush() calls in tests after setting autounsubscribe to prevent timing races
- Replace sleep with bounded wait loop in async test to avoid flakes
- Use .acquire memory ordering for cross-thread atomic reads in tests

These changes improve thread safety, test reliability, and code maintainability
while keeping all functionality working correctly.

@lalinsky force-pushed the autounsubscribe-implementation branch from 3fc8089 to c3c691e on September 10, 2025 at 14:14
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/dispatcher.zig (1)

149-154: Ensure messages aren't over-delivered beyond the limit.

The autounsubscribe check happens after the handler executes, meaning if multiple messages are already queued for this subscription, the handler could receive more than max messages before the subscription is removed. Consider checking the limit before invoking the handler to prevent over-delivery.

Apply this diff to check the limit earlier:

         // Increment delivered counter for autounsubscribe
         const delivered = subscription.delivered_msgs.fetchAdd(1, .acq_rel) + 1;
+        
+        // Check if we've already exceeded the limit
+        const max = subscription.max_msgs.load(.acquire);
+        if (max > 0 and delivered > max) {
+            log.debug("Dropping message {} beyond autounsubscribe limit {} for sid {}", .{ delivered, max, subscription.sid });
+            message.deinit();
+            subscription_mod.decrementPending(subscription, message_data_len);
+            return;
+        }

         // Call the subscription's handler in this dispatcher thread context

Then you can still keep the existing check after the handler to handle the delivered == max case for subscription removal.
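
The two checks described here (drop before the handler when the count exceeds the limit, remove after the handler when it equals the limit) can be expressed as one pure decision. This is a minimal sketch with hypothetical names (`Decision`, `decide`); the real dispatcher would additionally release the message and decrement pending counters on the drop path, as the diff above shows.

```zig
const std = @import("std");

/// Hypothetical: outcome for one message given the post-increment count.
const Decision = enum {
    /// Invoke the handler; the subscription stays registered.
    deliver,
    /// Invoke the handler, then remove the subscription (limit just reached).
    deliver_and_remove,
    /// Do not invoke the handler; the message is beyond the limit.
    drop,
};

/// `delivered` is the value after fetchAdd(1) + 1 for this message;
/// `max` is the limit, with 0 meaning no limit.
fn decide(delivered: u64, max: u64) Decision {
    if (max == 0) return .deliver;
    if (delivered > max) return .drop;
    if (delivered == max) return .deliver_and_remove;
    return .deliver;
}

test "three queued messages with a limit of two" {
    try std.testing.expectEqual(Decision.deliver, decide(1, 2));
    try std.testing.expectEqual(Decision.deliver_and_remove, decide(2, 2));
    try std.testing.expectEqual(Decision.drop, decide(3, 2));
}
```

Keeping the decision separate from the side effects makes the over-delivery case easy to test without a running connection.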

🧹 Nitpick comments (3)
tests/autounsubscribe_test.zig (3)

35-37: Bump the post-limit nextMsg() timeout to reduce CI flakiness.

100 ms can be tight under load; use 500 ms for stability.

Apply this diff:

-    try std.testing.expectError(error.Timeout, sub.nextMsg(100));
+    try std.testing.expectError(error.Timeout, sub.nextMsg(500));

79-91: Use a monotonic timer instead of milliTimestamp to avoid clock jumps.

std.time.Timer is monotonic; avoids flakiness on clock adjustments.

Apply this diff:

-    const deadline_ms = std.time.milliTimestamp() + 1000;
-    var count: usize = 0;
-    while (std.time.milliTimestamp() < deadline_ms) {
+    var timer = try std.time.Timer.start();
+    var count: usize = 0;
+    while (timer.read() < 1 * std.time.ns_per_s) {
         ctx.mutex.lock();
         count = messages_received.items.len;
         ctx.mutex.unlock();
         if (count >= 2) break;
         std.time.sleep(10 * std.time.ns_per_ms);
     }

142-146: Same timeout nit here—make it 500 ms.

Keeps the test stable on slower runners.

Apply this diff:

-    try std.testing.expectError(error.Timeout, sub.nextMsg(100));
+    try std.testing.expectError(error.Timeout, sub.nextMsg(500));
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3fc8089 and c3c691e.

📒 Files selected for processing (5)
  • src/connection.zig (2 hunks)
  • src/dispatcher.zig (2 hunks)
  • src/subscription.zig (3 hunks)
  • tests/all_tests.zig (1 hunks)
  • tests/autounsubscribe_test.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/all_tests.zig
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-09-09T19:51:02.156Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#108
File: src/dispatcher.zig:60-65
Timestamp: 2025-09-09T19:51:02.156Z
Learning: In the NATS Zig codebase dispatcher.zig, when decrementing pending_bytes (atomic u64) with message_data_len (usize) using fetchSub, explicit casting is not required as Zig handles the implicit conversion automatically and the code builds successfully.

Applied to files:

  • src/subscription.zig
  • src/dispatcher.zig
  • tests/autounsubscribe_test.zig
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.

Applied to files:

  • src/subscription.zig
  • src/dispatcher.zig
  • tests/autounsubscribe_test.zig
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • tests/autounsubscribe_test.zig
🔇 Additional comments (14)
src/dispatcher.zig (1)

133-135: Track delivery count correctly for autounsubscribe.

The increment looks correct with acq_rel ordering. The atomic increment ensures we get a unique delivered count for each message even with concurrent dispatches.

src/connection.zig (4)

766-778: LGTM! Clean implementation of UNSUB with optional max.

The function properly handles both cases - UNSUB with and without a message limit. The buffer is appropriately sized and errors propagate correctly.


780-806: Good separation of concerns for unsubscribe vs remove.

The refactoring properly separates the protocol operation (sending UNSUB) from the internal bookkeeping (removing from subscriptions table). The error handling with logging provides good visibility.


808-819: Clean implementation of internal subscription removal.

The mutex protection and reference counting look correct. Good debug logging for visibility.


1192-1194: Proper handling of unknown subscription IDs.

Sending UNSUB for unknown sids is the right approach - it tells the server to stop sending messages for that subscription. The error logging helps with debugging.

src/subscription.zig (5)

61-63: Good use of atomics for concurrent access.

Using atomic values with 0 as the sentinel for "no limit" avoids nullable types and race conditions. The initialization is clean.


181-205: Well-structured autounsubscribe implementation.

Good validation, proper error handling, and correct ordering - sending UNSUB first before setting the limit atomically. The error set is comprehensive.


208-212: Pre-check prevents unnecessary blocking.

Checking the limit before attempting to pop a message is efficient - avoids blocking on the queue when we know no more messages should be delivered.


220-228: Consistent delivered count tracking.

The atomic increment with acq_rel matches the dispatcher's approach. The post-delivery check for limit and subscription removal is correct.


149-153: Error handling for UNSUB during drain is appropriate.

Even if UNSUB fails, continuing with the drain is the right choice since the draining flag will cause messages to be dropped anyway.

tests/autounsubscribe_test.zig (4)

10-16: Good sequencing: subscribe → autoUnsubscribe → flush.

This ordering avoids races (server sees SUB and UNSUB with limit before any PUBs). Looks solid.


93-112: Error-path coverage looks correct.

InvalidMax for 0 and MaxAlreadyReached after consuming 1 both read well against the API contract.


124-141: Delivered counter assertions are on point.

Loading with .acquire after nextMsg() pairs with the acq_rel increment performed inside nextMsg(). This gives meaningful coverage of the atomic counter.


51-60: Ensure message pointers remain valid beyond the async callback
The test’s handleMessage appends raw *Message pointers, but Dispatcher.processMessage calls dispatch_msg.deinit() (which releases or recycles the underlying message) as soon as the callback returns. That makes stored pointers point to freed or reused memory. Instead, clone or deep-copy the message payload and headers before storing it in messages.

When a connection reconnects, subscription autounsubscribe limits are now properly
restored by calculating the remaining message count (max - delivered) and sending
the appropriate UNSUB command with the adjusted limit to the server.

This matches the behavior of nats.c and nats.go clients and ensures that
subscribers receive exactly the number of messages specified in autoUnsubscribe(),
regardless of connection interruptions.

Changes:
- Enhanced resendSubscriptions() to restore autounsubscribe limits
- Added comprehensive test for reconnection behavior
- Fixed potential resource leaks from unlimited message delivery after reconnect

Remove unnecessary UNSUB command during reconnection for subscriptions
that have already reached their autounsubscribe limit. These subscriptions
should simply be skipped rather than sending UNSUB for non-existent sids.
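
The reconnect decision described in these two commit messages can be sketched as a small pure function. The `Resend` type and `planResend` name are hypothetical and exist only for illustration; the arithmetic (remaining = max - delivered, with 0 meaning "no limit") is taken from the description above:

```zig
const std = @import("std");

/// Hypothetical type: what to do with a subscription when re-establishing it.
const Resend = union(enum) {
    skip, // limit already reached: send neither SUB nor UNSUB
    unlimited, // no autounsubscribe limit: send SUB only
    remaining: u64, // send SUB, then UNSUB {sid} {remaining}
};

/// Decides how to resend a subscription given its limit and delivered count.
fn planResend(max: u64, delivered: u64) Resend {
    if (max == 0) return .unlimited;
    if (delivered >= max) return .skip;
    return .{ .remaining = max - delivered };
}
```

For example, a subscription created with a limit of 5 that has already delivered 2 messages would be re-subscribed with a limit of 3.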
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (5)
src/connection.zig (4)

766-778: Avoid catch unreachable on fixed buffer writes; propagate errors instead.

writer.print(... ) catch unreachable can hide real issues if the control line ever grows. Propagate the error; 256 bytes is ample today, but not a contract.

-        if (max) |m| {
-            writer.print("UNSUB {d} {d}\r\n", .{ sid, m }) catch unreachable; // Will always fit
-        } else {
-            writer.print("UNSUB {d}\r\n", .{sid}) catch unreachable; // Will always fit
-        }
+        if (max) |m| {
+            try writer.print("UNSUB {d} {d}\r\n", .{ sid, m });
+        } else {
+            try writer.print("UNSUB {d}\r\n", .{sid});
+        }

781-793: Unsubscribe table removal: drop the outer self.mutex to reduce contention.

Subscription map ops are protected by subs_mutex. Holding self.mutex here widens the critical section and increases deadlock surface without clear benefit.

-        {
-            self.mutex.lock();
-            defer self.mutex.unlock();
-
-            self.subs_mutex.lock();
-            defer self.subs_mutex.unlock();
+        {
+            self.subs_mutex.lock();
+            defer self.subs_mutex.unlock();

808-820: Guard against future reordering: snapshot subject before release.

You log the subject then release. It’s fine now, but trivial refactors can move the release above the log. Snapshot the subject to avoid accidental UAF later.

-        if (self.subscriptions.fetchRemove(sid)) |kv| {
-            log.debug("Removed subscription {d} ({s}) from connection", .{ sid, kv.value.subject });
-            // Release connection's reference to the subscription
-            kv.value.release();
+        if (self.subscriptions.fetchRemove(sid)) |kv| {
+            const subj = kv.value.subject;
+            log.debug("Removed subscription {d} ({s}) from connection", .{ sid, subj });
+            // Release connection's reference to the subscription
+            kv.value.release();

1191-1195: Unknown SID UNSUB: downgrade log level to avoid noise.

This path can be hit repeatedly during transient states. Errors here aren’t actionable and spam logs.

-            self.unsubscribeInternal(message.sid, null) catch |err| {
-                log.err("Failed to send UNSUB for unknown sid {d}: {}", .{ message.sid, err });
-            };
+            self.unsubscribeInternal(message.sid, null) catch |err| {
+                log.debug("UNSUB for unknown sid {d} failed: {}", .{ message.sid, err });
+            };
tests/autounsubscribe_test.zig (1)

80-88: Async wait loop is tight; minor flake risk.

1s is usually fine, but CI variance happens. Consider a slightly longer deadline (e.g., 2s) to reduce false negatives.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c3c691e and 2ffb3c6.

📒 Files selected for processing (2)
  • src/connection.zig (3 hunks)
  • tests/autounsubscribe_test.zig (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • tests/autounsubscribe_test.zig
🔇 Additional comments (1)
tests/autounsubscribe_test.zig (1)

183-186: Confirm server-side limit is applied before publishing.

You do autoUnsubscribe(5) then flush(), which should ensure the server received the UNSUB with max. Good. Keep this pattern; it prevents over-delivery into the local queue.

Comment on lines 1669 to 1697
// Check autounsubscribe state
const max = sub.max_msgs.load(.acquire);
const delivered = sub.delivered_msgs.load(.acquire);

var adjusted_max: ?u64 = null;
if (max > 0) {
    if (delivered < max) {
        adjusted_max = max - delivered; // Remaining messages
    } else {
        // Already reached limit - don't re-subscribe at all
        log.debug("Subscription {d} ({s}) already reached limit, skipping during reconnect", .{ sub.sid, sub.subject });
        continue;
    }
}

// Send SUB command
if (sub.queue) |queue| {
    try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue, sub.sid });
} else {
    try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
}

// Send UNSUB with remaining limit if needed
if (adjusted_max) |remaining| {
    try buffer.writer().print("UNSUB {d} {d}\r\n", .{ sub.sid, remaining });
    log.debug("Re-subscribed to {s} with sid {d} and autounsubscribe limit {d} (delivered: {d})", .{ sub.subject, sub.sid, remaining, delivered });
} else {
    log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
}
Contributor

🛠️ Refactor suggestion

Subscriptions that already reached max on reconnect aren’t removed (stale map entry).

When delivered >= max you skip re-subscribe but leave the entry in self.subscriptions. That leaks the connection’s reference and keeps a dead subscription around.

Apply this refactor to collect SIDs to remove and clean them up after unlocking:

 fn resendSubscriptions(self: *Self) !void {
     log.debug("Re-establishing subscriptions", .{});

-    self.subs_mutex.lock();
-    defer self.subs_mutex.unlock();
+    // Track SIDs that shouldn't be re-subscribed and must be removed
+    var to_remove = std.ArrayList(u64).init(self.allocator);
+    defer to_remove.deinit();
+
+    self.subs_mutex.lock();
+    defer self.subs_mutex.unlock();

     const allocator = self.scratch.allocator();
     defer self.resetScratch();

     var buffer = ArrayList(u8).init(allocator);

     var iter = self.subscriptions.iterator();
     while (iter.next()) |entry| {
         const sub = entry.value_ptr.*;

         // Check autounsubscribe state
         const max = sub.max_msgs.load(.acquire);
         const delivered = sub.delivered_msgs.load(.acquire);

         var adjusted_max: ?u64 = null;
         if (max > 0) {
             if (delivered < max) {
                 adjusted_max = max - delivered; // Remaining messages
             } else {
-                // Already reached limit - don't re-subscribe at all
-                log.debug("Subscription {d} ({s}) already reached limit, skipping during reconnect", .{ sub.sid, sub.subject });
-                continue;
+                // Already reached limit - don't re-subscribe; remove after unlock
+                log.debug("Subscription {d} ({s}) already reached limit; will remove during reconnect", .{ sub.sid, sub.subject });
+                try to_remove.append(sub.sid);
+                continue;
             }
         }

         // Send SUB command
         if (sub.queue) |queue| {
             try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue, sub.sid });
         } else {
             try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
         }

         // Send UNSUB with remaining limit if needed
         if (adjusted_max) |remaining| {
             try buffer.writer().print("UNSUB {d} {d}\r\n", .{ sub.sid, remaining });
             log.debug("Re-subscribed to {s} with sid {d} and autounsubscribe limit {d} (delivered: {d})", .{ sub.subject, sub.sid, remaining, delivered });
         } else {
             log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
         }
     }

     // Send all subscription commands via write buffer
     if (buffer.items.len > 0) {
         try self.write_buffer.append(buffer.items);
     }
+
+    // Now remove stale subs outside the subs_mutex
+    for (to_remove.items) |sid| {
+        self.removeSubscriptionInternal(sid);
+    }
 }

Also applies to: 1700-1704

Comment on lines +208 to +214
var timer = try std.time.Timer.start();
while (ReconnectTracker.reconnected_called == 0) {
    if (timer.read() >= 5000 * std.time.ns_per_ms) {
        return error.ReconnectionTimeout;
    }
    std.time.sleep(10 * std.time.ns_per_ms);
}
Contributor

⚠️ Potential issue

Data race on ReconnectTracker.reconnected_called read.

You mutate under a mutex but read without locking. Use the mutex or an atomic to avoid racy reads.

-    var timer = try std.time.Timer.start();
-    while (ReconnectTracker.reconnected_called == 0) {
+    var timer = try std.time.Timer.start();
+    while (true) {
+        ReconnectTracker.mutex.lock();
+        const rc = ReconnectTracker.reconnected_called;
+        ReconnectTracker.mutex.unlock();
+        if (rc > 0) break;
         if (timer.read() >= 5000 * std.time.ns_per_ms) {
             return error.ReconnectionTimeout;
         }
         std.time.sleep(10 * std.time.ns_per_ms);
     }
🤖 Prompt for AI Agents
In tests/autounsubscribe_test.zig around lines 208-214, the test reads
ReconnectTracker.reconnected_called without synchronization causing a data race
(writes happen under a mutex). Fix by either reading the value while holding the
same mutex used for mutations or change reconnected_called to an atomic integer
and use atomic load with appropriate ordering; ensure the sleep/loop logic uses
the synchronized/atomic read so the race is eliminated.
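
The atomic alternative mentioned above might look like the following sketch. `Tracker`, `onReconnected`, and `wasReconnected` are hypothetical names, not the test's actual `ReconnectTracker`; the point is only that both the write and the read go through the same atomic value:

```zig
const std = @import("std");

/// Hypothetical replacement for the test's tracker: an atomic counter
/// that the reconnect callback bumps and the test polls.
const Tracker = struct {
    var reconnected_called: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);

    /// Called from the connection's reconnect callback (possibly another thread).
    fn onReconnected() void {
        _ = reconnected_called.fetchAdd(1, .release);
    }

    /// Called from the test's wait loop in place of the unsynchronized read.
    fn wasReconnected() bool {
        return reconnected_called.load(.acquire) > 0;
    }
};
```

With this shape the wait loop would poll `Tracker.wasReconnected()` and needs no mutex, at the cost of moving the counter out of whatever lock protects the tracker's other fields.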

@lalinsky
Owner Author

@claude when ignoring subscription on resend, also remove it from the internal map

@claude
Contributor

claude bot commented Sep 13, 2025

Claude finished @lalinsky's task


✅ Completed: Remove subscriptions from internal map during resend

Todo List:

  • Read CLAUDE.md for repo guidelines
  • Examine current resendSubscriptions implementation in src/connection.zig
  • Implement fix to remove stale subscriptions from internal map
  • Run tests and checks
  • Commit and push changes

Changes Made

I've successfully implemented the fix to remove stale subscriptions from the internal map during resend operations in src/connection.zig:1654-1716:

Key changes:

  1. Added subscription tracking: Created an ArrayList(u64) to collect SIDs that have reached their autounsubscribe limit
  2. Safe removal outside mutex: After releasing the subs_mutex, the code now calls removeSubscriptionInternal() for each collected SID
  3. Improved logging: Updated log message to indicate subscriptions will be removed during reconnect
  4. Memory leak prevention: This ensures that subscriptions that have already reached their limit don't remain in the internal map indefinitely

Technical details:

  • The fix maintains thread safety by collecting SIDs while holding the lock, then removing them after unlocking
  • This prevents the iterator invalidation issue that would occur if we tried to remove entries during iteration
  • Uses the existing removeSubscriptionInternal() function which properly handles reference counting and cleanup
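
The collect-then-remove pattern from the technical details above can be sketched in isolation. This is a simplified illustration, not the PR's code: `Entry` and `purgeExhausted` are hypothetical, locking and reference counting are left out, and a plain `std.AutoHashMap` stands in for the connection's subscription table:

```zig
const std = @import("std");

/// Hypothetical stand-in for a subscription's counters, keyed by sid in the map.
const Entry = struct { max: u64, delivered: u64 };

/// Removes entries whose limit is reached, in two phases so the map is never
/// modified while its iterator is live. Returns the number of entries removed.
fn purgeExhausted(allocator: std.mem.Allocator, subs: *std.AutoHashMap(u64, Entry)) !usize {
    // Phase 1: collect stale sids (there can be at most one per entry).
    const stale = try allocator.alloc(u64, subs.count());
    defer allocator.free(stale);
    var n: usize = 0;
    var iter = subs.iterator();
    while (iter.next()) |kv| {
        const e = kv.value_ptr.*;
        if (e.max > 0 and e.delivered >= e.max) {
            stale[n] = kv.key_ptr.*;
            n += 1;
        }
    }
    // Phase 2: remove only after iteration has finished.
    for (stale[0..n]) |sid| _ = subs.remove(sid);
    return n;
}
```

In the real connection, phase 1 would run while holding subs_mutex and phase 2 would call removeSubscriptionInternal() after the lock is released, as described above.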

The implementation follows the exact pattern suggested in the code review comments and resolves the stale map entry issue without breaking existing functionality.

Commit: e443929 - Remove stale subscriptions from internal map during resend

When subscriptions have already reached their autounsubscribe limit during
reconnection, collect their SIDs and remove them from the internal map
after unlocking the mutex. This prevents memory leaks from keeping
references to subscriptions that will never be used again.

Co-authored-by: Lukáš Lalinský <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/connection.zig (1)

1657-1715: Re-subscribe logic handles autounsubscribe correctly and cleans up stale entries.

  • Computing remaining = max - delivered and emitting UNSUB {sid} {remaining} is protocol-compliant.
  • Collecting SIDs to remove and purging them after releasing subs_mutex fixes the stale map entry on reconnect. Nice.

Optional:

  • Add an inline note to prevent future misuse (this relies on caller holding the main mutex, which doReconnect does):
         log.debug("Re-establishing subscriptions", .{});
+        // Note: called from doReconnect() with self.mutex held; safe to use self.scratch here.
  • If write_buffer.append(buffer.items) can block, consider moving the append outside subs_mutex (you’d need to allocate the command batch with a non-scratch allocator or defer arena reset accordingly).

To double-check atomic ordering across the codebase (that increments use release/acq_rel to match the .load(.acquire) here), run:

#!/bin/bash
# Verify delivered/max counters use appropriate atomic orderings
rg -n -C2 'delivered_msgs|max_msgs' --type zig
🧹 Nitpick comments (2)
src/connection.zig (2)

766-778: UNSUB formatting + buffering look correct.

Optional: if ConcurrentWriteBuffer.append can error when frozen during reconnects, consider documenting that callers should ignore failures (as you already do) or route UNSUBs through the reconnect pending buffer for consistency with publish.


1192-1194: Reduce log noise on unknown-sid UNSUB failures.

During transient states this can get chatty. Suggest dropping to debug to avoid log spam.

-            log.err("Failed to send UNSUB for unknown sid {d}: {}", .{ message.sid, err });
+            log.debug("Failed to send UNSUB for unknown sid {d}: {}", .{ message.sid, err });
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 2ffb3c6 and e443929.

📒 Files selected for processing (1)
  • src/connection.zig (3 hunks)
🔇 Additional comments (2)
src/connection.zig (2)

781-793: Good order: remove from map before sending UNSUB.

Lock order (main mutex → subs_mutex) matches the rest of the file and avoids races with processMsg. Post-remove error handling is sensible.

Also applies to: 798-801


808-819: Internal removal helper is solid.

Using fetchRemove() under subs_mutex and releasing the connection’s ref aligns with the codebase’s refcounting model (per learnings: multiple deinit/release sites are safe).

@lalinsky lalinsky merged commit d3d5797 into main Sep 13, 2025
3 checks passed
@lalinsky lalinsky deleted the autounsubscribe-implementation branch September 13, 2025 06:23