-
Notifications
You must be signed in to change notification settings - Fork 1
Auto-generate deliver_subject for push subscriptions #105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Push subscriptions now automatically generate an inbox for deliver_subject when none is provided, matching the behavior of the official Go client. This provides a more convenient API while maintaining compatibility with explicit deliver_subject configuration. Changes: - Updated subscribe() to auto-generate deliver_subject if not provided - Updated subscribeSync() to auto-generate deliver_subject if not provided - Fixed memory management with proper errdefer cleanup - Updated test to expect StreamNotFound instead of MissingDeliverSubject
WalkthroughAuto-generate deliver_subject for push subscriptions when absent, create push consumers before subscribing, wrap push handlers to convert messages and intercept STATUS_CONTROL (idle heartbeat / flow-control) messages, mirror behavior in subscribeSync, free generated inboxes, and add a Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Pre-merge checks (3 passed)✅ Passed checks (3 passed)
✨ Finishing Touches🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/jetstream.zig (1)
1126-1138: Prevent resource leak if allocation fails after subscribeIf
create(JetStreamSubscription)fails, the underlying NATS subscription stays active. Adderrdeferto unsubscribe, and store the owned subject.- const subscription = try self.nc.subscribe(deliver_subject, JSHandler.wrappedHandler, .{ self.nc, args }); + const subscription = try self.nc.subscribe(deliver_subject, JSHandler.wrappedHandler, .{ self.nc, args }); + errdefer self.nc.unsubscribe(subscription); @@ const js_sub = try self.nc.allocator.create(JetStreamSubscription); js_sub.* = JetStreamSubscription{ .subscription = subscription, .js = self, .consumer_info = consumer_info, + .deliver_subject_owned = generated_deliver_subject, };
🧹 Nitpick comments (1)
src/jetstream.zig (1)
1096-1123: Don’t swallow conversion errors silently in wrapped handlerIf
createJetStreamMessagefails, we drop the message with no log. Emit a debug/error to help diagnose malformed messages.- const js_msg = jetstream_message.createJetStreamMessage(nc, msg) catch { - msg.deinit(); // Clean up on error - return; - }; + const js_msg = jetstream_message.createJetStreamMessage(nc, msg) catch |err| { + log.err("Failed to wrap JetStream message: {}", .{err}); + msg.deinit(); // Clean up on error + return; + };
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/jetstream.zig(2 hunks)tests/jetstream_push_test.zig(2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using
zig fmtbefore commit
Files:
tests/jetstream_push_test.zigsrc/jetstream.zig
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/jetstream_push_test.zig
🪛 GitHub Actions: CI
src/jetstream.zig
[error] 1-1: Zig fmt --check failed (exit code 1). Command 'zig fmt --check .' reported formatting issues. Run 'zig fmt' to auto-format.
🔇 Additional comments (4)
tests/jetstream_push_test.zig (2)
133-139: LGTM: exercising auto-generated deliver_subject on missing streamConfig without deliver_subject and expecting a stream lookup error is correct with the new behavior.
147-149: LGTM: explicit assertion for StreamNotFoundThe error expectation matches the server-side response when the stream is absent.
src/jetstream.zig (2)
1-1: CI failed: run zig fmtFormatter check failed in CI. Please run:
- zig fmt .
- git add -A && git commit
This repo mandates zig fmt for all Zig sources.
1162-1165: Consider adding queue subscription support for deliver_group
Searches inconnection.zigandjetstream.zigdid not reveal any queue‐based subscribe or subscribeSync variants. Ifpush_config.deliver_groupis set today we fall back to plain subscriptions. Confirm whether the underlying NATS connection exposes a queue‐subscribe API or plan to implement it so push consumers with adeliver_groupcan share workload.
|
@claude address coderabbit's comments |
|
Claude finished @lalinsky's task —— View job I've successfully addressed all of CodeRabbit's review comments and fixed the memory management issues. Completed Tasks
Changes MadeMemory Management Fixes:
Error Handling Improvements:
Validation:
Code Quality:
The implementation now properly manages memory for auto-generated deliver subjects, provides better error handling, validates inputs, and maintains the same API behavior while preventing resource leaks. |
Addresses CodeRabbit's review comments: - Add deliver_subject_owned field to JetStreamSubscription to prevent memory leak of generated deliver_subject - Update deinit() to free owned deliver_subject - Add errdefer cleanup for subscription on allocation failure - Add error logging instead of silently swallowing conversion errors in wrapped handler - Add deliver_subject validation for both subscribe() and subscribeSync() - Apply fixes to both subscribe() and subscribeSync() methods 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Lukáš Lalinský <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (3)
src/jetstream.zig (3)
1081-1092: Auto-generate + validate deliver_subject; strip pull-only fields — solidThis matches Go client behavior, validates inputs, and avoids mixing pull-only fields into push configs. This also addresses the earlier leak/validation review.
Also applies to: 1093-1097
1143-1144: Good: retain generated subject on the subscriptionEnsures lifetime matches the subscription and frees on teardown.
1177-1183: Ownership mirrored in sync path — resolvedThe generated deliver subject is now owned by the subscription and freed in deinit().
🧹 Nitpick comments (2)
src/jetstream.zig (2)
1116-1118: Include more context in the wrap-error logAdding subject/status makes field issues diagnosable without packet capture.
- log.err("Failed to wrap JetStream message: {}", .{err}); + log.err("Failed to wrap JetStream message (subject={s}, status={d}): {}", .{ msg.subject, msg.status_code, err });
1081-1092: DRY the push-config setupThe deliver_subject auto-gen/validation and pull-field stripping are duplicated in subscribe and subscribeSync. Consider a small helper to reduce drift.
fn preparePushConfig(nc: *Connection, consumer_config: ConsumerConfig) !struct { push_config: ConsumerConfig, deliver_subject: []const u8, owned: ?[]u8 } { var cfg = consumer_config; const owned = if (consumer_config.deliver_subject == null) try inbox.newInbox(nc.allocator) else null; errdefer if (owned) |ds| nc.allocator.free(ds); const deliver = consumer_config.deliver_subject orelse owned.?; try validation.validateSubject(deliver); cfg.deliver_subject = deliver; cfg.max_waiting = 0; cfg.max_batch = null; cfg.max_expires = null; return .{ .push_config = cfg, .deliver_subject = deliver, .owned = owned }; }Also applies to: 1151-1162
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/jetstream.zig(5 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using
zig fmtbefore commit
Files:
src/jetstream.zig
🧠 Learnings (1)
📚 Learning: 2025-09-06T20:28:53.149Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#98
File: src/jetstream_kv.zig:235-279
Timestamp: 2025-09-06T20:28:53.149Z
Learning: JetStreamMessage wrappers in nats.zig are allocated on the Message's owned arena, so they are automatically freed when Message.deinit() is called and don't require separate cleanup.
Applied to files:
src/jetstream.zig
🔇 Additional comments (3)
src/jetstream.zig (3)
528-536: Leak fixed and ownership clarified — good callTracking the auto-generated deliver subject on the subscription and freeing it in deinit() is the right fix.
1134-1136: Good: unsubscribe on failure patherrdefer unsubscribe avoids dangling subs when consumer creation later fails.
1174-1174: Good: unsubscribe on failure path (sync)Matches the async path; no leaks.
| // Generate deliver_subject if not provided and create push consumer config | ||
| var push_config = consumer_config; | ||
| const generated_deliver_subject = if (consumer_config.deliver_subject == null) | ||
| try inbox.newInbox(self.nc.allocator) | ||
| else | ||
| null; | ||
| errdefer if (generated_deliver_subject) |ds| self.nc.allocator.free(ds); | ||
|
|
||
| const deliver_subject = consumer_config.deliver_subject orelse generated_deliver_subject.?; | ||
| try validation.validateSubject(deliver_subject); | ||
| push_config.deliver_subject = deliver_subject; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sync push path misses STATUS_CONTROL handling (heartbeats/flow control)
Without intercepting STATUS_CONTROL in subscribeSync, users will receive heartbeats as messages and flow-control requests won’t be answered, which can stall delivery. Handle these in nextMsg() similarly to the async wrapper.
Apply this diff to JetStreamSubscription.nextMsg:
@@
- pub fn nextMsg(self: *JetStreamSubscription, timeout_ms: u64) !*JetStreamMessage {
- // Get the next message from the underlying subscription
- const msg = try self.subscription.nextMsg(timeout_ms);
- errdefer msg.deinit();
-
- // Convert to JetStream message
- return try jetstream_message.createJetStreamMessage(self.js.nc, msg);
- }
+ pub fn nextMsg(self: *JetStreamSubscription, timeout_ms: u64) !*JetStreamMessage {
+ var remaining_ms: u64 = timeout_ms;
+ const deadline_ms: i64 = std.time.milliTimestamp() + @as(i64, @intCast(timeout_ms));
+ while (true) {
+ const msg = self.subscription.nextMsg(remaining_ms) catch |err| switch (err) {
+ error.Timeout => return error.Timeout,
+ else => return err,
+ };
+ errdefer msg.deinit();
+
+ if (msg.status_code == STATUS_CONTROL) {
+ handleStatusMessage(msg, self.js.nc) catch |e| {
+ log.err("Failed to handle status message: {}", .{e});
+ };
+ msg.deinit();
+ const now = std.time.milliTimestamp();
+ if (now >= deadline_ms) return error.Timeout;
+ remaining_ms = @intCast(u64, deadline_ms - now);
+ continue;
+ }
+
+ return try jetstream_message.createJetStreamMessage(self.js.nc, msg);
+ }
+ }Also applies to: 1163-1167
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 1151 to 1167, the sync push path
(subscribeSync/nextMsg) is not intercepting STATUS_CONTROL messages — heartbeats
are being returned to callers and flow-control requests are not being answered.
Update nextMsg() to mirror the async wrapper: after receiving a message check if
it is a STATUS_CONTROL/control message, and if so handle it internally (do not
return it to the caller): for heartbeat-type control messages simply ignore and
continue the receive loop; for flow-control/control messages with a reply
subject send the appropriate empty/ack response to msg.reply (using the same
reply behavior as the async path) and continue the loop; ensure any error
handling, timers, and counters used by the async path are applied the same way
so control messages don’t surface to callers and flow-control is replied to
immediately.
Summary
Changes
subscribe()method to auto-generate deliver_subject if not providedsubscribeSync()method to auto-generate deliver_subject if not providedTest plan
This matches the Go client behavior where push subscriptions automatically generate an inbox when no deliver_subject is specified, providing a more convenient API.