
Conversation

@lalinsky (Owner) commented Aug 25, 2025

Summary

Fixes a critical use-after-free bug in subscription lifecycle management that was causing crashes during JetStream fetch operations and other concurrent scenarios.

Root Cause

The connection's subscription map was storing subscription references without properly taking ownership via reference counting. This meant:

  • Subscriptions started with ref_count = 1 (user ownership)
  • Connection added them to map but didn't increment ref_count
  • When users called deinit(), ref_count went 1→0 and subscription was freed
  • Map still contained dangling pointers to freed memory
  • Server responses caused crashes when trying to access freed subscriptions
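The broken lifecycle can be sketched with a hypothetical ref-counted handle (the type and method names here are illustrative stand-ins, not the library's actual API):

```zig
const std = @import("std");

// Hypothetical stand-in for the library's Subscription type.
const Subscription = struct {
    ref_count: u32 = 1, // user ownership on creation

    fn retain(self: *Subscription) void {
        self.ref_count += 1;
    }

    // Returns true when the last reference is dropped (i.e. memory would be freed).
    fn release(self: *Subscription) bool {
        self.ref_count -= 1;
        return self.ref_count == 0;
    }
};

test "map without retain leaves a dangling reference" {
    var sub = Subscription{};

    // Buggy path: the connection stores the pointer without retain(),
    // so the map entry carries no ownership of its own.
    const map_entry: *Subscription = &sub;

    // User calls deinit(): 1 -> 0, the subscription would be freed here...
    try std.testing.expect(sub.release());

    // ...but map_entry still points at it, so any server response that
    // looks it up now touches freed memory (the use-after-free).
    _ = map_entry;
}
```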

Changes

  • Add retain() calls when putting subscriptions in connection map
  • Add release() calls when removing subscriptions from connection map
  • Fix connection cleanup to release all subscription references before shutdown
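Using the same hypothetical handle, the fix pairs every map mutation with a ref-count operation, so the user's release and the connection's release each drop exactly one reference:

```zig
const std = @import("std");

// Same hypothetical handle as above: created with one user-owned reference.
const Subscription = struct {
    ref_count: u32 = 1,

    fn retain(self: *Subscription) void {
        self.ref_count += 1;
    }

    fn release(self: *Subscription) bool {
        self.ref_count -= 1;
        return self.ref_count == 0; // true only on the final release
    }
};

test "map retains on insert, releases on remove" {
    var sub = Subscription{};

    sub.retain(); // connection retains when putting the sub in its map
    try std.testing.expectEqual(@as(u32, 2), sub.ref_count);

    // User calls deinit(): drops their reference, sub stays alive for the map.
    try std.testing.expect(!sub.release());

    // Connection removes the entry (unsubscribe or Connection.deinit):
    // the final reference is dropped and the sub is freed exactly once.
    try std.testing.expect(sub.release());
}
```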

Impact

  • Eliminates use-after-free crashes in fetch operations and async scenarios
  • Enables proper timeout handling - fetch() now correctly times out and returns empty batches
  • Maintains backward compatibility - no API changes, pure bug fix
  • Ensures proper shared ownership between user code and connection internals

Testing

All existing tests pass. The fix resolves panics that were occurring during JetStream pull subscription operations, particularly when timeouts occurred or when subscriptions were cleaned up while server responses were still being processed.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Bug Fixes

    • Prevents rare crashes and resource leaks during subscribe/unsubscribe operations.
    • Ensures reliable cleanup of subscriptions when closing connections.
  • Refactor

    • Updated subscription lifecycle management to use explicit retain/release, improving stability under heavy usage.
    • Added a debug-time safety check to guard against invalid reference count decrements.

@coderabbitai bot (Contributor) commented Aug 25, 2025

Walkthrough

Introduces retain/release ownership for subscriptions in Connection: retain on insert during subscribe paths, release on removal during unsubscribe and during Connection.deinit. Replaces direct deinit of subscriptions with release. Adds a debug assertion in RefCounter.decr to guard against underflow; no public signatures changed.

Changes

Cohort / File(s) Summary
Subscription retain/release in Connection
src/connection.zig
Adopt retain/release model for Subscription entries in the map: retain on insert (subscribeSync/subscribe), release on removal (unsubscribe) and during Connection.deinit iteration. Direct per-entry deinit removed.
RefCounter safety check
src/ref_counter.zig
Add std.debug.assert after atomic fetchSub in decr to ensure previous count > 0; logic and memory ordering unchanged; API unchanged.
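In sketch form, the guarded decrement looks like the following (field names, orderings, and the acquire-on-zero detail are assumptions based on this summary, not the file's exact contents):

```zig
const std = @import("std");

// Minimal sketch of a ref counter with the new underflow guard.
fn RefCounter(comptime T: type) type {
    return struct {
        refs: std.atomic.Value(T) = std.atomic.Value(T).init(1),

        /// Returns true when the count hits zero and the object may be freed.
        pub fn decr(self: *@This()) bool {
            const prev = self.refs.fetchSub(1, .release);
            // Debug-only guard: a previous count of 0 means a double release.
            std.debug.assert(prev > 0);
            // The real implementation also performs its acquire synchronization
            // when the count reaches zero; that logic is unchanged by this PR.
            return prev == 1;
        }
    };
}

test "decr reports the final release" {
    var counter = RefCounter(u32){};
    try std.testing.expect(counter.decr()); // 1 -> 0: last reference
}
```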

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant Connection
  participant SubscriptionsMap as Map
  participant Subscription

  Client->>Connection: subscribe(...)
  Connection->>Subscription: init / obtain instance
  Connection->>Map: put(key, Subscription)
  Note right of Connection: Retain ownership reference
  Connection->>Subscription: retain()
  Connection-->>Client: return handle
sequenceDiagram
  autonumber
  actor Client
  participant Connection
  participant SubscriptionsMap as Map
  participant Subscription

  Client->>Connection: unsubscribe(key)
  Connection->>Map: remove(key)
  alt found
    Note right of Connection: Drop connection-owned reference
    Connection->>Subscription: release()
    Connection-->>Client: ok
  else not found
    Connection-->>Client: no-op / not found
  end
sequenceDiagram
  autonumber
  participant App
  participant Connection
  participant SubscriptionsMap as Map
  participant Subscription

  App->>Connection: deinit()
  loop over entries
    Connection->>Subscription: release()
  end
  Connection-->>App: deinitialized

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes


Poem

A hop and a bop through refs I keep,
Retain when I store, release when I sleep.
No underflow now—assertions stand guard,
Carrots of safety in memory’s yard.
Map burrows tidy, lifetimes at ease—
Thump-thump, ship it on the breeze! 🥕🐇



@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
src/connection.zig (2)

258-263: Release connection-owned refs in deinit is the right fix; consider holding subs_mutex during iteration.

The release loop correctly drops the Connection’s ownership refs and prevents dangling pointers in the map. Minor hardening: acquire subs_mutex while iterating to avoid any future reentrancy surprises if Subscription.release() ever interacts with the connection (now or later).

-        // Clean up subscriptions - release connection's references first
-        var iter = self.subscriptions.iterator();
-        while (iter.next()) |entry| {
-            entry.value_ptr.*.release(); // Release connection's ownership reference
-        }
+        // Clean up subscriptions - release connection's references first
+        self.subs_mutex.lock();
+        var iter = self.subscriptions.iterator();
+        while (iter.next()) |entry| {
+            entry.value_ptr.*.release(); // Release connection's ownership reference
+        }
+        self.subs_mutex.unlock();
         self.subscriptions.deinit();

588-591: Prefer fetchRemove to get-and-release the actual stored pointer; also avoid Zig API ambiguities of remove().

AutoHashMap.remove's signature and semantics have varied across recent Zig versions; fetchRemove is clearer and lets you release the exact value stored in the map. Functional behavior stays the same, but the code becomes more robust to stdlib API changes and avoids a potential mismatch if a different pointer with the same sid were ever passed.

-        if (self.subscriptions.remove(sub.sid)) {
-            sub.release(); // Release connection's ownership reference
-        }
+        if (self.subscriptions.fetchRemove(sub.sid)) |kv| {
+            kv.value.release(); // Release connection's ownership reference
+        }

If your stdlib version lacks fetchRemove, we can switch to get/remove pairs under the lock. Do you want me to adjust for your exact Zig version?

src/jetstream.zig (1)

170-172: Docs nit: clarify heartbeat vs flow control and fix a typo.

  • “empty mess” → “empty message”.
  • The comment under idle_heartbeat conflates heartbeat with flow control. Suggest clarifying that:
    • idle_heartbeat (100 status, “Idle Heartbeat”, no reply expected).
    • flow_control (100 status, “FlowControl Request”, reply required — either msg.reply or flow-control header).
-    /// For push consumers this will regularly send an empty mess with Status header 100 and a reply subject  
-    /// (This field was moved to the end to avoid duplication)
+    /// For push consumers, idle_heartbeat sends empty messages with Status: 100 (“Idle Heartbeat”); no reply is expected.
+    /// Flow control sends Status: 100 with “FlowControl Request”; a reply is required (via msg.reply or flow-control header).

Also applies to: 182-186

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 270567e and fa05908.

📒 Files selected for processing (2)
  • src/connection.zig (4 hunks)
  • src/jetstream.zig (5 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.

Applied to files:

  • src/connection.zig
🔇 Additional comments (8)
src/connection.zig (2)

527-528: Retaining on insert fixes the UAF.

Taking an ownership ref when inserting into the map is exactly what was missing. This should eliminate the use-after-free during async delivery.


564-565: Symmetric retain on async subscribe looks good.

Same as sync path: Connection now clearly owns one ref. Good.

src/jetstream.zig (6)

24-27: Re-exports are helpful for consumers.

Re-exporting JetStreamMessage, MsgMetadata, and SequencePair improves ergonomics.


235-247: FetchRequest defaults and fields look reasonable.

Fields map well to the API; default expires at 30s aligns with typical behavior.


249-276: MessageBatch ownership and deinit are sensible.

messages holds JetStreamMessage pointers and deinit tears them down correctly, then frees the slice.


718-763: Push status handling covers heartbeats and flow control correctly.

Using msg.reply for flow control acks is compatible with current servers. Nice debug visibility into headers.


764-830: Push subscribe wrapper is solid; callback wrapper filters status=100.

The inline wrapper and context threading are clean. This should prevent users from seeing heartbeats/flow-control messages.


833-867: Sync push subscribe is consistent with async; good use of errdefer.

No issues spotted.

Comment on lines 301 to 398
pub fn fetch(self: *PullSubscription, request: FetchRequest) !MessageBatch {
    self.mutex.lock();
    defer self.mutex.unlock();

    // Build the pull request subject
    const pull_subject = try std.fmt.allocPrint(
        self.js.allocator,
        "CONSUMER.MSG.NEXT.{s}.{s}",
        .{ self.stream_name, self.consumer_name },
    );
    defer self.js.allocator.free(pull_subject);

    // Create unique inbox for responses
    const inbox = try newInbox(self.js.allocator);
    defer self.js.allocator.free(inbox);

    // Create synchronous subscription for the inbox
    const inbox_sub = try self.js.nc.subscribeSync(inbox);
    defer inbox_sub.deinit();

    // Serialize the fetch request to JSON
    const request_json = try std.json.stringifyAlloc(self.js.allocator, request, .{});
    defer self.js.allocator.free(request_json);

    // Build the full API subject
    const api_subject = try std.fmt.allocPrint(
        self.js.allocator,
        "{s}{s}",
        .{ default_api_prefix, pull_subject },
    );
    defer self.js.allocator.free(api_subject);

    // Send the pull request with reply subject
    try self.js.nc.publishRequest(api_subject, inbox, request_json);

    // Collect messages
    var messages = std.ArrayList(*JetStreamMessage).init(self.js.allocator);
    defer messages.deinit();

    const timeout_ms = @as(u64, @intCast(request.expires / 1_000_000)); // Convert nanoseconds to milliseconds
    var batch_complete = false;
    var fetch_error: ?anyerror = null;

    // Collect messages until batch is complete or timeout
    while (!batch_complete and messages.items.len < request.batch) {
        if (inbox_sub.nextMsg(timeout_ms)) |raw_msg| {
            // Check for status messages
            if (raw_msg.headers.get("Status")) |status_values| {
                if (status_values.items.len > 0) {
                    const status_code = status_values.items[0];

                    if (std.mem.eql(u8, status_code, "404")) {
                        // No messages available
                        raw_msg.deinit();
                        batch_complete = true;
                        break;
                    } else if (std.mem.eql(u8, status_code, "408")) {
                        // Request timeout
                        raw_msg.deinit();
                        fetch_error = error.RequestTimeout;
                        batch_complete = true;
                        break;
                    } else if (std.mem.eql(u8, status_code, "409")) {
                        // Consumer sequence mismatch
                        raw_msg.deinit();
                        fetch_error = error.ConsumerSequenceMismatch;
                        batch_complete = true;
                        break;
                    } else if (std.mem.eql(u8, status_code, "100")) {
                        // Heartbeat - continue waiting
                        raw_msg.deinit();
                        continue;
                    }
                }
                // Unknown status code - clean up and continue
                raw_msg.deinit();
            } else {
                // This is a regular message - convert to JetStream message
                const js_msg_ptr = try jetstream_message.createJetStreamMessage(self.js, raw_msg);
                try messages.append(js_msg_ptr);

                // raw_msg is now owned by the JetStreamMessage
            }
        } else {
            // Timeout occurred
            batch_complete = true;
        }
    }

    // Convert ArrayList to owned slice
    const messages_slice = try messages.toOwnedSlice();

    return MessageBatch{
        .messages = messages_slice,
        .error_info = fetch_error,
        .allocator = self.js.allocator,
    };
}

⚠️ Potential issue

Fix type mismatch and validate batch; also honor no_wait more explicitly.

  • messages.items.len (usize) is compared to request.batch (i32) — this won’t compile cleanly. Cast safely.
  • If batch <= 0, we should short-circuit with an empty batch to avoid a tight loop.
  • Optional: If request.no_wait is true, consider using a near-zero nextMsg timeout to return immediately when no messages are available (server still sends 404, but immediate local timeout reduces client-side wait when misconfigured).
-        const timeout_ms = @as(u64, @intCast(request.expires / 1_000_000)); // Convert nanoseconds to milliseconds
+        const timeout_ms = @as(u64, @intCast(request.expires / 1_000_000)); // ns -> ms

         var batch_complete = false;
         var fetch_error: ?anyerror = null;
 
-        // Collect messages until batch is complete or timeout
-        while (!batch_complete and messages.items.len < request.batch) {
+        // Guard against invalid batch sizes
+        if (request.batch <= 0) {
+            return MessageBatch{
+                .messages = &[_]*JetStreamMessage{},
+                .error_info = null,
+                .allocator = self.js.allocator,
+            };
+        }
+        const target_batch: usize = @as(usize, @intCast(request.batch));
+
+        // Collect messages until batch is complete or timeout
+        while (!batch_complete and messages.items.len < target_batch) {
-            if (inbox_sub.nextMsg(timeout_ms)) |raw_msg| {
+            const per_wait_ms: u64 = if (request.no_wait) 1 else timeout_ms;
+            if (inbox_sub.nextMsg(per_wait_ms)) |raw_msg| {
                 // Check for status messages
                 if (raw_msg.headers.get("Status")) |status_values| {

If you prefer not to change the no_wait behavior client-side, we can drop the per_wait_ms tweak and keep only the type/batch guard fixes.


Comment on lines 869 to 899
/// Create a pull subscription for the specified stream
pub fn pullSubscribe(
    self: *JetStream,
    stream_name: []const u8,
    consumer_config: ConsumerConfig,
) !*PullSubscription {
    // Create pull consumer config with appropriate defaults
    var pull_config = consumer_config;
    pull_config.deliver_subject = null; // Force null for pull consumers
    if (pull_config.max_waiting == 0) pull_config.max_waiting = 512; // Default max waiting pulls

    // Create the consumer
    var consumer_info = try self.addConsumer(stream_name, pull_config);
    errdefer consumer_info.deinit();

    // Get the consumer name (use name first, then durable_name)
    const consumer_name = consumer_info.value.config.name orelse
        consumer_info.value.config.durable_name orelse
        return error.MissingConsumerName;

    // Allocate PullSubscription
    const pull_subscription = try self.allocator.create(PullSubscription);
    pull_subscription.* = PullSubscription{
        .js = self,
        .stream_name = stream_name,
        .consumer_name = consumer_name,
        .consumer_info = consumer_info,
    };

    return pull_subscription;
}

⚠️ Potential issue

Use ConsumerInfo.name when deriving the consumer name (fixes ephemeral consumer case).

After creating an ephemeral consumer, config.name and durable_name are typically null while ConsumerInfo.name contains the server-assigned durable. Using config.* first will incorrectly error on valid cases.

-        // Get the consumer name (use name first, then durable_name)
-        const consumer_name = consumer_info.value.config.name orelse 
-                             consumer_info.value.config.durable_name orelse
-                             return error.MissingConsumerName;
+        // Prefer the server-assigned name from ConsumerInfo, then fall back to provided config
+        const consumer_name = consumer_info.value.name orelse
+                              consumer_info.value.config.name orelse
+                              consumer_info.value.config.durable_name orelse
+                              return error.MissingConsumerName;

@lalinsky force-pushed the fix/subscription-reference-counting branch from fa05908 to ebe3132 on Aug 25, 2025 at 11:51
@lalinsky lalinsky merged commit 2b5b647 into main Aug 25, 2025
1 of 2 checks passed
@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/ref_counter.zig (1)

49-58: Guard against refcount underflow in release builds (avoid wrap-around on double-release).

The added debug assertion on Line [50] is good for catching logic errors during development. However, in ReleaseFast/ReleaseSmall builds std.debug.assert is compiled out, so a spurious decr() from zero will silently underflow the counter to max(T), masking the bug and causing a hard-to-diagnose leak. Given this refcounter underpins subscription lifetime, it’s safer to unconditionally trap on underflow.

Apply this focused change:

             const prev_ref_count = self.refs.fetchSub(1, .release);
-            std.debug.assert(prev_ref_count > 0);
+            // Trap even in release builds to avoid wrap-around and latent leaks.
+            if (prev_ref_count == 0) @panic("RefCounter underflow in decr()");

This keeps the existing acquire-on-zero semantics intact while ensuring double-release bugs cannot proceed unnoticed in production.

🧹 Nitpick comments (1)
src/ref_counter.zig (1)

25-35: Constrain T at compile time to an unsigned integer type.

std.atomic.Value(T) and the arithmetic assume an unsigned integer with wrap-around semantics. Make misuse impossible by adding a compile-time check.

You can insert this just before return struct {:

comptime {
    const info = @typeInfo(T);
    if (info != .Int or info.Int.signedness != .unsigned) {
        @compileError("RefCounter expects an unsigned integer type (e.g., u32).");
    }
}
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between fa05908 and ebe3132.

📒 Files selected for processing (2)
  • src/connection.zig (4 hunks)
  • src/ref_counter.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/connection.zig
