-
Notifications
You must be signed in to change notification settings - Fork 1
Change nextMsg() to return error.Timeout instead of null #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Update nextMsg() return type from ?*Message to error{Timeout}!*Message
- Map all queue errors (QueueEmpty, QueueClosed, BufferFrozen) to error.Timeout
- Update JetStream nextMsg() wrapper to propagate errors correctly
- Update all call sites across tests, examples, and benchmarks
- Replace `orelse return error.Timeout` patterns with `try` for direct error propagation
- Replace `if (nextMsg()) |msg|` patterns with `if (nextMsg() catch null) |msg|` for optional handling
Fixes #75
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-authored-by: Lukáš Lalinský <[email protected]>
WalkthroughConvert nextMsg APIs to return a non-null message or explicit error.Timeout, update JetStream fetch/nextMsg to surface Timeout, and adapt callers (benchmarks, examples, tests, README) to use try/catch or catch-based handling and move per-message deinit immediately after retrieval. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Assessment against linked issues
Out-of-scope changes
Possibly related PRs
📜 Recent review detailsConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro 💡 Knowledge Base configuration:
You can enable these sources in your CodeRabbit configuration. 📒 Files selected for processing (6)
🚧 Files skipped from review as they are similar to previous changes (6)
✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
Status, Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
examples/replier.zig (1)
29-31: Fix API mismatch:sub.deinit(allocator)no longer matches the Subscription API.
Subscription.deinittakes no parameters now (it uses the stored allocator internally). This example won’t compile as-is.Apply:
- sub.deinit(allocator); + sub.deinit();
🧹 Nitpick comments (7)
tests/headers_test.zig (1)
28-28: Don’t silently swallow future non-timeout errors; catch Timeout explicitly.
catch nullis fine today becausenextMsgonly returnserror.Timeout, but this pattern would also hide unexpected errors if the error set grows. Prefer catchingerror.Timeoutand surfacing others.Apply this pattern to all three occurrences:
-if (sub.nextMsg(1000) catch null) |received_msg| { +if (sub.nextMsg(1000) catch |err| switch (err) { + error.Timeout => null, + else => return err, +}) |received_msg| {Also applies to: 69-69, 139-139
tests/minimal_test.zig (1)
32-32: Prefer catching only Timeout to avoid masking other errors.Same rationale as other tests: restrict the
catchso unexpected errors don’t look like “no message.”-if (sub.nextMsg(100) catch null) |msg| { +if (sub.nextMsg(100) catch |err| switch (err) { + error.Timeout => null, + else => return err, +}) |msg| {benchmarks/echo_server.zig (1)
44-44: Benchmark: log unexpected errors, still ignore timeouts.For long-running loops, don’t swallow non-timeout errors silently. Log them and keep going.
-if (sub.nextMsg(1000) catch null) |msg| { +if (sub.nextMsg(1000) catch |err| switch (err) { + error.Timeout => null, + else => { std.debug.print("nextMsg error: {}\n", .{err}); null }, +}) |msg| {examples/sub.zig (1)
28-28: Example: be explicit about Timeout; pass through other errors.Keeps the tutorial behavior the same for timeouts while not hiding other failures.
-if (sub.nextMsg(100) catch null) |msg| { +if (sub.nextMsg(100) catch |err| switch (err) { + error.Timeout => null, + else => return err, +}) |msg| {tests/jetstream_sync_test.zig (1)
79-86: Timeout assertion updated correctly; consider monotonic timer.Logic matches the new API. If you ever see flakiness on CI clocks, switch to std.time.Timer for a monotonic duration check.
- const start = std.time.milliTimestamp(); - const result = sync_sub.nextMsg(100); // 100ms timeout - const duration = std.time.milliTimestamp() - start; + var timer = try std.time.Timer.start(); + const result = sync_sub.nextMsg(100); // 100ms timeout + const duration = timer.read() / std.time.ns_per_ms;src/jetstream.zig (2)
513-518: Handle-only-Timeout branch is fine; add trace for visibility.Given nextMsg now only errors with Timeout, this switch is correct. A small debug log helps when diagnosing partial batches.
- } else |err| switch (err) { - error.Timeout => { - // Timeout occurred - batch_complete = true; - }, - } + } else |err| switch (err) { + error.Timeout => { + // Timeout occurred + log.debug("Pull inbox nextMsg timed out after {d} ms", .{ timeout_ms * 2 }); + batch_complete = true; + }, + }
556-564: Log underlying createJetStreamMessage errors before mapping to Timeout
createJetStreamMessage can fail with allocator OOM or parse errors; logerrin the catch block so failures aren’t silently hidden.- const js_msg = jetstream_message.createJetStreamMessage(self.js.nc, msg) catch { - msg.deinit(); // Clean up on error - return error.Timeout; // Convert any error to Timeout for API consistency - }; + const js_msg = jetstream_message.createJetStreamMessage(self.js.nc, msg) catch |err| { + msg.deinit(); // Clean up on error + log.err("nextMsg: failed to wrap JetStream message: {}", .{err}); + return error.Timeout; // API contract: surface only Timeout + };
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (10)
benchmarks/echo_server.zig(1 hunks)benchmarks/subscriber.zig(1 hunks)examples/replier.zig(1 hunks)examples/sub.zig(1 hunks)src/jetstream.zig(2 hunks)src/subscription.zig(1 hunks)tests/headers_test.zig(3 hunks)tests/jetstream_sync_test.zig(3 hunks)tests/minimal_test.zig(1 hunks)tests/subscribe_test.zig(2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using
zig fmtbefore commit
Files:
tests/headers_test.zigbenchmarks/echo_server.zigexamples/replier.zigtests/minimal_test.zigbenchmarks/subscriber.zigsrc/subscription.zigexamples/sub.zigsrc/jetstream.zigtests/subscribe_test.zigtests/jetstream_sync_test.zig
🧠 Learnings (2)
📚 Learning: 2025-08-30T07:40:28.186Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#60
File: src/queue.zig:390-393
Timestamp: 2025-08-30T07:40:28.186Z
Learning: In the nats.zig project, the maintainer prefers to rely on Zig's built-in integer overflow detection rather than implementing manual saturated conversions for timeout calculations (timeout_ms * std.time.ns_per_ms). Zig automatically fails on integer overflow in Debug and ReleaseSafe modes.
Applied to files:
examples/sub.zigsrc/jetstream.zig
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/jetstream_sync_test.zig
🔇 Additional comments (6)
examples/replier.zig (1)
38-38: LGTM:catch nullmatches the new API and keeps the loop simple.This correctly treats
error.Timeoutas "no message."benchmarks/subscriber.zig (1)
45-45: LGTM: call site updated correctly.
catch nullis appropriate here since timeouts are expected during the benchmark run loop.tests/subscribe_test.zig (2)
18-18: LGTM: propagate timeout via try (matches new API).This is the right adjustment for
error{Timeout}!*Message.
35-35: LGTM: consistent use of try here as well.Keeps the test failing fast on real timeouts.
tests/jetstream_sync_test.zig (2)
40-47: Good switch to error-union API with try.This aligns with nextMsg now returning error{Timeout}!*JetStreamMessage. Defer cleanup + explicit ack are correct.
123-132: Multi-message path updated correctly to use try and explicit ack.Looks good. Per-message defer deinit is the right call.
- Update benchmarks and examples to use catch break for loop exit - Update tests to return error instead of using catch null - Fix subscription.zig error handling to explicitly map PopError variants - Fix examples/replier.zig deinit call (remove allocator parameter) - Update README example to show proper error handling 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Lukáš Lalinský <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (7)
README.md (1)
52-57: Tighten example: no need to binderr; make the timeout message explicit.The API only returns
error.Timeouthere. You can drop the binding and print a clearer message.-var msg = sub.nextMsg(5000) catch |err| { - std.debug.print("No message received: {}\n", .{err}); +var msg = sub.nextMsg(5000) catch { + std.debug.print("No message received (timeout)\n", .{}); return; }; defer msg.deinit(); std.debug.print("Received: {s}\n", .{msg.data});benchmarks/echo_server.zig (2)
44-63: Pattern switch to error-union looks good; confirmbreakon timeout is intended.Adopts the requested
var msg = sub.nextMsg(1000) catch break;pattern and defers cleanup correctly. One question: breaking the loop on the first timeout stops the server after 1s of inactivity. The C benchmark continues on timeout. If you want to keep serving until a signal, considercatch continue;.Alternative:
- var msg = sub.nextMsg(1000) catch break; + var msg = sub.nextMsg(1000) catch continue;
50-57: Micro: avoid the temporary forpublish; inline the error-unionif.Same behavior, fewer temporaries.
- if (msg.reply) |reply_subject| { - const result = conn.publish(reply_subject, msg.data); - if (result) { + if (msg.reply) |reply_subject| { + if (conn.publish(reply_subject, msg.data)) { stats.success_count += 1; } else |err| { std.debug.print("Failed to send echo reply: {}\n", .{err}); stats.error_count += 1; } }benchmarks/subscriber.zig (1)
45-53: Same note as echo server: verifybreakvs.continueon timeout.Change matches the new API and defers cleanup correctly. If continuous benchmarking is desired, swap
breakforcontinue.- var msg = sub.nextMsg(1000) catch break; + var msg = sub.nextMsg(1000) catch continue; defer msg.deinit();examples/sub.zig (1)
28-36: Good timeout retry loop; minor polish: increment before sleep to keep timing closer to 5s.Functionally fine. If you care about the total wall time, bump the counter first so the last sleep doesn't overshoot.
- var msg = sub.nextMsg(100) catch { - std.time.sleep(100 * std.time.ns_per_ms); - attempts += 1; + var msg = sub.nextMsg(100) catch { + attempts += 1; + std.time.sleep(100 * std.time.ns_per_ms); continue; };src/subscription.zig (1)
120-126: Explicit error narrowing is correct; add docs and assert invariants.Mapping only
QueueEmpty→error.Timeoutand treatingQueueClosed/BufferFrozenas unreachable matches the intended API. Add a short doc comment and an assertion to make the invariant obvious in debug builds.- pub fn nextMsg(self: *Subscription, timeout_ms: u64) error{Timeout}!*Message { - return self.messages.pop(timeout_ms) catch |err| switch (err) { - error.QueueEmpty => error.Timeout, - error.QueueClosed => unreachable, - error.BufferFrozen => unreachable, - }; - } + /// Blocks up to `timeout_ms` milliseconds and returns the next message. + /// Returns `error.Timeout` if no message arrives before the deadline. + pub fn nextMsg(self: *Subscription, timeout_ms: u64) error{Timeout}!*Message { + return self.messages.pop(timeout_ms) catch |err| switch (err) { + error.QueueEmpty => error.Timeout, + error.QueueClosed => unreachable, // subscription should not be used after deinit + error.BufferFrozen => unreachable, + }; + }If
QueueClosedcan be observed by user code during shutdown, consider mapping it toerror.Timeoutas well to keep the public error set stable.examples/replier.zig (1)
41-41: Optional: trim noisy payload logging.For large or binary payloads, logging length is cleaner.
- std.debug.print("Received msg: {s} - {s}\n", .{ msg.subject, msg.data }); + std.debug.print("Received msg on {s}, len={d}\n", .{ msg.subject, msg.data.len });
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (8)
README.md(1 hunks)benchmarks/echo_server.zig(1 hunks)benchmarks/subscriber.zig(1 hunks)examples/replier.zig(1 hunks)examples/sub.zig(1 hunks)src/subscription.zig(1 hunks)tests/headers_test.zig(3 hunks)tests/minimal_test.zig(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- tests/minimal_test.zig
- tests/headers_test.zig
🧰 Additional context used
📓 Path-based instructions (2)
README.md
📄 CodeRabbit inference engine (CLAUDE.md)
Read README.md for an overview of the project
Files:
README.md
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using
zig fmtbefore commit
Files:
src/subscription.zigexamples/sub.zigbenchmarks/echo_server.zigexamples/replier.zigbenchmarks/subscriber.zig
🧬 Code graph analysis (2)
benchmarks/echo_server.zig (1)
benchmarks/echo_server.c (1)
main(5-83)
benchmarks/subscriber.zig (1)
benchmarks/subscriber.c (1)
main(5-70)
🔇 Additional comments (2)
examples/replier.zig (2)
44-48: Reply path looks good.Catches publish errors and logs them; solid for an example.
30-30: Approve subscription deinit update
No othersub.deinit(allocator)calls detected—remainingdeinit(allocator)usages are on types whose API still requires an allocator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (3)
benchmarks/echo_server.zig (1)
44-46: Make msg binding const; confirm timeout policy.Minor: msg isn’t reassigned.
Also, maintainer earlier asked for
catch break; here it’scatch continue. For a long-running server,continuemakes sense (don’t exit on idle). Please confirm the intended policy for benchmarks.- var msg = sub.nextMsg(1000) catch continue; // continue on timeout + const msg = sub.nextMsg(1000) catch continue; // continue on timeoutREADME.md (1)
54-55: Use const for msg in sync example.Not reassigned; tighten binding.
- var msg = sub.nextMsg(5000) catch continue; // continue on timeout + const msg = sub.nextMsg(5000) catch continue; // continue on timeoutexamples/replier.zig (1)
38-40: Use const for msg.Not reassigned; prefer const.
- var msg = sub.nextMsg(1000) catch continue; // continue on timeout + const msg = sub.nextMsg(1000) catch continue; // continue on timeout
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
README.md(2 hunks)benchmarks/echo_server.zig(1 hunks)benchmarks/subscriber.zig(1 hunks)examples/replier.zig(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- benchmarks/subscriber.zig
🧰 Additional context used
📓 Path-based instructions (2)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using
zig fmtbefore commit
Files:
benchmarks/echo_server.zigexamples/replier.zig
README.md
📄 CodeRabbit inference engine (CLAUDE.md)
Read README.md for an overview of the project
Files:
README.md
🧠 Learnings (1)
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
README.mdexamples/replier.zig
🔇 Additional comments (3)
benchmarks/echo_server.zig (1)
60-63: LGTM: throughput print gating.Interval-based reporting after counting is correct.
README.md (1)
66-71: Async example looks correct.Pointer counter usage and per-message deinit are correct.
examples/replier.zig (1)
30-31: API alignment: sub.deinit() without allocator is correct.Matches the refcounted subscription model we’ve seen before; safe to call here. (Using retrieved learnings.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/subscription.zig (1)
120-125: Don't mask QueueClosed/BufferFrozen as Timeout; narrow the mapping.Mapping every queue error to Timeout hides a closed/frozen queue and can cause callers to spin forever. Keep the API’s error set as-only-Timeout by marking the non-timeout paths unreachable (or introduce a distinct Closed error in a follow-up).
Apply (minimal, preserves error set = {Timeout}):
- pub fn nextMsg(self: *Subscription, timeout_ms: u64) error{Timeout}!*Message { - return self.messages.pop(timeout_ms) catch |err| switch (err) { - error.BufferFrozen => error.Timeout, - error.QueueEmpty => error.Timeout, - error.QueueClosed => error.Timeout, // TODO: this should be mapped to ConnectionClosed - }; - } + /// Blocks up to timeout_ms ms and returns the next message or error.Timeout on deadline. + /// Other queue errors are not expected here. + pub fn nextMsg(self: *Subscription, timeout_ms: u64) error{Timeout}!*Message { + return self.messages.pop(timeout_ms) catch |err| switch (err) { + error.QueueEmpty => error.Timeout, + error.QueueClosed => unreachable, // closed subscriptions should be coordinated by caller + error.BufferFrozen => unreachable, // invariant: buffer not frozen during reads + }; + }If you want to surface closure explicitly (preferred long-term), widen the error set:
- pub fn nextMsg(self: *Subscription, timeout_ms: u64) error{Timeout}!*Message { + pub fn nextMsg(self: *Subscription, timeout_ms: u64) error{Timeout, ConnectionClosed}!*Message { return self.messages.pop(timeout_ms) catch |err| switch (err) { error.QueueEmpty => error.Timeout, - error.QueueClosed => error.Timeout, + error.QueueClosed => error.ConnectionClosed, error.BufferFrozen => unreachable, }; }Run to confirm the queue’s pop() error set and find any other places mapping non-timeout errors to Timeout:
#!/bin/bash # Inspect pop() signatures and errors rg -nP -C3 --type zig 'pub\s+fn\s+pop\s*\(' src # Quick scan for catch-all mappings on pop() rg -nP --type zig 'pop\s*\(.*\)\s*catch\s*(\|err\|.*|error\.Timeout)' src
🧹 Nitpick comments (1)
examples/sub.zig (1)
24-29: Only swallow error.Timeout; propagate anything else.Catching all errors will also ignore future non-timeout errors (e.g., ConnectionClosed). Match explicitly on Timeout.
- while (true) { - var msg = sub.nextMsg(1000) catch continue; + while (true) { + var msg = sub.nextMsg(1000) catch |err| switch (err) { + error.Timeout => continue, + else => return err, + }; defer msg.deinit(); std.log.info("Received message: {s} - {s}", .{ msg.subject, msg.data }); return; }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
README.md(2 hunks)examples/sub.zig(1 hunks)src/subscription.zig(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- README.md
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig
📄 CodeRabbit inference engine (CLAUDE.md)
Format code using
zig fmtbefore commit
Files:
examples/sub.zigsrc/subscription.zig
🧬 Code graph analysis (1)
examples/sub.zig (1)
benchmarks/subscriber.c (1)
main(5-70)
Changes nextMsg() return type from ?*Message to error{Timeout}!*Message and updates all call sites.
Fixes #75
🤖 Generated with Claude Code