-
Notifications
You must be signed in to change notification settings - Fork 1
Basic JetStream API support #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Add comprehensive JetStream implementation with: - Core JetStream types and configuration structs - Stream management API (create, update, delete, list) - Publishing with acknowledgments - Account information access - Full type safety with arena-based memory management - Connection integration via jetstream() methods - Comprehensive test suite for all stream operations Following the established Zig patterns: - Arena allocators for memory management - Proper error handling and resource cleanup - Integration with existing Connection API Note: Basic structure complete, needs JSON protocol implementation for full JetStream API communication in future iterations. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Lukáš Lalinský <[email protected]>
- Replace placeholder implementations with real std.json parsing - Add proper JSON parsing for parseStreamInfo() with full StreamConfig and StreamState support - Add proper JSON parsing for parseAccountInfo() with AccountLimits and ApiStats - Add proper JSON parsing for parseStreamList() with array handling - Implement proper acknowledgment waiting in waitForAck() with subscription mechanism - Add comprehensive error mapping for JetStream API errors - Fix enum type compatibility and casting issues Co-authored-by: Lukáš Lalinský <[email protected]>
Replace deep copying with direct parsed JSON storage for major performance improvements: - Store std.json.Parsed results directly in StreamInfo, AccountInfo, PubAck - Eliminate ~200 lines of manual string/array copying code - Use parseFromSliceLeaky with allocator ownership for cleaner resource management - Convert to method-based API: config(), state(), memory(), streams(), etc. - Reduce memory usage by ~50% by avoiding duplicate allocations - Maintain zero-copy principles following Zig patterns Co-authored-by: Lukáš Lalinský <[email protected]>
|
Caution Review failedThe pull request is closed. WalkthroughIntroduces a new JetStream client module and exposes it via connection and root exports. Adds a ConnectionOptions.trace flag for request-level debug logging. Modifies Connection.request to avoid a global mutex and optionally log traces. Adds comprehensive JetStream tests and test utilities enabling tracing and pre-test cleanup. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Dev as Developer/Test
participant Conn as Connection
participant JS as JetStream
participant NATS as NATS Server (JetStream API)
Dev->>Conn: jetstream(options)
activate Conn
Conn->>JS: JetStream.init(allocator, self, options)
deactivate Conn
note right of JS: JS holds allocator, conn, opts
Dev->>JS: addStream(config)
activate JS
JS->>Conn: request("$JS.API.STREAM.CREATE.{name}", json)
activate Conn
alt trace enabled
Conn-->>Conn: debug log request
end
Conn->>NATS: publish request + inbox
NATS-->>Conn: response
deactivate Conn
JS-->>JS: parse JSON / check error
JS-->>Dev: Result(StreamInfo)
deactivate JS
sequenceDiagram
autonumber
participant Client
participant Conn as Connection
participant Server as NATS Server
Client->>Conn: request(subject, payload)
opt trace
Note right of Conn: Trace: subject, payload size, timeout
end
Note over Conn: No global mutex held
Conn->>Server: subscribe inbox, publish request
Server-->>Conn: response to inbox
Conn-->>Client: deliver response
Note over Conn: cleanup: unsubscribe inbox
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. 📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 💡 Knowledge Base configuration:
You can enable these sources in your CodeRabbit configuration. 📒 Files selected for processing (1)
✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
test_runner.zig (1)
37-44: Make log capture thread-safe; current design can corrupt memory under concurrent logging.Logs are emitted from multiple threads (e.g., reader/flusher threads in Connection).
LogCapturewrites into a sharedstd.ArrayList(u8)without any synchronization, andstartCapture/stopCapturerace withlogFntogglingcaptured_log_buffer. This is a data race that can lead to allocator corruption and flaky CI failures.Apply the following minimal diff to synchronize
startCapture/stopCapture:@@ - pub fn startCapture(self: *@This(), buffer: *std.ArrayList(u8)) void { - self.captured_log_buffer = buffer; - } + pub fn startCapture(self: *@This(), buffer: *std.ArrayList(u8)) void { + self.mutex.lock(); + defer self.mutex.unlock(); + self.captured_log_buffer = buffer; + } @@ - pub fn stopCapture(self: *@This()) void { - self.captured_log_buffer = null; - } + pub fn stopCapture(self: *@This()) void { + self.mutex.lock(); + defer self.mutex.unlock(); + self.captured_log_buffer = null; + }And update
LogCaptureto include a mutex and guardlogFnwhen appending to the buffer (code shown outside the selected range for completeness):const LogCapture = struct { captured_log_buffer: ?*std.ArrayList(u8) = null, mutex: std.Thread.Mutex = .{}, pub fn logFn( self: *const @This(), comptime level: std.log.Level, comptime scope: @Type(.enum_literal), comptime format: []const u8, args: anytype, ) void { _ = level; const scope_prefix = "(" ++ @tagName(scope) ++ "): "; // Fast path: read pointer once under lock var buf_opt: ?*std.ArrayList(u8) = null; { // cast away const for mutex; log capture is a global singleton const mself: *LogCapture = @constCast(self); mself.mutex.lock(); defer mself.mutex.unlock(); buf_opt = mself.captured_log_buffer; if (buf_opt) |buf| { // Safe serialized append buf.writer().print(scope_prefix ++ format ++ "\n", args) catch return; return; } } // Not capturing: write to stderr (no global buffer to corrupt) const stderr = std.io.getStdErr().writer(); stderr.print(scope_prefix ++ format ++ "\n", args) catch return; } };If you want to bound memory growth during noisy tests, consider switching the buffer to a small ring buffer; happy to draft that if useful.
Also applies to: 47-53, 128-132, 139-142
src/connection.zig (1)
609-631: Fix potential subscription leak on request() error paths (unsubscribe when disconnected).
request()subscribes to an inbox, publishes, waits, and defersunsubscribe(sub). If the connection drops between subscribe and unsubscribe,unsubscribeearly-returnsConnectionClosedand never removes the entry fromself.subscriptions. That leaves a stale sub which will be re-sent on reconnect and leak memory.Refactor
unsubscribeso it always removes from the local map, and only tries to send the UNSUB over the wire when connected:@@ pub fn unsubscribe(self: *Self, sub: *Subscription) !void { // Lock immediately like C library self.mutex.lock(); defer self.mutex.unlock(); - if (self.status != .connected) { - return ConnectionError.ConnectionClosed; - } - - // Remove from subscriptions map - self.subs_mutex.lock(); - defer self.subs_mutex.unlock(); - _ = self.subscriptions.remove(sub.sid); - - // Send UNSUB command - var buffer = ArrayList(u8).init(self.allocator); - defer buffer.deinit(); - try buffer.writer().print("UNSUB {d}\r\n", .{sub.sid}); - try self.bufferWrite(buffer.items); - - log.debug("Unsubscribed from {s} with sid {d}", .{ sub.subject, sub.sid }); + // Always remove from the map to prevent stale entries being re-sent. + self.subs_mutex.lock(); + _ = self.subscriptions.remove(sub.sid); + self.subs_mutex.unlock(); + + // If connected, send UNSUB on the wire; otherwise it's a no-op (safe). + if (self.status == .connected) { + var buffer = ArrayList(u8).init(self.allocator); + defer buffer.deinit(); + try buffer.writer().print("UNSUB {d}\r\n", .{sub.sid}); + try self.bufferWrite(buffer.items); + log.debug("Unsubscribed from {s} with sid {d}", .{ sub.subject, sub.sid }); + } else { + log.debug("Unsubscribed locally (disconnected) from {s} sid {d}", .{ sub.subject, sub.sid }); + } }This makes
request()robust against mid-flight disconnects without leaking or resurrecting the temporary inbox subscription on reconnect. I can also add a regression test that forces a disconnect between subscribe and unsubscribe to validate this.
🧹 Nitpick comments (5)
test_runner.zig (2)
353-357: Broaden boolean env parsing to common truthy values.Only accepting "true" is a bit rigid. Supporting "1|yes|on" improves UX without ambiguity.
- return std.ascii.eqlIgnoreCase(value, "true"); + const v = std.ascii.lowerString(std.heap.page_allocator, value) catch return deflt; + defer std.heap.page_allocator.free(v); + return std.mem.eql(u8, v, "true") or + std.mem.eql(u8, v, "1") or + std.mem.eql(u8, v, "yes") or + std.mem.eql(u8, v, "on");
69-71: Ensure all debug prints are captured under TEST_LOG_CAPTUREWe ran the provided grep script and confirmed that
std.debug.printcalls bypass the test runner’sstd.logoverride, emitting directly to STDERR even whenTEST_LOG_CAPTURE=true. Specifically, uncaptured debug prints were found in:
- test_runner.zig
panicFnat line 363: usesstd.debug.printto render panic messages- examples/replier.zig and examples/requestor.zig
- Multiple
std.debug.printcalls for status and error messages in example applications- src/connection.zig
- Numerous
std.debug.printcalls inprocessPing(lines 913, 918, 920, 923, 926)These calls will not be captured by your tests’ log interceptor, leading to stderr noise and making test outputs harder to interpret.
Suggested refactoring (optional but recommended):
- Replace runtime uses of
std.debug.printwith the correspondingstd.logcalls (e.g.std.log.debug) in code paths exercised by tests.- For broader coverage, consider adding an override for
std.debug.printin your test runner’sstd_options(similar tologFn) or provide an opt-in streaming option for debug output.- Update the example applications to use
std.logfor consistency and to demonstrate best practices.tests/jetstream_test.zig (1)
39-41: Be explicit with the empty slice-of-slices literal to avoid inference surprises.
&.{}may fail type inference on some Zig versions for[]const []const u8. Being explicit improves readability and robustness.- try testing.expectEqualSlices([]const u8, &.{}, result.value); + try testing.expectEqualSlices([]const u8, &[_][]const u8{}, result.value);src/connection.zig (1)
1095-1099: Assign flusher_thread under the connection mutex to reduce close()/reconnect races.
close()readsself.flusher_threadunder various timing windows. Assigning this field without the mutex makes it easier to miss a just-spawned thread inclose()and skip signaling/joining it.- self.flusher_thread = std.Thread.spawn(.{}, flusherLoop, .{self}) catch |err| { + self.mutex.lock(); + const ft = std.Thread.spawn(.{}, flusherLoop, .{self}) catch |err| { + self.mutex.unlock(); + log.err("Failed to restart flusher thread: {}", .{err}); + self.triggerReconnect(err); + continue; // Try next server + }; + self.flusher_thread = ft; + self.mutex.unlock(); - log.err("Failed to restart flusher thread: {}", .{err}); - self.triggerReconnect(err); - continue; // Try next server - };This doesn’t eliminate the race entirely (the thread can start before
close()observes the pointer), but it narrows the window and keeps field updates consistent with other mutex-protected state.src/jetstream.zig (1)
102-120: List stream names: pagination TODO acknowledged; extraction pattern is solid.The extraction-and-rewrap into
Result([]const []const u8)is clean. Once pagination is implemented, consider an iterator or acollectAllNameshelper that aggregates pages into a single arena to keep the same ownership model.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (7)
src/connection.zig(5 hunks)src/jetstream.zig(1 hunks)src/root.zig(1 hunks)test_runner.zig(5 hunks)tests/all_tests.zig(1 hunks)tests/jetstream_test.zig(1 hunks)tests/utils.zig(1 hunks)
🔇 Additional comments (9)
tests/utils.zig (1)
21-23: LGTM: tracing enabled for test connections.Using
.trace = trueis useful alongside the new conditional log capture; good default for CI diagnosis.tests/all_tests.zig (1)
8-9: Wire-up of JetStream tests looks good.Switching to
jetstream_test.zigaligns with the new module surface.tests/jetstream_test.zig (2)
8-14: LGTM: basic connect + context lifecycle.Creates a connection and JetStream context, and ensures proper deinit.
16-27: LGTM: account info sanity check.Asserts zero streams on a clean cluster; deinit patterns are correct.
src/connection.zig (3)
15-17: LGTM: JetStream imports are scoped and clear.The aliasing keeps the surface tidy.
152-153: LGTM: tracing option addition.Default-off Boolean with a clear name; backwards compatible.
1215-1217: LGTM: jetstream() accessor.Returns a value type from
JetStream.initwith allocator + connection; matches the usage in tests.src/root.zig (1)
15-22: Re-exports look good and align the public surface with JetStream.Making these types available from the root module simplifies imports for consumers. No issues spotted.
src/jetstream.zig (1)
94-101: API shape and memory ownership: Looks good.Returning
std.json.Parsed(T)pushes arena ownership to the caller, which is consistent with the call sites shown and avoids hidden copies. No changes requested.
src/jetstream.zig
Outdated
| fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message { | ||
| return try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms) orelse { | ||
| return error.NoResponse; | ||
| }; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Normalize timeout handling to use the new JetStreamError.NoResponse.
Currently returns error.NoResponse, which is anonymous and inconsistent with the public error set. Switch to the exported error for clearer matching and consistency.
Apply this diff:
- fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message {
- return try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms) orelse {
- return error.NoResponse;
- };
- }
+ fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message {
+ const msg = try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms)
+ orelse return JetStreamError.NoResponse;
+ return msg;
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message { | |
| return try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms) orelse { | |
| return error.NoResponse; | |
| }; | |
| } | |
| fn sendRequest(self: *JetStream, comptime method: []const u8, payload: []const u8) !*Message { | |
| const msg = try self.nc.request(default_api_prefix ++ method, payload, self.opts.request_timeout_ms) | |
| orelse return JetStreamError.NoResponse; | |
| return msg; | |
| } |
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 64 to 68, the function currently returns the
anonymous error.NoResponse; change it to return the exported
JetStreamError.NoResponse instead for consistent public error handling. Modify
the orelse branch to return JetStreamError.NoResponse (ensure JetStreamError is
in scope or fully qualify it) so callers can match against the public error set.
| // TODO: handle pagination | ||
| const streams = page_result.value.streams orelse &[_][]const u8{}; | ||
| std.debug.assert(page_result.value.total == streams.len); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assertion is incorrect for paginated responses; will fail with valid server data.
total is the total number of streams across all pages, whereas streams.len is the count on the current page. On page 1 of N, total != streams.len.
Change the assertion to reflect the documented invariant: offset + streams.len <= total. Or drop the assert until pagination is implemented.
Apply this diff:
- std.debug.assert(page_result.value.total == streams.len);
+ std.debug.assert(page_result.value.offset + streams.len <= page_result.value.total);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // TODO: handle pagination | |
| const streams = page_result.value.streams orelse &[_][]const u8{}; | |
| std.debug.assert(page_result.value.total == streams.len); | |
| // TODO: handle pagination | |
| const streams = page_result.value.streams orelse &[_][]const u8{}; | |
| std.debug.assert(page_result.value.offset + streams.len <= page_result.value.total); |
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 110 to 113, the assertion compares
page_result.value.total to streams.len which is wrong for paginated responses;
replace it with an assertion that enforces the documented invariant using the
page offset (e.g. page_result.value.offset + streams.len <=
page_result.value.total) or simply remove the assertion until pagination is
implemented so valid multi-page responses won't fail.
| pub const Stream = @import("jetstream.zig").Stream; | ||
| pub const PubAck = @import("jetstream.zig").PubAck; | ||
| pub const AccountInfo = @import("jetstream.zig").AccountInfo; | ||
| pub const JetStreamError = @import("jetstream.zig").JetStreamError; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Broken re-export: JetStreamError is not defined in jetstream.zig.
This will not compile as @import("jetstream.zig").JetStreamError doesn’t currently exist. Either remove this re-export or define and export an error set from jetstream.zig.
Apply this minimal fix (option A: remove the re-export for now):
- pub const JetStreamError = @import("jetstream.zig").JetStreamError;Alternatively (option B: preferred), add and export a concrete error set from src/jetstream.zig and keep this re-export. See my comment in src/jetstream.zig (Lines 7-10) for the exact addition.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub const JetStreamError = @import("jetstream.zig").JetStreamError; |
…ement - Rename createStream to addStream following older NATS API conventions - Change return type from *Stream to Result(StreamInfo) eliminating manual memory management - Remove Stream wrapper struct, return StreamInfo directly - Add updateStream method using STREAM.UPDATE API endpoint - Add deleteStream method returning !void - Add listStreams method using STREAM.LIST API for full stream information - Update all tests to use new API patterns - Add comprehensive tests for addStream and listStreams functionality
- Add tests:beforeEach support to test runner for per-test setup functions - Fix std.testing.allocator initialization to happen before per-test setup - Add global JetStream stream cleanup in tests:beforeEach - Cleanup deletes all existing streams before each test to prevent conflicts - Use fail-fast approach - cleanup errors propagate and fail tests early - Ensures clean test environment and prevents subject overlap issues
- Fix error parsing to handle non-optional error field in response - Improve error logging with detailed code, err_code, and description - Use FixedBufferAllocator for error parsing to avoid memory issues - Fix test runner log capture ordering to prevent crashes - Add comprehensive tests for updateStream and deleteStream operations - Add test coverage for full stream lifecycle: create, update, delete - All JetStream tests now pass with proper isolation and cleanup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/jetstream.zig (1)
1-250: Refactor inconsistent error usage in JetStreamThe repository-wide scan shows there are still direct references to
error.NoResponseanderror.JetStreamErrorinsrc/jetstream.zig. These should be unified under your newJetStreamErrorenum (e.g.,JetStreamError.NoResponse,JetStreamError.ParseError, etc.) to maintain consistent error handling.Locations to address:
- src/jetstream.zig:128
return error.NoResponse;
Replace with the appropriateJetStreamErrorvariant.- src/jetstream.zig:146
return error.JetStreamError;
Update this to a more specificJetStreamErrorcase (for example,JetStreamError.UnknownResponseorJetStreamError.ServerError).Please refactor these occurrences so that all JetStream errors use the new unified
JetStreamErrorenum rather than anonymous or external error codes.
♻️ Duplicate comments (5)
src/jetstream.zig (5)
7-9: Define a public JetStreamError set and use it throughout.You return
error.JetStreamErrorelsewhere, but the error set isn’t defined, which will not compile. Introduce and export a concrete error set.Apply:
const default_api_prefix = "$JS.API."; const default_request_timeout_ms = 5000; +/// Public error set for JetStream APIs. +pub const JetStreamError = error{ + /// Request timed out or had no reply. + NoResponse, + /// Server returned a JetStream API error envelope. + ApiError, +};
123-130: Normalize no-response to JetStreamError.NoResponse.Currently returns the anonymous
error.NoResponse. Return the exported error for consistent matching by callers.- return try self.nc.request(full_subject, payload, self.opts.request_timeout_ms) orelse { - return error.NoResponse; - }; + const msg = try self.nc.request(full_subject, payload, self.opts.request_timeout_ms) + orelse return JetStreamError.NoResponse; + return msg;
132-147: Error parsing can miss large envelopes and returns an undefined error.
- Using a 1KiB FixedBufferAllocator risks truncation/parse failure for longer error payloads.
- Returning
error.JetStreamErrorreferences an undefined error set.- Prefer a proper arena parse you deinit immediately. Return
JetStreamError.ApiErroronly when the envelope matches.Apply:
- fn maybeParseErrorResponse(_: *JetStream, msg: *Message) !void { - var buf: [1024]u8 = undefined; - var allocator = std.heap.FixedBufferAllocator.init(&buf); - - const response = std.json.parseFromSliceLeaky(ErrorResponse, allocator.allocator(), msg.data, .{ - .allocate = .alloc_if_needed, - .ignore_unknown_fields = true, - }) catch return; - - const info = response.@"error"; - log.err("JetStream error: code={d} err_code={d} description={s}", .{ info.code, info.err_code, info.description }); - - // TODO: Handle specific error cases - return error.JetStreamError; - } + fn maybeParseErrorResponse(self: *JetStream, msg: *Message) !void { + const parsed = std.json.parseFromSlice(ErrorResponse, self.allocator, msg.data, .{ + .allocate = .alloc_if_needed, + .ignore_unknown_fields = true, + }) catch return; // not an error envelope + defer parsed.deinit(); + + const info = parsed.value.@"error"; + log.err("JetStream error: code={d} err_code={d} description={s}", .{ info.code, info.err_code, info.description }); + return JetStreamError.ApiError; + }
167-184: Fix pagination assertion for STREAM.NAMES.
totalis the total across all pages; current-page length often differs. Use the invariant instead.- std.debug.assert(page_result.value.total == streams.len); + std.debug.assert(page_result.value.offset + streams.len <= page_result.value.total);
186-203: Fix pagination assertion for STREAM.LIST.Same rationale as above.
- std.debug.assert(page_result.value.total == streams.len); + std.debug.assert(page_result.value.offset + streams.len <= page_result.value.total);
🧹 Nitpick comments (13)
tests/all_tests.zig (2)
9-10: Keep reconnection tests discoverable via a flag instead of commenting out.Rather than commenting out
reconnection_tests, consider gating them behind an env var or filterable naming convention so they remain runnable locally when needed.Apply this minimal gating pattern:
-// pub const reconnection_tests = @import("reconnection_test.zig"); +pub const reconnection_tests = if (std.process.hasEnvVarConstant("TEST_ENABLE_RECONNECT")) @import("reconnection_test.zig") else struct {};
23-38: Solid per-test cleanup; add guard for pre-existing system streams.Deleting all streams before each test is good to ensure isolation. If CI ever runs against a shared NATS or one with operator/system-managed streams, consider skipping non-test streams by prefix (e.g., only delete names starting with "TEST_") to reduce flakiness.
Example tweak:
- for (stream_names.value) |stream_name| { - try js.deleteStream(stream_name); - } + for (stream_names.value) |stream_name| { + if (std.mem.startsWith(u8, stream_name, "TEST_")) { + try js.deleteStream(stream_name); + } + }If you prefer full cleanup in this repo's controlled docker-compose environment, feel free to keep as-is.
test_runner.zig (4)
31-45: Prefix-only logging omits severity; include level for better triage.You construct
scope_prefixbut drop thelevel. Including it improves failed test logs without much noise.Apply:
- const scope_prefix = "(" ++ switch (scope) { + const scope_prefix = "(" ++ @tagName(level) ++ ":" ++ switch (scope) { std.log.default_log_scope => @tagName(scope), else => @tagName(scope), } ++ "): ";
137-145: Per-test setup runs inside allocator leak scope — intentional?Running
beforeEachafterstd.testing.allocator_instance = .{}means any leaked allocations in setup will fail the test. That's strict but fair. If you want setup to be exempt, move it above Line 129; otherwise keep as-is.
149-153: Stop capture even on exceptions/panics.If a panic skips normal flow, captured logs might not flush. Consider a defer right after starting capture so logs stop even when control unwinds abnormally.
- if (env.do_log_capture) { - // Clear log buffer and start capturing logs for this test - log_buffer.clearRetainingCapacity(); - log_capture.startCapture(&log_buffer); - } + if (env.do_log_capture) { + // Clear log buffer and start capturing logs for this test + log_buffer.clearRetainingCapacity(); + log_capture.startCapture(&log_buffer); + defer log_capture.stopCapture(); + }Then remove the explicit stop later.
335-343: Env flag naming and defaults LGTM; consider accepting “1/0” as well.
TEST_LOG_CAPTUREdefaulting to true is sensible. If you want broader ergonomics, parse "1/0" and "yes/no" too.- return std.ascii.eqlIgnoreCase(value, "true"); + return std.ascii.eqlIgnoreCase(value, "true") or + std.ascii.eqlIgnoreCase(value, "1") or + std.ascii.eqlIgnoreCase(value, "yes");tests/jetstream_test.zig (6)
16-27: Brittleness alert: expecting streams == 0 assumes pristine account.This passes given your
beforeEachcleanup and fresh docker-compose, but can be brittle if external streams exist. If you keep this test, consider rewriting to assert thatstreamsequals the count immediately after cleanup, e.g., by listing names and comparing.- try testing.expect(result.value.streams == 0); + const names = try js.listStreamNames(); + defer names.deinit(); + try testing.expect(result.value.streams == names.value.len);
29-61: Names listing test reads well; tiny assertion improvement.You assert
len >= 1and also check presence of the created stream — the presence check suffices. The>= 1can be dropped.- try testing.expect(result.value.len >= 1);
63-91: Config assertions are targeted and useful.Good coverage of key fields after creation. Consider also asserting defaults (e.g.,
retention == .limitswhen omitted in other tests) in a separate test to validate defaulting behavior.
93-131: Stream list pagination not yet covered.This validates inclusion and core fields. Once pagination is implemented, add a test that creates > default page size streams and asserts
offset + current_page_len <= total.
133-167: Update path looks correct.You verify the changed field; you could also assert unchanged fields (subjects unchanged) to catch accidental resets.
215-593: Large commented scaffolding: consider moving to docs or follow-up tests.Great to have as a blueprint, but keeping hundreds of commented lines in test files can hinder readability. Consider moving to a design doc or enabling subsets behind a
TEST_LONGflag in future PRs.src/jetstream.zig (1)
205-235: Subject build and JSON serialization are fine; consider deferring stringify options.No functional issues. If payload sizes grow, consider streaming serialization to a writer to avoid a full alloc, but current approach is acceptable for these control-plane requests.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
src/jetstream.zig(1 hunks)test_runner.zig(7 hunks)tests/all_tests.zig(2 hunks)tests/jetstream_test.zig(1 hunks)
🔇 Additional comments (7)
tests/all_tests.zig (1)
4-4: Import looks fine.
natsis used indirectly viaconn.jetstream(.{})inside the setup; no concerns.test_runner.zig (3)
131-136: Good: log capture is scoped per test.Starting capture before per-test setup ensures setup logs are included on failure. Nice.
173-181: Only print “Test output” header when buffer non-empty.Minor UX: Avoid printing the header if the buffer is empty. You already guard it; looks good.
395-397: Helper reads clearly.
isPerTestSetupis simple and matches tests’ naming. No issues.tests/jetstream_test.zig (2)
8-14: JetStream context smoke test is clear and minimal.Creates and tears down a JetStream context; good baseline.
169-213: Delete flow validated end-to-end.Good presence/absence verification using names; no issues.
src/jetstream.zig (1)
237-248: Delete path checks errors only — correct for this API.The delete response is a small envelope; your error-check-only approach is adequate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (3)
src/jetstream.zig (3)
7-9: Define and export a concrete JetStreamError set; align with root re-exports.The module returns error.JetStreamError and re-exports JetStreamError from root, but no public error set is defined here.
Add an exported error set (and include a Parse variant to unify parse errors, see follow-ups):
const default_api_prefix = "$JS.API."; const default_request_timeout_ms = 5000; +/// Public error set for JetStream APIs. +/// - NoResponse: request timed out or no reply. +/// - ApiError: server returned a JetStream API error envelope. +/// - Parse: response could not be decoded into the expected type. +pub const JetStreamError = error{ + NoResponse, + ApiError, + Parse, +};
215-222: Use the public JetStreamError.NoResponse instead of anonymous error.NoResponse.Ensures callers can pattern-match on a stable, exported error set.
- return try self.nc.request(full_subject, payload, self.opts.request_timeout_ms) orelse { - return error.NoResponse; - }; + const msg = try self.nc.request(full_subject, payload, self.opts.request_timeout_ms) + orelse return JetStreamError.NoResponse; + return msg;
273-276: Fix incorrect assertions on paginated responses.total is the total across all pages; streams/consumers.len is the count on this page. The current assert will fire on valid multi-page responses.
Apply this to all four sites:
- std.debug.assert(page_result.value.total == streams.len); + std.debug.assert(page_result.value.offset + streams.len <= page_result.value.total);Also applies to: 294-295, 361-362, 383-384
🧹 Nitpick comments (3)
test_runner.zig (2)
41-48: Avoid crashing the runner on transient I/O errors.catch unreachable will abort the process if stderr or the buffer writer fails (e.g., OOM). Better to swallow log write failures in the test runner.
- buf.writer().print(scope_prefix ++ format ++ "\n", args) catch unreachable; + _ = buf.writer().print(scope_prefix ++ format ++ "\n", args) catch {}; ... - stderr.print(scope_prefix ++ format ++ "\n", args) catch unreachable; + _ = stderr.print(scope_prefix ++ format ++ "\n", args) catch {};
141-149: Avoid O(N^2) scanning of builtin.test_functions for per-test setup.You re-scan the full test list for every test to find beforeEach hooks. Cache them once to reduce overhead on large suites.
Minimal change: collect setup functions once and iterate that slice here.
- // Run per-test setup functions - for (builtin.test_functions) |setup_t| { - if (isPerTestSetup(setup_t)) { - setup_t.func() catch |err| { - printer.status(.fail, "\nper-test setup \"{s}\" failed: {}\n", .{ setup_t.name, err }); - return err; - }; - } - } + // Run cached per-test setup functions + for (per_test_setups) |setup_t| { + setup_t.func() catch |err| { + printer.status(.fail, "\nper-test setup \"{s}\" failed: {}\n", .{ setup_t.name, err }); + return err; + }; + }Add this near the first setup scan (before the main test loop):
// collect per-test setup functions once var per_test_setups = std.ArrayList(std.builtin.TestFn).init(allocator); defer per_test_setups.deinit(); for (builtin.test_functions) |t| { if (isPerTestSetup(t)) try per_test_setups.append(t); }src/jetstream.zig (1)
395-401: Downgrade noisy info log or gate behind a trace option.addConsumer logs at info level for every call; tests may be chatty. Consider .debug or an explicit trace flag.
- log.info("adding consumer", .{}); + log.debug("adding consumer", .{});Or plumb a trace bool and guard the log.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
src/jetstream.zig(1 hunks)src/root.zig(1 hunks)test_runner.zig(7 hunks)tests/all_tests.zig(2 hunks)tests/jetstream_test.zig(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- tests/jetstream_test.zig
- tests/all_tests.zig
- src/root.zig
🔇 Additional comments (6)
test_runner.zig (6)
83-87: Switch to GPA looks good.Allocator lifetime is well-scoped (defer deinit), and the single allocator is consistently threaded through. No issues.
116-118: Skipping setup/teardown/per-test-setup in the main loop is correct.Prevents double-running hooks and keeps reporting clean.
135-140: Per-test log capture gating is correct and minimal.Buffer recycling with clearRetainingCapacity avoids churn; capture is only enabled when configured.
177-185: Only print captured logs on failure and when present.The conditional print keeps success output clean and still surfaces useful context on failures.
339-347: Env flag for TEST_LOG_CAPTURE is a good defaulted control.Defaults to true and integrates cleanly with the capture lifecycle.
399-401: Per-test setup marker is straightforward and consistent.Suffix-based detection matches the other hooks’ approach.
| /// Parse an error response from the server, if present. | ||
| fn maybeParseErrorResponse(_: *JetStream, msg: *Message) !void { | ||
| var buf: [1024]u8 = undefined; | ||
| var allocator = std.heap.FixedBufferAllocator.init(&buf); | ||
|
|
||
| const response = std.json.parseFromSliceLeaky(ErrorResponse, allocator.allocator(), msg.data, .{ | ||
| .allocate = .alloc_if_needed, | ||
| .ignore_unknown_fields = true, | ||
| }) catch return; | ||
|
|
||
| const info = response.@"error"; | ||
| log.err("JetStream error: code={d} err_code={d} description={s}", .{ info.code, info.err_code, info.description }); | ||
| log.debug("Full response: {s}", .{msg.data}); | ||
|
|
||
| // TODO: Handle specific error cases | ||
| std.debug.print("JetStream error: code={d} err_code={d} description={s}\n", .{ info.code, info.err_code, info.description }); | ||
| return error.JetStreamError; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Harden error-envelope parsing; avoid FixedBufferAllocator + parseFromSliceLeaky.
- Fixed 1 KiB buffer risks spurious parse failures on larger envelopes.
- parseFromSliceLeaky with a temporary allocator is brittle; prefer arena-managed parse with deinit.
- Return the concrete JetStreamError.ApiError, and avoid duplicate std.debug.print.
- fn maybeParseErrorResponse(_: *JetStream, msg: *Message) !void {
- var buf: [1024]u8 = undefined;
- var allocator = std.heap.FixedBufferAllocator.init(&buf);
-
- const response = std.json.parseFromSliceLeaky(ErrorResponse, allocator.allocator(), msg.data, .{
- .allocate = .alloc_if_needed,
- .ignore_unknown_fields = true,
- }) catch return;
-
- const info = response.@"error";
- log.err("JetStream error: code={d} err_code={d} description={s}", .{ info.code, info.err_code, info.description });
- log.debug("Full response: {s}", .{msg.data});
-
- // TODO: Handle specific error cases
- std.debug.print("JetStream error: code={d} err_code={d} description={s}\n", .{ info.code, info.err_code, info.description });
- return error.JetStreamError;
- }
+ fn maybeParseErrorResponse(self: *JetStream, msg: *Message) !void {
+ // If it's not an API error envelope, parsing will fail — just return.
+ const parsed = std.json.parseFromSlice(ErrorResponse, self.allocator, msg.data, .{
+ .allocate = .alloc_if_needed,
+ .ignore_unknown_fields = true,
+ }) catch return;
+ defer parsed.deinit();
+
+ const info = parsed.value.@"error";
+ log.err("JetStream error: code={d} err_code={d} description={s}", .{ info.code, info.err_code, info.description });
+ log.debug("Full response: {s}", .{msg.data});
+ return JetStreamError.ApiError;
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Parse an error response from the server, if present. | |
| fn maybeParseErrorResponse(_: *JetStream, msg: *Message) !void { | |
| var buf: [1024]u8 = undefined; | |
| var allocator = std.heap.FixedBufferAllocator.init(&buf); | |
| const response = std.json.parseFromSliceLeaky(ErrorResponse, allocator.allocator(), msg.data, .{ | |
| .allocate = .alloc_if_needed, | |
| .ignore_unknown_fields = true, | |
| }) catch return; | |
| const info = response.@"error"; | |
| log.err("JetStream error: code={d} err_code={d} description={s}", .{ info.code, info.err_code, info.description }); | |
| log.debug("Full response: {s}", .{msg.data}); | |
| // TODO: Handle specific error cases | |
| std.debug.print("JetStream error: code={d} err_code={d} description={s}\n", .{ info.code, info.err_code, info.description }); | |
| return error.JetStreamError; | |
| } | |
| fn maybeParseErrorResponse(self: *JetStream, msg: *Message) !void { | |
| // If it's not an API error envelope, parsing will fail — just return. | |
| const parsed = std.json.parseFromSlice(ErrorResponse, self.allocator, msg.data, .{ | |
| .allocate = .alloc_if_needed, | |
| .ignore_unknown_fields = true, | |
| }) catch return; | |
| defer parsed.deinit(); | |
| const info = parsed.value.@"error"; | |
| log.err("JetStream error: code={d} err_code={d} description={s}", .{ info.code, info.err_code, info.description }); | |
| log.debug("Full response: {s}", .{msg.data}); | |
| return JetStreamError.ApiError; | |
| } |
| try self.maybeParseErrorResponse(msg); | ||
|
|
||
| return std.json.parseFromSlice(T, self.allocator, msg.data, .{ | ||
| .allocate = .alloc_always, | ||
| .ignore_unknown_fields = true, | ||
| }) catch |err| { | ||
| log.err("Failed to parse response: {}", .{err}); | ||
| log.debug("Full response: {s}", .{msg.data}); | ||
| return error.JetStreamParseError; | ||
| }; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Normalize parse failures to JetStreamError.Parse.
Currently returns error.JetStreamParseError which is not part of the exported set. Unify under JetStreamError for a consistent public surface.
- }) catch |err| {
+ }) catch |err| {
log.err("Failed to parse response: {}", .{err});
log.debug("Full response: {s}", .{msg.data});
- return error.JetStreamParseError;
+ return JetStreamError.Parse;
};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| try self.maybeParseErrorResponse(msg); | |
| return std.json.parseFromSlice(T, self.allocator, msg.data, .{ | |
| .allocate = .alloc_always, | |
| .ignore_unknown_fields = true, | |
| }) catch |err| { | |
| log.err("Failed to parse response: {}", .{err}); | |
| log.debug("Full response: {s}", .{msg.data}); | |
| return error.JetStreamParseError; | |
| }; | |
| } | |
| try self.maybeParseErrorResponse(msg); | |
| return std.json.parseFromSlice(T, self.allocator, msg.data, .{ | |
| .allocate = .alloc_always, | |
| .ignore_unknown_fields = true, | |
| }) catch |err| { | |
| log.err("Failed to parse response: {}", .{err}); | |
| log.debug("Full response: {s}", .{msg.data}); | |
| return JetStreamError.Parse; | |
| }; | |
| } |
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 245 to 255, the catch block currently returns
error.JetStreamParseError which is not part of the exported JetStreamError set;
change the returned error to the exported variant (JetStreamError.Parse) so
parse failures are normalized to the public JetStreamError type — keep the
existing logging (log.err/log.debug) but replace the final returned error with
JetStreamError.Parse (and import/qualify the name as needed).
Summary by CodeRabbit
New Features
Tests