Implement queueSubscribe() and queueSubscribeSync() #47
Caution: Review failed. The pull request is closed.
Walkthrough
Adds optional subscription queue-group support and queue-aware APIs (sync + async); updates SUB protocol formatting; extends …
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/connection.zig (2)
564-578: Put-then-fail leaves sub in map; add rollback on error.
If bufferWrite fails, the sub stays stored and retained. Roll it back on error.
  self.subs_mutex.lock();
  defer self.subs_mutex.unlock();
- try self.subscriptions.put(sub.sid, sub);
- sub.retain(); // Connection takes ownership reference
+ try self.subscriptions.put(sub.sid, sub);
+ sub.retain(); // Connection takes ownership reference
+ errdefer {
+     // Remove from map and drop the connection's ref on failure
+     _ = self.subscriptions.remove(sub.sid);
+     sub.release();
+ }
  // Send SUB command via buffer
  var buffer = ArrayList(u8).init(self.allocator);
  defer buffer.deinit();
  if (sub.queue_group) |group| {
      try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, group, sub.sid });
  } else {
      try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
  }
  try self.bufferWrite(buffer.items, false);
1348-1359: Re-subscribe on reconnect drops queue_group and writes unsafely to the stream.
Two issues: (1) queue subscriptions are re-sent as plain SUB (missing the queue group), breaking semantics after reconnect; (2) writing directly to stream can race with the flusher thread. Build commands with queue awareness and route via bufferWrite under the main mutex.
- fn resendSubscriptions(self: *Self) !void {
+ fn resendSubscriptions(self: *Self) !void {
      log.debug("Re-establishing subscriptions", .{});
      self.subs_mutex.lock();
      defer self.subs_mutex.unlock();
-     var iter = self.subscriptions.iterator();
-     while (iter.next()) |entry| {
-         const sub = entry.value_ptr.*;
-
-         // Send SUB command
-         var buffer = ArrayList(u8).init(self.allocator);
-         defer buffer.deinit();
-
-         try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
-
-         // Send directly (bypass buffering since we're reconnecting)
-         const stream = self.stream orelse return ConnectionError.ConnectionClosed;
-         try stream.writeAll(buffer.items);
-
-         log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
-     }
+     // Build all SUB commands first
+     var all = ArrayList(u8).init(self.allocator);
+     defer all.deinit();
+     var iter = self.subscriptions.iterator();
+     while (iter.next()) |entry| {
+         const sub = entry.value_ptr.*;
+         if (sub.queue_group) |group| {
+             try all.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, group, sub.sid });
+         } else {
+             try all.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
+         }
+     }
+     // Serialize through write buffer to avoid concurrent writes with flusher
+     self.mutex.lock();
+     defer self.mutex.unlock();
+     if (self.status != .connected) return ConnectionError.ConnectionClosed;
+     try self.bufferWrite(all.items, true);
+     log.debug("Re-subscribed {} subscriptions", .{self.subscriptions.count()});
  }
🧹 Nitpick comments (3)
src/subscription.zig (1)
131-143: Handler trampoline works; minor optional tightening.
If you ever allow non-void/non-error return types, consider matching on .error_union/.void in @typeInfo rather than equality to void. Not blocking.
- const ReturnType = @typeInfo(@TypeOf(handlerFn)).@"fn".return_type.?;
- if (ReturnType == void) {
+ const RT = @typeInfo(@TypeOf(handlerFn)).@"fn".return_type.?;
+ switch (@typeInfo(RT)) {
+     .void => {
          @call(.auto, handlerFn, .{msg} ++ self_ctx.args);
- } else {
+     },
+     .error_union => {
          try @call(.auto, handlerFn, .{msg} ++ self_ctx.args);
- }
+     },
+     else => {
+         _ = @call(.auto, handlerFn, .{msg} ++ self_ctx.args);
+     },
+ }
tests/subscribe_test.zig (2)
61-73: Async test can flake; wait for callback with a bounded poll.
flush() doesn’t guarantee the handler has run. Add a short wait loop.
- try conn.flush();
-
- if (result) |msg| {
+ try conn.flush();
+ // Wait up to 1s for async handler to set result
+ const deadline_ms = std.time.milliTimestamp() + 1000;
+ while (result == null and std.time.milliTimestamp() < deadline_ms) {
+     std.time.sleep(10 * std.time.ns_per_ms);
+ }
+ if (result) |msg| {
      try std.testing.expectEqualStrings("test", msg.subject);
      try std.testing.expectEqualStrings("Hello world!", msg.data);
  } else {
      try std.testing.expect(false);
  }
82-94: Same flake risk for queue async; add the same wait.
Mirror the change from the non-queue async test.
- try conn.flush();
-
- if (result) |msg| {
+ try conn.flush();
+ const deadline_ms = std.time.milliTimestamp() + 1000;
+ while (result == null and std.time.milliTimestamp() < deadline_ms) {
+     std.time.sleep(10 * std.time.ns_per_ms);
+ }
+ if (result) |msg| {
      try std.testing.expectEqualStrings("test", msg.subject);
      try std.testing.expectEqualStrings("Hello world!", msg.data);
  } else {
      try std.testing.expect(false);
  }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- src/connection.zig (3 hunks)
- src/subscription.zig (5 hunks)
- tests/all_tests.zig (1 hunks)
- tests/subscribe_test.zig (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-25T14:51:33.542Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.542Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
- tests/subscribe_test.zig
- tests/all_tests.zig
📚 Learning: 2025-08-25T02:49:59.082Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/subscription.zig
🔇 Additional comments (12)
src/subscription.zig (4)
43-43: Field addition looks correct (owned optional queue group).
Defaulting to null is right; ownership is handled in init/deinit below.
58-76: Init: proper ownership and errdefer for subject/queue_group.
Good duplication and cleanup on failure; struct init is consistent.
93-96: deinitInternal frees queue_group correctly.
Matches ownership from init; no leaks.
109-111: Dispatcher pointer cleared; no extra teardown needed here.
Given ref-counted subs (per prior learning), this is fine.
tests/all_tests.zig (1)
8-8: Including subscribe tests: good coverage boost.
Brings the new API into the suite.
tests/subscribe_test.zig (2)
8-27: Sync smoke test looks fine.
Happy path verified; cleanup is correct.
29-48: Queue sync smoke test looks fine.
Covers the queue_group path.
src/connection.zig (5)
572-576: SUB formatting handles queue_group correctly.
Good conditional for "SUB <subject> <queue> <sid>" vs "SUB <subject> <sid>".
580-595: Async subscribe: handler/dispatcher wiring LGTM.
ensureDispatcherPool + assignDispatcher before registering is right.
599-606: subscribeSync: correct use of new init signature.
No handler, no queue group; good.
609-624: queueSubscribe: API and wiring look good.
Passes queue_group through; logs are fine.
626-636: queueSubscribeSync: API shape matches the others.
Good to have the sync variant.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/connection.zig (1)
616-631: Reject empty queue group and log it; empty yields an invalid SUB.
An empty string would render “SUB <subject>  <sid>” (double space), which the server will reject.
Apply this diff:
  pub fn queueSubscribe(self: *Self, subject: []const u8, queue_group: []const u8, comptime handlerFn: anytype, args: anytype) !*Subscription {
+     if (queue_group.len == 0) return PublishError.InvalidSubject;
      const handler = try subscription_mod.createMsgHandler(self.allocator, handlerFn, args);
      errdefer handler.cleanup(self.allocator);
@@
      try self.subscribeInternal(sub);
-     log.debug("Subscribed to {s} with sid {d} (async)", .{ sub.subject, sub.sid });
+     log.debug("Subscribed to {s} in queue {s} with sid {d} (async)", .{ sub.subject, queue_group, sub.sid });
      return sub;
  }
🧹 Nitpick comments (1)
src/connection.zig (1)
579-583: Also validate subjects (not just queue groups) at the API boundary.
Optional, but catching bad subjects early yields clearer errors than server-side -ERRs. You can reuse isValidNatsToken() in subscribe/subscribeSync/queueSubscribe/queueSubscribeSync.
Happy to push a quick follow-up with these guards and unit tests for rejection paths.
Also applies to: 616-631, 633-643
📜 Review details
📒 Files selected for processing (1)
src/connection.zig(4 hunks)
🔇 Additional comments (4)
src/connection.zig (4)
570-575: Good: defensive rollback on failure.
Putting the sub in the map, retaining, and errdefer cleanup is correct and avoids leaks on later failures.
587-602: Async subscribe path looks solid.
Handler lifetime, SID allocation, dispatcher assignment, and internal subscribe are in the right order.
607-607: Sync subscribe OK.
Passing null for queue and handler matches the new init signature.
592-593: Subscription memory ownership verified
I checked Subscription.init in src/subscription.zig and it explicitly duplicates both subject (line 62) and queue_group (line 65) via allocator.dupe, then assigns those copies into the struct. The internal cleanup (deinitInternal, lines 90–95) frees both slices before destroying the struct. There’s no use-after-free risk here.
if (sub.queue_group) |group| {
    try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, group, sub.sid });
} else {
    try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
}
🛠️ Refactor suggestion
SUB formatting supports queue groups; validate tokens to prevent malformed/unsafe lines.
Subjects/queue groups with whitespace or CR/LF will break the protocol. Add a cheap guard.
Apply this diff inline:
  // Send SUB command via buffer
  var buffer = ArrayList(u8).init(self.allocator);
  defer buffer.deinit();
+ // Validate tokens to avoid protocol injection / malformed SUB
+ if (!isValidNatsToken(sub.subject)) return PublishError.InvalidSubject;
+ if (sub.queue_group) |g| {
+     if (!isValidNatsToken(g)) return PublishError.InvalidSubject;
+ }
  if (sub.queue_group) |group| {
      try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, group, sub.sid });
  } else {
      try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
  }
Add this helper once in this file (outside the shown range):
fn isValidNatsToken(tok: []const u8) bool {
    if (tok.len == 0) return false;
    // reject spaces, tabs and CR/LF
    return std.mem.indexOfAny(u8, tok, " \t\r\n") == null;
}
🤖 Prompt for AI Agents
In src/connection.zig around lines 579-583, the SUB formatting writes subject
and optional queue group directly and can produce malformed/unsafe protocol
lines if tokens contain whitespace or CR/LF; add the suggested
isValidNatsToken(tok: []const u8) bool helper elsewhere in this file (outside
the shown range) and call it before writing: validate sub.subject and, if
present, group; if validation fails, return or propagate an appropriate error
instead of writing the line; only call buffer.writer().print when both tokens
pass isValidNatsToken.
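To make the guard concrete, here is a minimal standalone sketch of validating tokens before formatting a SUB line. `formatSub` and `error.InvalidToken` are hypothetical names invented for this illustration (the PR's code builds the line inside `subscribeInternal` and the review suggests returning `PublishError.InvalidSubject`); only `isValidNatsToken` mirrors the helper proposed above.

```zig
const std = @import("std");

// Same check as the helper suggested in the review.
fn isValidNatsToken(tok: []const u8) bool {
    if (tok.len == 0) return false;
    // Reject spaces, tabs and CR/LF.
    return std.mem.indexOfAny(u8, tok, " \t\r\n") == null;
}

// Hypothetical helper (not part of the PR): validates both tokens, then
// formats one SUB line into the caller's buffer.
fn formatSub(buf: []u8, subject: []const u8, queue_group: ?[]const u8, sid: u64) ![]u8 {
    if (!isValidNatsToken(subject)) return error.InvalidToken;
    if (queue_group) |group| {
        if (!isValidNatsToken(group)) return error.InvalidToken;
        return std.fmt.bufPrint(buf, "SUB {s} {s} {d}\r\n", .{ subject, group, sid });
    }
    return std.fmt.bufPrint(buf, "SUB {s} {d}\r\n", .{ subject, sid });
}

pub fn main() !void {
    var buf: [128]u8 = undefined;

    // A well-formed queue subscription.
    const line = try formatSub(&buf, "test", "workers", 7);
    std.debug.print("{s}", .{line});

    // An empty queue group is rejected before anything is formatted.
    _ = formatSub(&buf, "test", "", 8) catch |err| {
        std.debug.print("rejected: {s}\n", .{@errorName(err)});
        return;
    };
}
```

Validating before formatting means a subject such as "a\r\nPUB x 0" never reaches the wire, so a caller-supplied string cannot smuggle a second protocol command into the stream.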
+ var buffer = ArrayList(u8).init(self.allocator);
+ defer buffer.deinit();

  var iter = self.subscriptions.iterator();
  while (iter.next()) |entry| {
      const sub = entry.value_ptr.*;

      // Send SUB command
-     var buffer = ArrayList(u8).init(self.allocator);
-     defer buffer.deinit();

-     try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
+     if (sub.queue_group) |queue_group| {
+         try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue_group, sub.sid });
+     } else {
+         try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
+     }

      // Send directly (bypass buffering since we're reconnecting)
      const stream = self.stream orelse return ConnectionError.ConnectionClosed;
      try stream.writeAll(buffer.items);

      log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
+     buffer.clearRetainingCapacity();
  }
Concurrent direct writes to stream can interleave with the flusher and corrupt the wire.
resendSubscriptions writes SUB lines directly to stream while the flusher thread may also be writing. Because flusher performs I/O without holding the main mutex, there is no mutual exclusion on the socket, so writes can interleave.
Recommended fix: compose all SUBs into a single buffer under subs_mutex, release subs_mutex, then append to the write buffer and signal the flusher. Example full replacement:
fn resendSubscriptions(self: *Self) !void {
    log.debug("Re-establishing subscriptions", .{});
    var buffer = ArrayList(u8).init(self.allocator);
    defer buffer.deinit();
    {
        // Scoped block so subs_mutex is released even if print() fails
        self.subs_mutex.lock();
        defer self.subs_mutex.unlock();
        var iter = self.subscriptions.iterator();
        while (iter.next()) |entry| {
            const sub = entry.value_ptr.*;
            if (sub.queue_group) |queue_group| {
                try buffer.writer().print("SUB {s} {s} {d}\r\n", .{ sub.subject, queue_group, sub.sid });
            } else {
                try buffer.writer().print("SUB {s} {d}\r\n", .{ sub.subject, sub.sid });
            }
            log.debug("Re-subscribed to {s} with sid {d}", .{ sub.subject, sub.sid });
        }
    }
    // serialize onto the single writer path
    self.mutex.lock();
    defer self.mutex.unlock();
    if (self.status != .connected) return ConnectionError.ConnectionClosed;
    try self.bufferWrite(buffer.items, true); // ASAP to get SUBs out before buffered app data
}
If you’d rather keep direct writes, introduce and use a dedicated write mutex around all stream I/O (flusher and any direct writers), but centralizing through bufferWrite is simpler and faster.
🤖 Prompt for AI Agents
In src/connection.zig around lines 1351-1371, resendSubscriptions is performing
direct writes to the stream while the flusher may also write, which can
interleave and corrupt the protocol; instead, under subs_mutex build a single
buffer containing all SUB lines, release subs_mutex, then take the main writer
path (lock self.mutex) and call the existing bufferWrite method to append that
buffer and signal the flusher (or write ASAP) while still checking connection
status; alternatively, wrap all stream I/O (flusher and resend path) with a
dedicated write mutex, but prefer centralizing through bufferWrite for
correctness and performance.
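The single-writer idea can be sketched in isolation. The snippet below is an illustration under stated assumptions, not the library's code: `WriteBuffer`, `producer` and `onlyWholeLines` are hypothetical names, and the fixed-size array stands in for whatever buffer the real `bufferWrite` appends to. It shows that when every writer appends a complete command under one mutex, commands from different threads cannot interleave.

```zig
const std = @import("std");

// Hypothetical stand-in for the connection's write side; the real
// Connection, bufferWrite and flusher in nats.zig will differ.
const WriteBuffer = struct {
    mutex: std.Thread.Mutex = .{},
    pending: [2048]u8 = undefined,
    len: usize = 0,

    // Appends one complete chunk under the lock, so chunks written by
    // different threads never interleave inside the pending buffer.
    fn bufferWrite(self: *WriteBuffer, data: []const u8) error{NoSpaceLeft}!void {
        self.mutex.lock();
        defer self.mutex.unlock();
        if (data.len > self.pending.len - self.len) return error.NoSpaceLeft;
        @memcpy(self.pending[self.len..][0..data.len], data);
        self.len += data.len;
    }
};

// Writes the same command `count` times through the shared buffer.
fn producer(wb: *WriteBuffer, line: []const u8, count: usize) void {
    var i: usize = 0;
    while (i < count) : (i += 1) {
        wb.bufferWrite(line) catch return;
    }
}

// Returns true when `bytes` consists only of whole copies of `a` or `b`.
fn onlyWholeLines(bytes: []const u8, a: []const u8, b: []const u8) bool {
    var rest = bytes;
    while (rest.len > 0) {
        if (std.mem.startsWith(u8, rest, a)) {
            rest = rest[a.len..];
        } else if (std.mem.startsWith(u8, rest, b)) {
            rest = rest[b.len..];
        } else return false;
    }
    return true;
}

pub fn main() !void {
    var wb: WriteBuffer = .{};
    const sub_line: []const u8 = "SUB test workers 1\r\n";
    const pub_line: []const u8 = "PUB test 5\r\nhello\r\n";
    const count: usize = 25;

    // One thread plays the reconnect path, the other an application publisher.
    const t1 = try std.Thread.spawn(.{}, producer, .{ &wb, sub_line, count });
    const t2 = try std.Thread.spawn(.{}, producer, .{ &wb, pub_line, count });
    t1.join();
    t2.join();

    const intact = onlyWholeLines(wb.pending[0..wb.len], sub_line, pub_line);
    std.debug.print("buffered {d} bytes, intact: {s}\n", .{ wb.len, if (intact) "yes" else "no" });
}
```

The order of commands in the buffer still depends on thread scheduling; the mutex only guarantees that each command lands as one contiguous run of bytes, which is the property the direct `stream.writeAll` path lacks.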
Actionable comments posted: 0
🧹 Nitpick comments (6)
tests/subscribe_test.zig (6)
6-6: Remove unused scoped logger.
log isn’t used and will trip Zig’s unused-const checks. Drop it or prefix with _ if you plan to use it later.
Apply this diff:
- const log = std.log.scoped(.testing);
8-23: Sync smoke test is fine; consider a slightly larger timeout to deflake.
CI can be noisy; 1s is tight for networked tests. 3s is a safer default.
Apply this diff:
- const msg = sub.nextMsg(1000) orelse return error.Timeout;
+ const msg = sub.nextMsg(3_000) orelse return error.Timeout;
25-40: Queue sync smoke test: same deflake note as above.
Apply this diff:
- const msg = sub.nextMsg(1000) orelse return error.Timeout;
+ const msg = sub.nextMsg(3_000) orelse return error.Timeout;
42-50: Make message ownership explicit to avoid surprises and ease reuse.
Right now the collector keeps ownership and deinit happens in collector.deinit(). Clear result when handing out the message and deinit at the callsite; this avoids accidental double-frees if timedWait is called and also makes ownership obvious in tests.
Apply these diffs:
- Clear the stored pointer when returning it:
  - return self.result.?;
  + const msg = self.result.?;
  + self.result = null;
  + return msg;
- Deinit at the callsites (async tests):
  - const msg = try collector.timedWait(1000);
  + const msg = try collector.timedWait(3_000);
  + defer msg.deinit();

  - const msg = try collector.timedWait(1000);
  + const msg = try collector.timedWait(3_000);
  + defer msg.deinit();
Also applies to: 59-73, 89-92, 107-110
51-57: Prefer Condition.signal() over broadcast() here.
Only one waiter exists, so signal() is cheaper and avoids waking non-existent waiters.
Apply this diff:
- self.cond.broadcast();
+ self.cond.signal();
94-110: Missing coverage: validate queue-group load-balancing semantics.
Add a test with two subscribers in the same queue group and assert that exactly one receives a single published message.
You can append this test:
test "queueSubscribe delivers to exactly one member" {
    var conn = try utils.createDefaultConnection();
    defer utils.closeConnection(conn);

    var c1: MessageCollector = .{};
    defer c1.deinit();
    var c2: MessageCollector = .{};
    defer c2.deinit();

    const sub1 = try conn.queueSubscribe("test", "workers", MessageCollector.processMsg, .{&c1});
    defer sub1.deinit();
    const sub2 = try conn.queueSubscribe("test", "workers", MessageCollector.processMsg, .{&c2});
    defer sub2.deinit();

    try conn.publish("test", "one");
    try conn.flush();

    const m1 = c1.timedWait(3_000) catch null;
    const m2 = c2.timedWait(3_000) catch null;

    // Exactly one should receive the message.
    try std.testing.expect((m1 != null) != (m2 != null));
    if (m1) |msg| msg.deinit();
    if (m2) |msg| msg.deinit();
}
📜 Review details
📒 Files selected for processing (1)
tests/subscribe_test.zig(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:51:33.542Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.542Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/subscribe_test.zig
TL;DR
Added queue subscription support to the NATS client library.
What changed?
- Added queueSubscribe and queueSubscribeSync methods to the Connection struct
How to test?
The PR includes new tests in subscribe_test.zig that verify both regular and queue subscriptions:
- subscribeSync and queueSubscribeSync tests for synchronous subscriptions
- subscribe and queueSubscribe tests for asynchronous subscriptions with callbacks
Run the tests with:
Why make this change?
Queue subscriptions are a core feature of NATS that enable load balancing of messages across multiple subscribers. This implementation allows multiple subscribers to form a queue group where only one member receives each message, enabling horizontal scaling of message processing.
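The delivery rule described above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not of the server or of this library: `Sub`, `sameGroup` and `deliver` are hypothetical names, and the round-robin choice is an assumption made to keep the example deterministic (a real NATS server picks a queue-group member itself, typically at random).

```zig
const std = @import("std");

// Hypothetical model of a subscription on a single subject.
const Sub = struct {
    sid: u64,
    queue_group: ?[]const u8 = null,
    received: usize = 0,
};

fn sameGroup(sub: Sub, group: []const u8) bool {
    const g = sub.queue_group orelse return false;
    return std.mem.eql(u8, g, group);
}

// Models delivery of one message: every plain subscriber receives it,
// and each queue group receives it exactly once.
// Assumption: members are chosen round-robin by `seq` for determinism.
fn deliver(subs: []Sub, seq: usize) void {
    for (subs, 0..) |*sub, i| {
        const group = sub.queue_group orelse {
            sub.received += 1; // plain subscriber: always receives
            continue;
        };
        // Handle each group once, when we reach its first member.
        var first = true;
        for (subs[0..i]) |earlier| {
            if (sameGroup(earlier, group)) first = false;
        }
        if (!first) continue;

        var members: usize = 0;
        for (subs) |s| {
            if (sameGroup(s, group)) members += 1;
        }
        // Pick the (seq mod members)-th member of the group.
        var k = seq % members;
        for (subs) |*s| {
            if (!sameGroup(s.*, group)) continue;
            if (k == 0) {
                s.received += 1;
                break;
            }
            k -= 1;
        }
    }
}

pub fn main() void {
    var subs = [_]Sub{
        .{ .sid = 1 }, // plain subscriber
        .{ .sid = 2, .queue_group = "workers" },
        .{ .sid = 3, .queue_group = "workers" },
    };
    var seq: usize = 0;
    while (seq < 4) : (seq += 1) deliver(&subs, seq);
    for (subs) |s| {
        std.debug.print("sid {d} received {d}\n", .{ s.sid, s.received });
    }
}
```

With four messages, the plain subscriber sees all four while the two "workers" members split them between themselves, which is the horizontal-scaling behaviour the PR description refers to.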