Skip to content

Conversation

@lalinsky
Copy link
Owner

@lalinsky lalinsky commented Sep 3, 2025

No description provided.

lalinsky and others added 3 commits September 3, 2025 22:58
This commit implements the JetStream Direct Get functionality according to ADR-31
and reorganizes the message-related tests into separate, focused files.

Key changes:
- Added support for JetStream Direct Get API ($JS.API.DIRECT.GET.<stream>)
- Unified getMsg() API with GetMsgOptions supporting both legacy and direct get
- Added allow_direct field to StreamConfig for enabling direct get on streams
- Implemented proper header extraction for direct get responses
- Added comprehensive validation for option combinations
- Reorganized tests into three focused files:
  * jetstream_get_msg_test.zig - Legacy API tests
  * jetstream_get_msg_direct_test.zig - Direct API tests
  * jetstream_delete_msg_test.zig - Delete/erase operation tests

Features implemented:
- Direct get from any replica (not just leader)
- Support for seq, last_by_subj, and next_by_subj operations
- Header preservation in direct get responses
- Proper error handling for both APIs
- Protocol-mapped field validation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Fix critical segfault in MessagePool recycling by replacing
  clearRetainingCapacity() with complete HashMap reset (.{}) in Message.reset()
- Move header parsing to occur before processMsg() in parser to eliminate
  lazy parsing and race conditions
- Remove needs_header_parsing boolean flag and associated complexity
- Simplify header API functions to remove unnecessary error unions
- All direct API tests now pass with message recycling enabled

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 3, 2025

Walkthrough

Adds option-driven JetStream GET APIs including a direct GET path, expands GetMsg request fields and StreamConfig.allow_direct, converts Message header parsing to an explicit parseHeaders flow with numeric status codes, updates parser to parse headers before delivery, and splits/refactors JetStream message tests.

Changes

Cohort / File(s) Summary
JetStream API & GET flows
src/jetstream.zig
Introduces GetMsgOptions and GetMsgsOptions; changes getMsg to accept options and dispatch to legacy (getMsgLegacy) or direct (getMsgDirect) flows; expands GetMsgRequest fields; adds getMsgs stub; exposes StreamConfig.allow_direct; removes getLastMsg.
Message internals & headers
src/message.zig
Replaces string status header constants with numeric STATUS_* (u16); adds status_code field; removes lazy parsing flag; adds parseHeaders(); setRawHeaders triggers parse; header accessors made non-throwing; updates encode/reset behavior.
Parser integration
src/parser.zig
Removes needs-header-parsing flag in MSG handling and calls msg.parseHeaders() before conn.processMsg(msg); test expectation updated to reflect non-throwing headerGet.
Message tests / API usage updates
src/message_test.zig, tests/headers_test.zig
Tests adapted to allocator-only Message.init, use of setters (setSubject, setReply, setPayload, setRawHeaders), explicit parseHeaders(), and non-throwing header APIs.
JetStream tests — split and new direct tests
tests/jetstream_get_msg_test.zig, tests/jetstream_get_msg_direct_test.zig, tests/jetstream_delete_msg_test.zig, tests/all_tests.zig
Adds separate test suites for legacy getMsg, direct getMsg, and delete/erase semantics; updates all_tests.zig to import the new modules.
Removed legacy test
tests/jetstream_msg_test.zig
Deletes the old combined JetStream message test file (replaced by focused suites).

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch jetstream-direct-get

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
src/parser.zig (1)

763-783: Remove try from non-throwing headerGet calls in tests
headerGet now returns ?[]const u8 and doesn’t throw—drop try in all test usages:

parser.zig (around line 780):

-            try std.testing.expectEqualStrings("Bar", try msg.headerGet("Foo") orelse "");
+            try std.testing.expectEqualStrings("Bar", msg.headerGet("Foo") orelse "");

message_test.zig (line 70):

-    const missing = try msg.headerGet("Missing");
+    const missing = msg.headerGet("Missing");

message_test.zig (line 166):

-    const result = try msg.headerGet("");
+    const result = msg.headerGet("");

Tests will compile once these try keywords are removed.

src/message_test.zig (2)

18-30: Constructor mismatch vs implementation

Tests call Message.init(allocator, subject, reply, data) and Message.initWithHeaders(...), but the implementation exposes Message.init(allocator) with setters. Either add these convenience constructors or rewrite tests to use setSubject/setReply/setPayload/setRawHeaders + parseHeaders().

I can draft the constructors or update tests — your pick.


79-98: Remove try from non-throwing API calls in tests

  • In src/message_test.zig:

    • Line 70: change
      const missing = try msg.headerGet("Missing");
      to
      const missing = msg.headerGet("Missing");
    • Line 110: change
      try testing.expect(try msg.isNoResponders());
      to
      try testing.expect(msg.isNoResponders());
    • Line 114: change
      try testing.expect(!try msg.isNoResponders());
      to
      try testing.expect(!msg.isNoResponders());
    • Line 166: change
      const result = try msg.headerGet("");
      to
      const result = msg.headerGet("");
  • In src/parser.zig (line 780): change
    try std.testing.expectEqualStrings("Bar", try msg.headerGet("Foo") orelse "");
    to
    try std.testing.expectEqualStrings("Bar", msg.headerGet("Foo") orelse "");

src/jetstream.zig (1)

891-959: Legacy get: call Message helpers for subject and headers.
Replace manual subject duplication with:

-        msg.subject = try arena_allocator.dupe(u8, stored_msg.subject);
+        try msg.setSubject(stored_msg.subject);

And immediately after assigning msg.raw_headers = decoded_headers;, add:

+        if (msg.raw_headers) try msg.parseHeaders();

Consider exposing stored_msg.time (e.g. store the raw timestamp) since RFC3339 parsing is still TODO.

🧹 Nitpick comments (11)
src/message.zig (5)

119-162: parseHeaders: make idempotent or clear map to avoid silent duplication

Repeated calls will append/replace entries in an arena-backed map, leaking capacity within the arena and potentially producing multiple values for the same key. Either short-circuit when headers are already parsed or clear the map first.

Example change:

 pub fn parseHeaders(self: *Self) !void {
-    const raw = self.raw_headers orelse return;
+    const raw = self.raw_headers orelse return;
+    if (self.headers.count() != 0) return; // already parsed or headerSet() used

130-161: Status/Description values rely on raw backing — fine, but document invariant

You’re storing slices into raw_headers (arena-owned). That’s OK, but it assumes raw_headers outlives the header map. Add a brief comment to make that invariant explicit. Alternatively, dupe into the arena for consistency with the rest of the code.


164-184: Consider canonicalizing header keys (case-insensitive semantics)

NATS headers are conventionally case-insensitive (http.Header in Go). Today keys are stored verbatim, so "Content-Type" and "content-type" will coexist. Normalizing (e.g., title-case) on parse avoids surprises.

Sketch:

-const key = std.mem.trim(u8, line[0..colon_pos], " \t");
+var key_buf = std.ArrayList(u8).init(arena_allocator);
+defer key_buf.deinit();
+try key_buf.appendSlice(std.mem.trim(u8, line[0..colon_pos], " \t"));
+// apply canonicalization to key_buf.items
+const key = key_buf.items;

215-222: headerGetAll: returning arena-backed slice is fine — call out lifetime

The returned slice’s lifetime is tied to the message arena. A short note in the docstring would prevent misuse.


264-265: reset: switching to headers = .{} discards table capacity

Given the pool immediately resets the arena afterwards, this is probably fine. If you ever reuse Message without resetting the arena, clearRetainingCapacity() would be cheaper.

tests/jetstream_get_msg_direct_test.zig (1)

279-284: Mixed error types across direct vs legacy paths.

Here you expect error.MessageNotFound, while legacy tests assert nats.JetStreamError.NoMessageFound. If intentional (different surfaces), great; otherwise consider unifying the error mapping for a flatter API.

Would you like a follow-up patch to normalize these to a common error in the public API?

src/jetstream.zig (5)

157-159: StreamConfig.allow_direct added — double-check server compatibility.

Field name matches NATS config. Ensure your test matrix includes a server that supports direct reads; older servers should ignore unknown fields but verify behavior when nulls are sent.


307-330: GetMsgRequest grew a lot; most fields aren’t reachable yet.

Only seq/last_by_subj/next_by_subj are used. Either (a) keep the extra fields but mark them explicitly “reserved, not yet wired” in docs, or (b) defer adding until GetMsgs/direct-batch lands to avoid freezing an unused API.


331-341: GetMsgOptions validation pairs well with getMsg; consider builders for ergonomics.

Optional: add small constructors like GetMsgOptions.bySeq(u64) and .lastBySubj([]const u8) to prevent invalid combos at call sites.


385-389: Public GetMsgsOptions placeholder.

If batch isn’t shipping in this PR, consider making this internal or clearly annotating “experimental” to avoid premature API surface.


988-994: Batch get stub.

Error.NotImplemented is fine. If you want, I can scaffold the ADR-31-compatible request/response and tests behind a feature flag.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e95a1c5 and a82f666.

📒 Files selected for processing (10)
  • src/jetstream.zig (7 hunks)
  • src/message.zig (6 hunks)
  • src/message_test.zig (3 hunks)
  • src/parser.zig (1 hunks)
  • tests/all_tests.zig (1 hunks)
  • tests/headers_test.zig (3 hunks)
  • tests/jetstream_delete_msg_test.zig (1 hunks)
  • tests/jetstream_get_msg_direct_test.zig (1 hunks)
  • tests/jetstream_get_msg_test.zig (1 hunks)
  • tests/jetstream_msg_test.zig (0 hunks)
💤 Files with no reviewable changes (1)
  • tests/jetstream_msg_test.zig
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig

📄 CodeRabbit inference engine (CLAUDE.md)

Format code using zig fmt before commit

Files:

  • tests/all_tests.zig
  • tests/jetstream_get_msg_test.zig
  • tests/jetstream_get_msg_direct_test.zig
  • tests/jetstream_delete_msg_test.zig
  • tests/headers_test.zig
  • src/parser.zig
  • src/message_test.zig
  • src/message.zig
  • src/jetstream.zig
🧠 Learnings (2)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • tests/all_tests.zig
  • tests/jetstream_get_msg_test.zig
  • tests/jetstream_get_msg_direct_test.zig
  • tests/jetstream_delete_msg_test.zig
📚 Learning: 2025-08-29T10:22:22.247Z
Learnt from: CR
PR: lalinsky/nats.zig#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-29T10:22:22.247Z
Learning: Run `zig build test` to confirm your changes are working

Applied to files:

  • tests/all_tests.zig
🪛 GitHub Actions: CI
tests/jetstream_get_msg_test.zig

[error] 1-1: zig fmt --check failed. File not properly formatted. Run 'zig fmt' to fix.

tests/jetstream_get_msg_direct_test.zig

[error] 1-1: zig fmt --check failed. File not properly formatted. Run 'zig fmt' to fix.

tests/jetstream_delete_msg_test.zig

[error] 1-1: zig fmt --check failed. File not properly formatted. Run 'zig fmt' to fix.

src/message.zig

[error] 1-1: zig fmt --check failed. File not properly formatted. Run 'zig fmt' to fix.

src/jetstream.zig

[error] 1-1: zig fmt --check failed. File not properly formatted. Run 'zig fmt' to fix.

🔇 Additional comments (32)
src/message.zig (4)

205-214: headerGet: non-throwing API looks good

Returning an optional aligns with typical header maps and simplifies call sites.


223-227: headerDelete: OK

Fetch-remove is sufficient; arena handles the memory.


232-234: isNoResponders: OK with new optional API

Matches the 503 check and empty body requirement.


1-1: CI: zig fmt check failing
CI is failing because code isn’t formatted per project guidelines. Run zig fmt . and commit the formatting fixes.

src/parser.zig (1)

1-1: CI formatting

zig fmt --check failed. Please format this file too.

src/message_test.zig (3)

63-68: headerGet now optional — good change

Dropping error propagation here is correct.


74-76: Delete + verify: OK

Non-throwing delete and optional get are consistent.


1-1: Format with zig fmt

CI flagged formatting; run zig fmt on tests as well.

tests/all_tests.zig (1)

19-21: Good split of JetStream message tests into focused modules.

Clearer test ownership, easier triage.

tests/headers_test.zig (3)

36-43: Optional-return header reads are used correctly.

Nil-check before .? keeps this safe.

Also applies to: 40-43


86-93: Header API transition looks right.

  • Using optionals for gets and void delete matches the new surface.
  • Tests cover existent/non-existent cases well.

Also applies to: 95-97, 99-107


142-149: Reply-with-headers assertions are sound.

Checks mirror the production contract.

Also applies to: 146-149

tests/jetstream_get_msg_test.zig (7)

33-47: Seq-based get: assertions match publish order.

Covers multiple sequences; good.


88-103: Last-by-subject over wildcard stream: expectations are correct.

Interleaving and seq assertions line up.


145-168: Next-by-subject: correct use of inclusive starting seq.

Solid coverage at seq 1, 3, and 5.


221-257: Error-path coverage looks good.

StreamNotFound and NoMessageFound are the right shapes here.


270-289: Client-side option validation is well specified.

All invalid mixes covered; nice.


302-305: getMsgs stub: explicit NotImplemented is fine.

Keeps API surface future-proof and test-enforced.


1-1: Run zig fmt on tests/jetstream_get_msg_test.zig to fix CI formatting

CI is failing zig fmt --check on this file. Execute:

zig fmt tests/jetstream_get_msg_test.zig
zig fmt --check .

and ensure zig build test passes before merging.

tests/jetstream_get_msg_direct_test.zig (4)

19-27: Direct mode is correctly enabled at the stream level.

allow_direct = true is required for server-side direct responders.


247-250: Expected error when direct is used without allow_direct.

error.NoResponders makes sense if no direct responder exists; confirm this is stable across supported server versions.

You can pin a minimum NATS Server version in CI matrix or guard with a feature probe if needed.


1-1: Verify formatting and tests locally. CI requires running zig fmt --check and zig build test, but zig isn’t available here. Please run those commands in your development environment to confirm formatting and test suite pass.


208-219: Ignore this—no header-name constants exist in the library; the tests correctly use the protocol-defined literals as in parseHeaders.

Likely an incorrect or invalid review comment.

tests/jetstream_delete_msg_test.zig (5)

51-54: Delete flow validated correctly.

Post-delete getMsgNoMessageFound is the expected behavior.


101-104: Erase flow validated correctly.

Good to distinct-test delete vs erase semantics.


151-157: Combined delete/erase scenario covers both visibility gaps.

Nice end-to-end guard.


173-195: Error cases look right.

StreamNotFound for missing stream; StreamMsgDeleteFailed for missing seq. Good coverage.


1-1: Run zig fmt and verify formatting compliance
CI formatting checks are failing for this file—please run zig fmt tests/jetstream_delete_msg_test.zig (and zig fmt --check .) and then zig build test locally to ensure all files are properly formatted and tests pass.

src/jetstream.zig (4)

16-16: Import MessageList looks fine.

Used by the getMsgs signature; no issues.


512-546: Good: non-throwing Status header read.

Switching to headerGet without try avoids turning missing headers into errors during fetch.


960-986: Option validation is clear and defensive.

Routing to direct vs legacy based on options.direct keeps behavior explicit. No changes requested.


1-1: Format with zig fmt
CI is failing zig fmt --check. Run zig fmt locally before pushing.

lalinsky and others added 3 commits September 4, 2025 00:10
- Add status_code: u16 field to Message struct for efficient status checking
- Convert status constants from strings to u16 values (100, 400, 404, etc.)
- Update parseHeaders() to set both status_code field and full Status header
- Make setRawHeaders() automatically call parseHeaders()
- Update isNoResponders() to use simple integer comparison
- Update encodeHeaders() to use Status header as first line when available
- Update all JetStream code to use status_code field instead of string comparisons
- Add comprehensive tests for status field and header parsing
- Fix parser test to use correct headerGet API

This provides more efficient status code checking while maintaining full
compatibility with the existing Status header for debugging/logging.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/message_test.zig (1)

206-219: Drop try: headerGet no longer throws

Same API change as above.

-    const result = try msg.headerGet("");
+    const result = msg.headerGet("");
♻️ Duplicate comments (2)
tests/jetstream_get_msg_test.zig (1)

201-201: Fix PubAck leak from publishMsg

publishMsg returns an ack that owns memory. Capture and deinit it.

-    // Publish the message
-    _ = try js.publishMsg(msg, .{});
+    // Publish the message
+    const pub_ack = try js.publishMsg(msg, .{});
+    defer pub_ack.deinit();
src/jetstream.zig (1)

995-1051: Direct GET: status mapping and header usage look right

  • 408→BadRequest for direct GET is correct (Empty Request).
  • Assigning Nats-Subject directly avoids copies and matches arena ownership.

Optional: log Description on status errors for easier triage.

Apply:

-        if (resp.status_code == 404) {
-            log.debug("getMsgDirect: Returning MessageNotFound error", .{});
-            return error.MessageNotFound;
-        } else if (resp.status_code == 408) {
-            return error.BadRequest;
-        } else if (resp.status_code == 413) {
-            return error.TooManySubjects;
-        }
+        if (resp.status_code == 404 or resp.status_code == 408 or resp.status_code == 413) {
+            if (resp.headerGet("Description")) |desc| {
+                log.debug("Direct GET status {d}: {s}", .{ resp.status_code, desc });
+            }
+            return switch (resp.status_code) {
+                404 => error.MessageNotFound,
+                408 => error.BadRequest,
+                413 => error.TooManySubjects,
+                else => unreachable,
+            };
+        }

Nit: if Nats-Sequence parse fails, consider logging before defaulting to 0.

🧹 Nitpick comments (10)
src/message.zig (2)

121-183: Idempotent parse: clear prior headers and status before re-parse

Calling parseHeaders multiple times currently accumulates entries in the arena. Reset state up front to avoid duplication and bloat.

 pub fn parseHeaders(self: *Self) !void {
-    const raw = self.raw_headers orelse return;
+    const raw = self.raw_headers orelse return;
+    // Reset parse state (safe: caller opted into parsing from raw_headers)
+    self.headers = .{};
+    self.status_code = 0;

23-23: Dead constant?

HDR_DESCRIPTION is defined but unused. Remove or reintroduce if you plan to surface status text separately.

tests/jetstream_get_msg_direct_test.zig (3)

29-33: Nit: flush likely unnecessary here

Same-connection PUBs are processed in order before the subsequent request; the extra flush is harmless but not strictly required.


178-219: Use acked sequence instead of assuming 1

Safer to request by the server-assigned sequence from PubAck to avoid accidental flakiness if sequencing ever changes.

Apply:

-    const retrieved = try js.getMsg(stream_name, .{ .seq = 1, .direct = true });
+    const retrieved = try js.getMsg(stream_name, .{ .seq = pub_ack.value.seq, .direct = true });

13-18: Optional: skip unique names

The suite runs with a pre-test cleanup; you can reuse a fixed stream/subject to reduce allocator churn in tests.

src/jetstream.zig (5)

307-330: Request bag includes direct-only fields not yet used

Clear docs, but only seq/last_by_subj/next_by_subj are currently serialized. Consider a brief note or TODO pointing to ADR-31 for the unused fields to avoid confusion.


385-389: Placeholder type for batch

Fine to stub; consider marking with a TODO describing intended options.


512-538: Pull NEXT status handling is correct

Using status_code with 404/408/409/100 branches matches pull semantics; 408→Timeout here is appropriate (different from direct GET’s 408=Empty Request). Maybe add a one-liner comment about that difference to prevent future regressions.


890-957: Legacy GET path decode is fine; consider eager header parse

You set raw_headers; optionally call parseHeaders() so header accessors work without extra parsing by callers.

Example:

         if (stored_msg.hdrs) |hdrs_b64| {
             const decoder = std.base64.standard.Decoder;
             const hdrs_len = try decoder.calcSizeForSlice(hdrs_b64);
             const decoded_headers = try arena_allocator.alloc(u8, hdrs_len);
             try decoder.decode(decoded_headers, hdrs_b64);
             msg.raw_headers = decoded_headers;
+            try msg.parseHeaders();
         }

987-993: getMsgs() stub

Returning NotImplemented is fine; consider @CompileError if accidentally used, but not required.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a82f666 and 64acff0.

📒 Files selected for processing (7)
  • src/jetstream.zig (8 hunks)
  • src/message.zig (8 hunks)
  • src/message_test.zig (4 hunks)
  • src/parser.zig (2 hunks)
  • tests/jetstream_delete_msg_test.zig (1 hunks)
  • tests/jetstream_get_msg_direct_test.zig (1 hunks)
  • tests/jetstream_get_msg_test.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • tests/jetstream_delete_msg_test.zig
  • src/parser.zig
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig

📄 CodeRabbit inference engine (CLAUDE.md)

Format code using zig fmt before commit

Files:

  • tests/jetstream_get_msg_test.zig
  • tests/jetstream_get_msg_direct_test.zig
  • src/jetstream.zig
  • src/message.zig
  • src/message_test.zig
🧠 Learnings (6)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • tests/jetstream_get_msg_test.zig
  • tests/jetstream_get_msg_direct_test.zig
📚 Learning: 2025-09-03T22:00:56.171Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/parser.zig:219-224
Timestamp: 2025-09-03T22:00:56.171Z
Learning: In Zig parser code, when transferring message ownership from parser to connection, parse headers before nulling the ownership pointer (self.ma.msg = null) to avoid memory leaks if parsing fails, since parser cleanup logic will handle deinit if self.ma.msg is non-null.

Applied to files:

  • tests/jetstream_get_msg_test.zig
  • src/message_test.zig
📚 Learning: 2025-09-03T22:04:03.624Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/jetstream.zig:996-1057
Timestamp: 2025-09-03T22:04:03.624Z
Learning: In NATS JetStream direct get API, HTTP status code 408 means "Empty Request" (when the request payload is empty), not timeout. This is different from standard HTTP 408 Request Timeout semantics. The error.BadRequest mapping for 408 status is appropriate in this context.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-09-03T22:13:20.760Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/jetstream.zig:0-0
Timestamp: 2025-09-03T22:13:20.760Z
Learning: When headerGet() returns header values from a NATS message, the returned slices point to data already owned by the message's arena. Direct assignment to message fields like resp.subject = nats_subject is more efficient than using setter methods like setSubject() which would copy the data unnecessarily.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-08-25T14:49:05.838Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: src/jetstream.zig:0-0
Timestamp: 2025-08-25T14:49:05.838Z
Learning: In NATS protocol, messages are written in order over TCP and processed by the server in the order received. This means a SUB command will always be processed before a subsequent request message, even if both are buffered, making explicit flush() calls unnecessary for subscription-then-request patterns.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-08-24T20:23:43.436Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: tests/headers_test.zig:158-159
Timestamp: 2025-08-24T20:23:43.436Z
Learning: In NATS header parsing, when processing status lines like "NATS/1.0 503 No Responders", the 3-digit status code (e.g., "503") gets stored in the Status header, while any additional descriptive text (e.g., "No Responders") gets stored in a separate Description header. The Status header should contain only the numeric code, not the full line.

Applied to files:

  • src/message.zig
🪛 GitHub Actions: CI
tests/jetstream_get_msg_test.zig

[error] 201-201: Memory leak detected during test jetstream_get_msg_test.test.get message with headers.


[error] 213-213: TestUnexpectedResult in jetstream_get_msg_test.test.get message with headers.

🔇 Additional comments (13)
src/message_test.zig (1)

79-98: Explicit parse before access looks good

parseHeaders is called explicitly before headerGet. Matches the new API.

tests/jetstream_get_msg_direct_test.zig (7)

19-26: Good: enable allow_direct on stream creation

This sets up the direct path correctly for server-side responders.


33-45: Direct-by-seq path exercised well

Covers basic happy path and validates subject/data.


70-76: Wildcard + allow_direct config looks right

Stream accepts interleaved subjects; good setup for later next_by_subj checks.


87-100: Last-by-subject via direct GET looks correct

Validates subject routing and payload selection.


134-154: next_by_subj semantics validated

Interleaving and anchor sequence checks look accurate.


221-250: Negative case covers NoResponders correctly

Asserting NoResponders when direct isn’t enabled matches server behavior.


278-285: 404 mapping checks for direct path are solid

Both missing sequence and unknown subject are exercised.

src/jetstream.zig (5)

16-16: MessageList re-export for batch API

Import is fine; keeps the future getMsgs signature compiling.


157-159: allow_direct in StreamConfig

Accurate doc string; aligns with server semantics to allow direct GET on replicas.


331-341: GetMsgOptions shape looks good

Covers the supported combinations and toggles direct vs legacy paths.


959-985: Options validation is tight

The three valid combos are enforced; good guardrails.


1148-1155: Status 100 handling in push callback

Good to consume heartbeats/flow-control and skip user handler.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
src/jetstream.zig (2)

1022-1029: Status code mapping: keep 408 -> BadRequest

Correct for DIRECT.GET: 408 is “Empty Request” in this context, not a timeout. Mapping here is right.


1032-1034: Subject assignment from header is correct here

headerGet() returns slices owned by the message arena; direct assignment avoids an extra copy.

🧹 Nitpick comments (4)
src/jetstream.zig (4)

157-159: allow_direct added to StreamConfig

Good addition. Optional nit: if you want to avoid sending "allow_direct": null in create/update calls, serialize configs with emit_null_optional_fields = false.


385-389: Stubbing GetMsgsOptions

OK to land as placeholder. Consider adding a short comment linking ADR-31 or server PR for future reference.


988-994: getMsgs() NotImplemented stub

Reasonable to land the signature now. Consider returning a dedicated error (or TODO tag) in logs to make it obvious at call sites.


1036-1039: Don’t silently set seq to 0 on parse failure

Avoid masking bad headers. Either skip setting or log a warning when parsing fails.

Apply:

-        if (resp.headerGet("Nats-Sequence")) |nats_seq_str| {
-            resp.seq = std.fmt.parseInt(u64, nats_seq_str, 10) catch 0;
-        }
+        if (resp.headerGet("Nats-Sequence")) |nats_seq_str| {
+            const maybe_seq: ?u64 = std.fmt.parseInt(u64, nats_seq_str, 10) catch null;
+            if (maybe_seq) |s| {
+                resp.seq = s;
+            } else {
+                log.warn("Invalid Nats-Sequence header: {s}", .{nats_seq_str});
+            }
+        }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 64acff0 and 1f6c138.

📒 Files selected for processing (3)
  • src/jetstream.zig (9 hunks)
  • src/parser.zig (2 hunks)
  • tests/jetstream_get_msg_test.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/parser.zig
  • tests/jetstream_get_msg_test.zig
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig

📄 CodeRabbit inference engine (CLAUDE.md)

Format code using zig fmt before commit

Files:

  • src/jetstream.zig
🧠 Learnings (3)
📚 Learning: 2025-09-03T22:04:03.624Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/jetstream.zig:996-1057
Timestamp: 2025-09-03T22:04:03.624Z
Learning: In NATS JetStream direct get API, HTTP status code 408 means "Empty Request" (when the request payload is empty), not timeout. This is different from standard HTTP 408 Request Timeout semantics. The error.BadRequest mapping for 408 status is appropriate in this context.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-09-03T22:13:20.760Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/jetstream.zig:0-0
Timestamp: 2025-09-03T22:13:20.760Z
Learning: When headerGet() returns header values from a NATS message, the returned slices point to data already owned by the message's arena. Direct assignment to message fields like resp.subject = nats_subject is more efficient than using setter methods like setSubject() which would copy the data unnecessarily.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-08-25T14:49:05.838Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: src/jetstream.zig:0-0
Timestamp: 2025-08-25T14:49:05.838Z
Learning: In NATS protocol, messages are written in order over TCP and processed by the server in the order received. This means a SUB command will always be processed before a subsequent request message, even if both are buffered, making explicit flush() calls unnecessary for subscription-then-request patterns.

Applied to files:

  • src/jetstream.zig
🔇 Additional comments (11)
src/jetstream.zig (11)

16-16: Import of MessageList is fine

Used as the return type for getMsgs(); no issues.


307-330: GetMsgRequest expansion looks correct

Fields and doc comments align with STREAM.MSG.GET/DIRECT.GET; nulls are omitted by callers.


331-341: GetMsgOptions API is sane

Minimal surface (seq/last_by_subj/next_by_subj + direct) keeps things simple while enabling direct get.


512-545: Status handling in pull fetch matches JetStream semantics

404=no messages, 408=timeout, 409=consumer seq mismatch, 100=heartbeat — all handled appropriately. One small question: why nextMsg(timeout_ms * 2) instead of timeout_ms? This can exceed the caller’s contract.


890-891: getMsgLegacy split is good

Clear separation of legacy vs direct path.


900-906: Request shaping from options is correct

Only the supported fields are set for the legacy path.


907-911: Omitting nulls in JSON is the right call

Keeps the wire payload minimal and server-friendly.


948-948: Parsing headers eagerly

Parsing into the Message post base64-decode is necessary for consumers downstream — good.


960-986: Options validation is solid

Enforces valid combinations: last_by_subj exclusivity and next_by_subj requiring seq.


996-1001: Direct GET path scaffolding

Looks good; validates stream name and logs at debug.


1138-1145: Internal handling of status 100 in push subscriptions

Good call to consume heartbeats/flow control internally and not expose them to user handlers.

Fixed unit tests to work with the refactored Message API:
- Updated Message.init() calls to use new single-parameter constructor
- Replaced Message.initWithHeaders() with setRawHeaders() pattern
- Added STATUS_NO_RESPONSE constant import
- Fixed headerGet/headerDelete calls that no longer return errors
- Changed const to var for mutable message instances

All 76 unit tests now pass including 9 message tests.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/message.zig (1)

121-160: Honor NATS header semantics: Status numeric in Status, text in Description; make parsing idempotent

  • Store only the 3-digit code in “Status”; put trailing text into “Description”. (Matches prior learnings and other libs.)
  • Clear previous parsed headers, reset status_code, and null raw_headers after success to prevent duplicates.
 pub fn parseHeaders(self: *Self) !void {
-    const raw = self.raw_headers orelse return;
+    const raw = self.raw_headers orelse return;
+
+    // Reset parsed state to keep this idempotent
+    self.status_code = 0;
+    if (self.headers.count() != 0) self.headers.clearRetainingCapacity();

@@
-    // Check if we have an inlined status (like "NATS/1.0 503" or "NATS/1.0 503 No Responders")
+    // Check if we have an inlined status (e.g., "NATS/1.0 503" or "NATS/1.0 503 No Responders")
     if (std.mem.startsWith(u8, first_line, NATS_STATUS_PREFIX) and first_line.len > NATS_STATUS_PREFIX.len) {
         const status_part = std.mem.trim(u8, first_line[NATS_STATUS_PREFIX.len..], " \t");
         if (status_part.len > 0) {
-            // Extract status code (first 3 characters if available)
-            const status_len = 3; // Like Go's statusLen
-            var status_code: []const u8 = undefined;
+            // Extract status code (first 3 chars) and optional description
+            const status_len = 3;
+            var code_slice: []const u8 = undefined;

-            if (status_part.len == status_len) {
-                status_code = status_part;
+            if (status_part.len == status_len) {
+                code_slice = status_part;
             } else if (status_part.len > status_len) {
-                status_code = status_part[0..status_len];
+                code_slice = status_part[0..status_len];
             } else {
-                status_code = status_part; // Less than 3 chars, use as-is
+                code_slice = status_part; // Less than 3 chars, use as-is
             }
 
-            // Parse status code to u16
-            if (std.fmt.parseInt(u16, status_code, 10)) |code| {
+            // Parse status code to u16
+            if (std.fmt.parseInt(u16, code_slice, 10)) |code| {
                 self.status_code = code;
             } else |_| {
                 // If parsing fails, just set to 0
                 self.status_code = 0;
             }
 
-            // Store the full status line in the Status header
-            var status_list = try ArrayListUnmanaged([]const u8).initCapacity(arena_allocator, 1);
-            const full_status = try arena_allocator.dupe(u8, first_line);
-            status_list.appendAssumeCapacity(full_status);
-            try self.headers.put(arena_allocator, HDR_STATUS, status_list);
+            // Store numeric Status header
+            var status_list: ArrayListUnmanaged([]const u8) = .{};
+            const status_str = try std.fmt.allocPrint(arena_allocator, "{d}", .{ self.status_code });
+            try status_list.append(arena_allocator, status_str);
+            try self.headers.put(arena_allocator, HDR_STATUS, status_list);
+
+            // Store Description header if present
+            if (status_part.len > status_len) {
+                const desc_slice = std.mem.trim(u8, status_part[status_len..], " \t");
+                if (desc_slice.len > 0) {
+                    var desc_list: ArrayListUnmanaged([]const u8) = .{};
+                    const desc = try arena_allocator.dupe(u8, desc_slice);
+                    try desc_list.append(arena_allocator, desc);
+                    try self.headers.put(arena_allocator, HDR_DESCRIPTION, desc_list);
+                }
+            }
         }
     }
@@
-}
+    // On success, drop raw to prevent accidental re-parse
+    self.raw_headers = null;
+}

Also applies to: 181-183

♻️ Duplicate comments (3)
src/message.zig (2)

25-30: Export STATUS_ constants for reuse in tests and downstream code*

These are module-private; tests work around with a local 503. Export them instead.

-const STATUS_CONTROL: u16 = 100;
-const STATUS_BAD_REQUEST: u16 = 400;
-const STATUS_NOT_FOUND: u16 = 404;
-const STATUS_TIMEOUT: u16 = 408;
-const STATUS_MAX_BYTES: u16 = 409;
-const STATUS_NO_RESPONSE: u16 = 503;
+pub const STATUS_CONTROL: u16 = 100;
+pub const STATUS_BAD_REQUEST: u16 = 400;
+pub const STATUS_NOT_FOUND: u16 = 404;
+pub const STATUS_TIMEOUT: u16 = 408;
+pub const STATUS_MAX_BYTES: u16 = 409;
+pub const STATUS_NO_RESPONSE: u16 = 503;

233-247: Encode: write status line even without headers; don’t treat “Status” as first line unless it starts with NATS/1.0; include Description in first line

  • Early return currently drops status-only replies.
  • Only use Status as the status line when it’s a full NATS line.
  • If status_code > 0, construct “NATS/1.0 [ ]”.
-    if (self.headers.count() == 0) return;
+    // If there are no headers and no status, nothing to write
+    if (self.headers.count() == 0 and self.status_code == 0) return;
 
-    // Check if we have a Status header (which contains the full status line)
-    if (self.headerGet(HDR_STATUS)) |status_line| {
-        try writer.writeAll(status_line);
-        try writer.writeAll("\r\n");
-    } else if (self.status_code > 0) {
-        // Fallback: construct status line from status code
-        try writer.print("{s} {d}\r\n", .{ NATS_STATUS_PREFIX, self.status_code });
-    } else {
-        // Default NATS status line
-        try writer.writeAll(NATS_STATUS_PREFIX ++ "\r\n");
-    }
+    var used_status_header = false;
+    if (self.headerGet(HDR_STATUS)) |status_line| {
+        if (std.mem.startsWith(u8, status_line, NATS_STATUS_PREFIX)) {
+            try writer.writeAll(status_line);
+            try writer.writeAll("\r\n");
+            used_status_header = true;
+        }
+    }
+    if (!used_status_header) {
+        if (self.status_code > 0) {
+            if (self.headerGet(HDR_DESCRIPTION)) |desc| {
+                try writer.print("{s} {d} {s}\r\n", .{ NATS_STATUS_PREFIX, self.status_code, desc });
+            } else {
+                try writer.print("{s} {d}\r\n", .{ NATS_STATUS_PREFIX, self.status_code });
+            }
+        } else {
+            try writer.writeAll(NATS_STATUS_PREFIX ++ "\r\n");
+        }
+    }
@@
-            // Skip the Status header since we already wrote it as the first line
-            if (std.mem.eql(u8, key, HDR_STATUS)) continue;
+            // Skip Status only if it carried the first-line status
+            if (used_status_header and std.mem.eql(u8, key, HDR_STATUS)) continue;

Also applies to: 253-255

src/message_test.zig (1)

16-19: Use exported status constant instead of hardcoding 503

Once STATUS_* are public, use the module constant; drop the local alias.

-const message = @import("message.zig");
-const Message = message.Message;
-const STATUS_NO_RESPONSE = 503;
+const message = @import("message.zig");
+const Message = message.Message;
🧹 Nitpick comments (4)
src/message.zig (1)

272-274: Optional: reuse header map capacity on reset

Small perf win in pools.

-        self.status_code = 0;
-        self.headers = .{}; // Completely reset HashMap instead of clearRetainingCapacity()
+        self.status_code = 0;
+        self.headers.clearRetainingCapacity();
src/message_test.zig (3)

90-104: Avoid redundant parseHeaders() after setRawHeaders()

Given eager parsing, this double-parses. Either keep eager parse (preferred) and drop this call, or switch setRawHeaders() back to lazy. Since code is eager, trim the explicit parse.

-    // Parse headers explicitly
-    try msg.parseHeaders();

123-128: Reference the exported constant

Use message.STATUS_NO_RESPONSE.

-    msg.status_code = STATUS_NO_RESPONSE;
+    msg.status_code = message.STATUS_NO_RESPONSE;
@@
-    msg.status_code = STATUS_NO_RESPONSE;
+    msg.status_code = message.STATUS_NO_RESPONSE;

Also applies to: 130-137


234-239: Non-standard empty header key

If you want to disallow empty header names, add input validation in headerSet and adjust this test; otherwise keep as is.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 1f6c13804a458afa08e2b8d9449789090d9f5220 and 83b36b6b41368c0309fe6a2fbb9330184e4a5a1a.

📒 Files selected for processing (2)
  • src/message.zig (9 hunks)
  • src/message_test.zig (4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig

📄 CodeRabbit inference engine (CLAUDE.md)

Format code using zig fmt before commit

Files:

  • src/message_test.zig
  • src/message.zig
🧠 Learnings (3)
📚 Learning: 2025-09-03T22:00:56.171Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/parser.zig:219-224
Timestamp: 2025-09-03T22:00:56.171Z
Learning: In Zig parser code, when transferring message ownership from parser to connection, parse headers before nulling the ownership pointer (self.ma.msg = null) to avoid memory leaks if parsing fails, since parser cleanup logic will handle deinit if self.ma.msg is non-null.

Applied to files:

  • src/message_test.zig
📚 Learning: 2025-08-24T20:23:43.436Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: tests/headers_test.zig:158-159
Timestamp: 2025-08-24T20:23:43.436Z
Learning: In NATS header parsing, when processing status lines like "NATS/1.0 503 No Responders", the 3-digit status code (e.g., "503") gets stored in the Status header, while any additional descriptive text (e.g., "No Responders") gets stored in a separate Description header. The Status header should contain only the numeric code, not the full line.

Applied to files:

  • src/message.zig
📚 Learning: 2025-08-29T10:22:22.247Z
Learnt from: CR
PR: lalinsky/nats.zig#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-29T10:22:22.247Z
Learning: Run `zig build test` to confirm your changes are working

Applied to files:

  • src/message.zig
🔇 Additional comments (10)
src/message.zig (7)

70-71: Field addition LGTM

status_code: u16 defaulting to 0 reads fine and matches NATS usage.


202-211: Non-throwing headerGet is good

Optional return for missing headers keeps callsites simple.


212-219: headerGetAll LGTM

Returning arena-owned slice is fine given message lifetime.


220-224: headerDelete LGTM

Void return is appropriate; arena handles memory.


225-230: isNoResponders logic LGTM

Matches NATS semantics: empty payload + 503.


337-340: Test import hook LGTM

Keeps tests colocated.


1-341: Verify formatting and tests

Automated check couldn’t run Zig. Please install Zig locally and run

zig fmt src
zig build test

to ensure formatting and all tests pass before merging.

src/message_test.zig (3)

57-59: LGTM: relies on eager parse from setRawHeaders()

This passes with eager parsing; no action.


199-207: Encode assertion still valid with new semantics

We’ll synthesize “NATS/1.0 503 No Responders” from status_code + Description; and we won’t emit a “Status: NATS/1.0 …” header.


229-239: LGTM: error handling sanity checks

Non-throwing gets and deletes behave as expected.

lalinsky and others added 2 commits September 4, 2025 07:27
Made STATUS_* constants in message.zig public so they can be used
consistently throughout the codebase. Updated jetstream.zig to use
these constants instead of hardcoded values for better maintainability.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
- Status header now contains just the status code (e.g., "503")
- Description header contains the description text (e.g., "No Responders")
- Keeps existing status_code field for efficient access
- Updates encodeHeaders to reconstruct proper NATS status line from headers
- Updates tests to verify new header behavior

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/message.zig (1)

116-119: Make header parsing idempotent; avoid double-parse leaks

Calling parseHeaders() from setRawHeaders plus explicit parse calls re-appends values and leaks arena entries. Clear prior state and null out raw_headers on success; also reset status_code before reparse.

Apply:

 pub fn setRawHeaders(self: *Self, headers: []const u8) !void {
   self.raw_headers = try self.arena.allocator().dupe(u8, headers);
-  try self.parseHeaders();
+  try self.parseHeaders();
 }

 // Parse headers from raw header data
 pub fn parseHeaders(self: *Self) !void {
-    const raw = self.raw_headers orelse return;
+    const raw = self.raw_headers orelse return;
+    // Idempotency: clear prior parsed state
+    if (self.headers.count() > 0) {
+        self.headers.clearRetainingCapacity();
+    }
+    self.status_code = 0;

     // Parse headers like Go NATS library
     var lines = std.mem.splitSequence(u8, raw, "\r\n");
     const first_line = lines.next() orelse return;
@@
-                try self.headers.put(arena_allocator, HDR_STATUS, status_code_list);
+                try self.headers.put(arena_allocator, HDR_STATUS, status_code_list);
@@
-                        try self.headers.put(arena_allocator, HDR_DESCRIPTION, description_list);
+                        try self.headers.put(arena_allocator, HDR_DESCRIPTION, description_list);
                     }
                 }
             }
         }
     }
@@
-    }
+    }
+    // Prevent re-parsing the same buffer again
+    self.raw_headers = null;
 }

Also applies to: 121-192

🧹 Nitpick comments (5)
src/message.zig (1)

287-289: Prefer clearing header map over reinitializing

Re-init (.{}) churns table allocations. Clearing retains capacity and is cheaper; arena owns values anyway.

Apply:

-    self.status_code = 0;
-    self.headers = .{}; // Completely reset HashMap instead of clearRetainingCapacity()
+    self.status_code = 0;
+    self.headers.clearRetainingCapacity();
     self.raw_headers = null;
src/message_test.zig (2)

90-112: Avoid redundant parseHeaders() after setRawHeaders()

setRawHeaders already parses; re-calling parseHeaders() needlessly repeats work and, pre-fix, duplicates values.

Apply:

-    // Parse headers explicitly
-    try msg.parseHeaders();
-
     // Verify headers were parsed
     const content_type = msg.headerGet("Content-Type");

139-164: Add a status_code-only encode case to tests

Current test doesn’t cover “no headers, only status_code” path (bug fixed above). Add a small check to prevent regressions.

Example:

var msg2 = Message.init(allocator);
defer msg2.deinit();
msg2.status_code = 404;
var buf2 = std.ArrayList(u8).init(allocator);
defer buf2.deinit();
try msg2.encodeHeaders(buf2.writer());
try testing.expectEqualStrings("NATS/1.0 404\r\n\r\n", buf2.items);
src/jetstream.zig (2)

1001-1046: Direct GET: status mapping and zero-copy subject — good; add tiny hardening

Mappings for 404/408/413 look right for DIRECT.GET, and direct subject assignment avoids copies. Consider logging unexpected status codes to aid diagnosis.

Apply:

-        } else if (resp.status_code == 413) {
+        } else if (resp.status_code == 413) {
             return error.TooManySubjects;
+        } else if (resp.status_code > 0) {
+            log.warn("DIRECT.GET unexpected status: {d}", .{resp.status_code});
+            return error.JetStreamStatus;
         }

517-545: Pull fetch: unknown status silently dropped

For observability, log and surface an error instead of quietly deinit+continue on unknown status codes.

Apply:

-                } else if (raw_msg.status_code > 0) {
-                    // Unknown status code - clean up and continue
-                    raw_msg.deinit();
+                } else if (raw_msg.status_code > 0) {
+                    log.warn("fetch: unknown status code {d}", .{raw_msg.status_code});
+                    raw_msg.deinit();
+                    fetch_error = error.JetStreamStatus;
+                    batch_complete = true;
+                    break;
                 } else {
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 83b36b6 and ea03a23.

📒 Files selected for processing (3)
  • src/jetstream.zig (9 hunks)
  • src/message.zig (9 hunks)
  • src/message_test.zig (4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.zig

📄 CodeRabbit inference engine (CLAUDE.md)

Format code using zig fmt before commit

Files:

  • src/message_test.zig
  • src/jetstream.zig
  • src/message.zig
🧠 Learnings (6)
📚 Learning: 2025-09-03T22:00:56.171Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/parser.zig:219-224
Timestamp: 2025-09-03T22:00:56.171Z
Learning: In Zig parser code, when transferring message ownership from parser to connection, parse headers before nulling the ownership pointer (self.ma.msg = null) to avoid memory leaks if parsing fails, since parser cleanup logic will handle deinit if self.ma.msg is non-null.

Applied to files:

  • src/message_test.zig
  • src/message.zig
📚 Learning: 2025-08-24T20:23:43.436Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: tests/headers_test.zig:158-159
Timestamp: 2025-08-24T20:23:43.436Z
Learning: In NATS header parsing, when processing status lines like "NATS/1.0 503 No Responders", the 3-digit status code (e.g., "503") gets stored in the Status header, while any additional descriptive text (e.g., "No Responders") gets stored in a separate Description header. The Status header should contain only the numeric code, not the full line.

Applied to files:

  • src/message_test.zig
  • src/message.zig
📚 Learning: 2025-09-03T22:04:03.624Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/jetstream.zig:996-1057
Timestamp: 2025-09-03T22:04:03.624Z
Learning: In NATS JetStream direct get API, HTTP status code 408 means "Empty Request" (when the request payload is empty), not timeout. This is different from standard HTTP 408 Request Timeout semantics. The error.BadRequest mapping for 408 status is appropriate in this context.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-09-03T22:13:20.760Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#95
File: src/jetstream.zig:0-0
Timestamp: 2025-09-03T22:13:20.760Z
Learning: When headerGet() returns header values from a NATS message, the returned slices point to data already owned by the message's arena. Direct assignment to message fields like resp.subject = nats_subject is more efficient than using setter methods like setSubject() which would copy the data unnecessarily.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-08-25T14:49:05.838Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: src/jetstream.zig:0-0
Timestamp: 2025-08-25T14:49:05.838Z
Learning: In NATS protocol, messages are written in order over TCP and processed by the server in the order received. This means a SUB command will always be processed before a subsequent request message, even if both are buffered, making explicit flush() calls unnecessary for subscription-then-request patterns.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-08-29T10:22:22.247Z
Learnt from: CR
PR: lalinsky/nats.zig#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-29T10:22:22.247Z
Learning: Run `zig build test` to confirm your changes are working

Applied to files:

  • src/message.zig
🔇 Additional comments (6)
src/message.zig (3)

25-30: Exported STATUS_ constants — good*

Public visibility unblocks tests and downstream usage.


211-229: Header accessors now non-throwing — good

Optional returns simplify call sites and match explicit parsing.


234-239: isNoResponders numeric-only check — good

Matches the “Status numeric, Description text” convention and retrieved learning.

src/message_test.zig (1)

166-213: Status numeric in header, text in Description — good

Assertions align with the intended parsing and encoding semantics.

src/jetstream.zig (2)

895-955: Legacy GET: header/base64 handling — good

Decoding hdrs/data into the message arena and calling parseHeaders() is correct and keeps ownership simple.


965-991: Option validation/readability — good

The combinations and error shaping are sensible; direct switch later keeps call sites clean.

@lalinsky lalinsky changed the title Jetstream direct get Add support for js.getMsg() via the direct get API Sep 4, 2025
@lalinsky lalinsky merged commit b088486 into main Sep 4, 2025
5 checks passed
@lalinsky lalinsky deleted the jetstream-direct-get branch September 4, 2025 14:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant