-
Notifications
You must be signed in to change notification settings - Fork 1
Add JetStream message operations #33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds JetStream message-level APIs (get, getLast, delete, erase), pull-subscription/fetch with inbox-based replies, status (heartbeat/flow-control) handling, Message.initEmpty, FetchRequest default tweaks, and comprehensive tests for message operations and errors. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor App
participant JS as JetStream
participant NATS as NATS Server
note over App,JS: Get message by seq / last-by-subject
App->>JS: getMsg(stream, seq) / getLastMsg(stream, subj)
JS->>NATS: STREAM.MSG.GET {seq | last_by_subj}
NATS-->>JS: GetMsgResponse {message:{hdrs(base64),data(base64)}}
JS->>JS: Decode hdrs/data -> StoredMessage -> Message
JS-->>App: *Message
note over App,JS: Delete / Erase message
App->>JS: deleteMsg/eraseMsg(stream, seq)
JS->>NATS: STREAM.MSG.DELETE {seq, no_erase}
NATS-->>JS: MsgDeleteResponse {success: bool}
JS-->>App: bool
sequenceDiagram
autonumber
actor App
participant JS as JetStream
participant Sub as PullSubscription
participant NATS as NATS Server
rect rgb(245,248,255)
note right of JS: Create pull subscription
App->>JS: pullSubscribe(stream, consumer_cfg)
JS->>NATS: Create consumer (with reply inbox)
NATS-->>JS: ConsumerInfo + messages delivered to inbox
JS-->>App: *PullSubscription
end
rect rgb(245,255,245)
note right of Sub: Fetch batch via inbox
App->>Sub: fetch(batch, timeout)
Sub->>NATS: Fetch/Request with reply inbox subject
NATS-->>Sub: Messages -> inbox subjects
Sub-->>App: MessageBatch{messages|err}
end
rect rgb(255,250,240)
note over Sub,NATS: Status handling
NATS-->>Sub: 100 Idle Heartbeat (Status)
Sub->>Sub: handleStatusMessage (track/ignore)
NATS-->>Sub: FlowControl (Description header)
Sub->>NATS: Empty reply to flow-control subject
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60–90 minutes Possibly related PRs
Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
Status, Documentation and Community
|
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
- Add getMsg() and getLastMsg() methods to retrieve messages from streams - Add deleteMsg() and eraseMsg() methods to remove messages from streams - Add initEmpty() method to Message for creating empty messages - Add new test file for message operations - Support getting messages by sequence number or last message by subject - Support secure deletion (erase) vs regular deletion - Handle base64 decoding of message data and headers
0cd0d76 to
5c62aef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (9)
src/message.zig (3)
101-116: initEmpty: solid arena-backed ctor; minor clarity improvements possibleAllocator ownership and deinit semantics look correct and safe. Two optional tweaks:
- Explicitly set reply = null and needs_header_parsing = false for readability (they already default, but this reduces cognitive load).
- Consider documenting that subject/data are empty and intended to be filled later, and that the message owns its arena (self-deinit invalidates self).
Example small clarity diff:
msg.* = .{ - .subject = &[_]u8{}, - .data = &[_]u8{}, - .arena = arena, + .subject = &[_]u8{}, + .reply = null, + .data = &[_]u8{}, + .needs_header_parsing = false, + .arena = arena, };
195-213: Header keys are treated case-sensitively; consider normalizing for HTTP-like semanticsNATS headers are modeled after HTTP-style headers, which are conventionally case-insensitive. headerSet/headerGet/headerDelete currently use a case-sensitive StringHashMap; callers must match exact casing to read/delete values. Consider normalizing keys (e.g., lowercasing on insert and lookup) to avoid subtle mismatches. Also, avoid allocating a new owned_key for getOrPut when the key already exists to reduce arena churn.
Illustrative diff (normalizing only in setter/getter paths):
@@ pub fn headerSet(self: *Self, key: []const u8, value: []const u8) !void { try self.ensureHeadersParsed(); const arena_allocator = self.arena.allocator(); - // Remove existing values (arena will clean up memory automatically) - _ = self.headers.fetchRemove(key); + // Normalize key (lowercase ASCII) and remove existing values + var key_norm = try arena_allocator.dupe(u8, key); + std.ascii.lowerString(key_norm, key_norm); + _ = self.headers.fetchRemove(key_norm); // Add new value - const owned_key = try arena_allocator.dupe(u8, key); + const owned_key = key_norm; // already normalized and owned const owned_value = try arena_allocator.dupe(u8, value); @@ pub fn headerGet(self: *Self, key: []const u8) !?[]const u8 { try self.ensureHeadersParsed(); - if (self.headers.get(key)) |values| { + var key_norm_buf = try self.arena.allocator().dupe(u8, key); + std.ascii.lowerString(key_norm_buf, key_norm_buf); + if (self.headers.get(key_norm_buf)) |values| { if (values.items.len > 0) { return values.items[0]; } } @@ pub fn headerGetAll(self: *Self, key: []const u8) !?[]const []const u8 { try self.ensureHeadersParsed(); - if (self.headers.get(key)) |values| { + var key_norm_buf = try self.arena.allocator().dupe(u8, key); + std.ascii.lowerString(key_norm_buf, key_norm_buf); + if (self.headers.get(key_norm_buf)) |values| { return values.items; // No copy needed - arena owns the data }If you prefer to keep case sensitivity internally, at least avoid the extra owned_key allocation when the key exists:
-const owned_key = try arena_allocator.dupe(u8, key); -const result = try self.headers.getOrPut(arena_allocator, owned_key); +const result = try self.headers.getOrPut(arena_allocator, key); +const owned_key = if (!result.found_existing) try arena_allocator.dupe(u8, key) else result.key_ptr.*; if (!result.found_existing) { - result.value_ptr.* = .{}; + result.key_ptr.* = owned_key; + result.value_ptr.* = .{}; }
34-34: JetStream-onlyseqon genericMessage— usage confined to JetStream modulesVerification shows
seqis only read or written in JetStream contexts:
- Definition:
src/message.zig:34(seq: u64 = 0)- Assignment:
src/jetstream.zig:770(msg.seq = stored_msg.seq;)- Test assertion:
tests/jetstream_msg_test.zig:33(testing.expectEqual(..., msg.seq);)Consider refactoring to keep core NATS payloads free of JetStream metadata:
- Move
seqout of the genericMessagetype and into a JetStream-specific wrapper (e.g. inJetStreamMessagefromsrc/jetstream_message.zig), or- Retain the field here but add a doc comment (and/or make it
?u64) to signal it’s only populated by JetStream operations.This will enforce separation of concerns and prevent accidental reliance on
seqin non-JetStream code paths.tests/jetstream_msg_test.zig (6)
6-41: Good coverage for basic getMsg sequencing; consider asserting reply and headers absenceThe assertions on subject, data, and seq are solid. As a small robustness tweak, also assert that reply is null and that there are no headers to catch accidental population.
Example:
try testing.expectEqualStrings("test.msg.get", msg.subject); try testing.expectEqualStrings("First message", msg.data); try testing.expectEqual(@as(u64, 1), msg.seq); +try testing.expect(msg.reply == null); +try testing.expectEqual(@as(usize, 0), msg.headers.count());
43-81: getLastMsg happy path looks correct; add a negative probe in this test for a missing subjectAs an extra guardrail within the same setup, assert that a non-existent subject returns JetStreamError to keep locality of expectations for getLastMsg.
const last_bar = try js.getLastMsg("TEST_MSG_LAST", "test.last.bar"); defer last_bar.deinit(); try testing.expectEqualStrings("test.last.bar", last_bar.subject); try testing.expectEqualStrings("Second bar", last_bar.data); try testing.expectEqual(@as(u64, 4), last_bar.seq); + +// Negative probe: subject with no messages +const missing = js.getLastMsg("TEST_MSG_LAST", "test.last.baz"); +try testing.expectError(error.JetStreamError, missing);
82-125: deleteMsg assertions are clear; also verify idempotency/error semanticsAfter a successful delete, calling deleteMsg on the same sequence should error (or return a defined false), depending on your API contract. Pinning this behavior avoids accidental changes later.
If contract is “error on second delete”, add:
const deleted = try js.deleteMsg("TEST_MSG_DELETE", 2); try testing.expect(deleted); + +// Deleting again should fail +const delete_again = js.deleteMsg("TEST_MSG_DELETE", 2); +try testing.expectError(error.JetStreamError, delete_again);
126-169: eraseMsg test is good; consider asserting data is truly gone per “secure erase” semanticsIf eraseMsg guarantees secure removal, a follow-up retrieval through lower-level APIs (if exposed) should not leak payload remnants. If that surface isn’t available, at least mirror deleteMsg’s idempotency check here too.
const erased = try js.eraseMsg("TEST_MSG_ERASE", 2); try testing.expect(erased); + +// Erasing again should fail +const erase_again = js.eraseMsg("TEST_MSG_ERASE", 2); +try testing.expectError(error.JetStreamError, erase_again);
170-214: Headers round-trip: nice. Add a binary payload case to exercise base64 pathsGiven this PR mentions base64 decoding for data and headers, consider publishing binary data (including zero bytes) and verifying it round-trips via getMsg. That ensures the JSON/base64 path is covered, not just plain UTF-8 strings.
Illustrative addition:
// Verify headers are preserved const test_header = try retrieved.headerGet("X-Test-Header"); try testing.expect(test_header != null); try testing.expectEqualStrings("test-value", test_header.?); const another_header = try retrieved.headerGet("X-Another-Header"); try testing.expect(another_header != null); try testing.expectEqualStrings("another-value", another_header.?); + +// Binary payload (exercises base64 decode in responses) +{ + var bin: [6]u8 = .{ 0x00, 0x01, 0xFF, 0x7F, 0x00, 0x42 }; + try conn.publish("test.msg.headers", bin[0..]); + try conn.flush(); + const binary_msg = try js.getMsg("TEST_MSG_HEADERS", 2); + defer binary_msg.deinit(); + try testing.expectEqual(@as(u64, 2), binary_msg.seq); + try testing.expectEqual(@as(usize, 6), binary_msg.data.len); + try testing.expectEqualSlices(u8, &bin, binary_msg.data); +}If header values can be base64-encoded on the wire in your implementation, add a similar probe for a header containing non-ASCII bytes and validate it via headerGet.
215-245: Error cases are comprehensive; consider covering stream-not-found for delete/eraseYou already cover getMsg/getLastMsg error paths. For completeness, assert deleteMsg/eraseMsg on a non-existent stream also return JetStreamError (if that’s the contract).
// Test deleting non-existent message const delete_result = js.deleteMsg("TEST_MSG_ERRORS", 999); try testing.expectError(error.JetStreamError, delete_result); + +// Stream not found for delete/erase +const delete_missing_stream = js.deleteMsg("NO_SUCH_STREAM", 1); +try testing.expectError(error.JetStreamError, delete_missing_stream); +const erase_missing_stream = js.eraseMsg("NO_SUCH_STREAM", 1); +try testing.expectError(error.JetStreamError, erase_missing_stream);
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
src/jetstream.zig(3 hunks)src/message.zig(2 hunks)tests/all_tests.zig(1 hunks)tests/jetstream_msg_test.zig(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- tests/all_tests.zig
- src/jetstream.zig
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-25T14:51:33.542Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.542Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/jetstream_msg_test.zig
📚 Learning: 2025-08-23T20:26:30.660Z
Learnt from: CR
PR: lalinsky/nats.zig#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-23T20:26:30.660Z
Learning: Run tests (zig build test) to confirm your changes are working
Applied to files:
tests/jetstream_msg_test.zig
This PR adds JetStream message operations functionality:
Changes
getMsg()andgetLastMsg()methods to retrieve messages from streamsdeleteMsg()anderaseMsg()methods to remove messages from streamsinitEmpty()method to Message for creating empty messagesAPI Changes
JetStream.getMsg(stream_name, seq)- Get message by sequence numberJetStream.getLastMsg(stream_name, subject)- Get last message by subjectJetStream.deleteMsg(stream_name, seq)- Delete message (mark as deleted)JetStream.eraseMsg(stream_name, seq)- Erase message (secure removal)Message.initEmpty(allocator)- Create empty message with arenaTesting
Summary by CodeRabbit
New Features
Improvements
Tests