-
Notifications
You must be signed in to change notification settings - Fork 1
Duplicate ack/nak prevention for JetStream messages (plus cleanup) #43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Add atomic bool field to track acknowledgment status - Add JetStreamError.MessageAlreadyAcknowledged error type - Implement duplicate prevention in ack(), nak(), and term() methods - Preserve inProgress() behavior (can be called multiple times) - Add isAcknowledged() introspection method - Add comprehensive test suite with 9 test scenarios - Follow Go NATS client atomic bool pattern as requested - All 93 tests pass including new duplicate prevention tests Closes #42 Co-authored-by: Lukáš Lalinský <[email protected]>
WalkthroughReplace per-message JetStream context with the underlying NATS connection, add an atomic ack flag and AckError.AlreadyAcked, make final acks use CAS to prevent duplicate acknowledgments (returning AlreadyAcked), add isAcked(), update AckType and ack payload mapping, and add tests covering duplicate-ack behavior. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Handler
participant JSM as JetStreamMessage
participant AF as AtomicFlag(acked)
participant NC as Connection
Handler->>JSM: ack()/nak()/term()/inProgress()
alt inProgress
JSM->>NC: publish "progress" payload
NC-->>JSM: publish result
JSM-->>Handler: result
else ack/nak/term
JSM->>AF: cmpxchg(false -> true)?
alt first acknowledgment
AF-->>JSM: success
JSM->>NC: publish final payload (AckType.toString())
NC-->>JSM: publish result
alt publish ok
JSM-->>Handler: ok
else publish err
JSM->>AF: set false (revert)
JSM-->>Handler: publish error
end
else duplicate acknowledgment
AF-->>JSM: already true
JSM-->>Handler: AckError.AlreadyAcked
end
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Assessment against linked issues
Out-of-scope changes
Possibly related PRs
Poem
✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
Status, Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@claude address my comments
|
Claude finished @lalinsky's task —— View job ✅ Review Comments AddressedAll review comments have been successfully addressed:
Additional Fixes
Tests AddedThe new test suite covers:
Changes committed and pushed to branch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
tests/jetstream_duplicate_ack_test.zig (3)
3-3: Avoid unused import binding.
const nats = @import("nats");is unused and may fail as an unused top-level constant. Import for side effects instead.-const nats = @import("nats"); +const _ = @import("nats");
38-42: Fix init signature or remove it.
MockJetStream.initusestrybut returns a non-error type. Either make it!MockJetStreamand initialize the inner connection or remove it since tests constructMockJetStreamdirectly.Option A (return error union and initialize):
-fn init(allocator: std.mem.Allocator) MockJetStream { - return MockJetStream{ - .nc = try allocator.create(MockConnection), - }; -} +fn init(allocator: std.mem.Allocator) !MockJetStream { + const nc_ptr = try allocator.create(MockConnection); + nc_ptr.* = MockConnection.init(allocator); + return MockJetStream{ .nc = nc_ptr }; +}Option B: delete the unused function.
223-248: Term-after-ack failure is covered; add positive term tests.Consider adding:
- term succeeds on first call and publishes "+TERM".
- duplicate term returns MessageAlreadyAcknowledged.
I can draft these tests quickly if you want them added to this PR.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
src/jetstream_message.zig(3 hunks)tests/jetstream_duplicate_ack_test.zig(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:51:33.542Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.542Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/jetstream_duplicate_ack_test.zig
🔇 Additional comments (12)
src/jetstream_message.zig (4)
57-58: Good choice: single atomic bool per message.Compact and thread-safe. With the ordering fix above, this becomes robust across threads.
95-95: Doc tweak is correct.inProgress being multi-callable aligns with JetStream semantics.
181-185: isAcknowledged() looks good, minor note on ordering.
.load(.acquire)pairs well with the proposed.acq_relon CAS. Keep as-is.
40-44: The above script will locate all references tojetstream.zigand show the relevant sections insrc/root.zig,src/jetstream.zig(if present), and the test file. This will help confirm whereJetStreamErroris defined and re-exported, ensuring we have full context on potential naming conflicts.tests/jetstream_duplicate_ack_test.zig (8)
63-88: Ack first-call success test is solid.Covers publish side-effect and
isAcknowledged()state.
90-114: Duplicate-ack error path validated.Asserts only one publish. Good.
116-141: NAK first-call success test mirrors ACK path well.Checks both publish payload and acked state.
143-167: Duplicate-NAK error path validated.Good symmetry and side-effect check.
169-194: Cross-action exclusivity (NAK then ACK) is correct.Ensures only the first operation publishes.
196-221: Cross-action exclusivity (ACK then NAK) is correct.Validates expected error and single publish.
250-283: inProgress multi-call behavior verified.Nice coverage, including state remain-unacked before ACK.
285-312: inProgress after ack behavior verified.Good to see both messages captured in-order.
- Rename field from `ack_done` to `acknowledged` for clarity - Rename method from `isAcknowledged` to `acked` for brevity - Register test file in all_tests.zig - Rewrite tests to use real JetStream connections instead of mocks - Re-export JetStreamError from jetstream.zig for proper imports 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Lukáš Lalinský <[email protected]>
- Change CAS ordering from .acquire to .acq_rel for success case - Add errdefer to revert acknowledged flag on sendAck failure - Prevents 'false acknowledged' state when publish fails - Allows retry after transient publish errors Co-authored-by: Lukáš Lalinský <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/jetstream.zig (1)
403-407: Fix import cycle by passing Connection instead of JetStream to createJetStreamMessage.jetstream_message.zig imports jetstream.zig inside sendAck(), creating a circular import. Pass
*ConnectiontocreateJetStreamMessageand store it on the message instead.Apply this diff here after updating jetstream_message.zig accordingly:
- const js_msg_ptr = try jetstream_message.createJetStreamMessage(self.js, raw_msg); + const js_msg_ptr = try jetstream_message.createJetStreamMessage(self.js.nc, raw_msg);- const js_msg = jetstream_message.createJetStreamMessage(self.js, msg) catch { + const js_msg = jetstream_message.createJetStreamMessage(self.js.nc, msg) catch {- const js_msg = jetstream_message.createJetStreamMessage(js, msg) catch { + const js_msg = jetstream_message.createJetStreamMessage(js.nc, msg) catch {Also applies to: 454-458, 923-927
src/jetstream_message.zig (1)
106-121: Break the circular import: store Connection on the message, not JetStream.
sendAck()does@import("jetstream.zig"), but jetstream.zig imports this file → import cycle. Store*Connectionand publish directly.Apply:
-const Message = @import("message.zig").Message; +const Message = @import("message.zig").Message; +const Connection = @import("connection.zig").Connection; @@ pub const JetStreamMessage = struct { /// Underlying NATS message msg: *Message, - /// JetStream context for acknowledgments - js: *anyopaque, // Forward declaration - will be cast to *JetStream + /// NATS connection for acknowledgments + nc: *Connection, @@ fn sendAck(self: *JetStreamMessage, ack_type: AckType) !void { if (self.reply) |reply_subject| { @@ - // Import JetStream to access connection - const JetStream = @import("jetstream.zig").JetStream; - const js: *JetStream = @ptrCast(@alignCast(self.js)); - try js.nc.publish(reply_subject, ack_payload); + try self.nc.publish(reply_subject, ack_payload); } } @@ -pub fn createJetStreamMessage(js: *anyopaque, msg: *Message) !*JetStreamMessage { +pub fn createJetStreamMessage(nc: *Connection, msg: *Message) !*JetStreamMessage { @@ js_msg.* = JetStreamMessage{ .msg = msg, - .js = js, + .nc = nc, .reply = msg.reply, };Follow-up in src/jetstream.zig: pass
self.js.nc/self.ncat call sites (see my comment there).Also applies to: 188-196, 45-50
🧹 Nitpick comments (2)
src/jetstream.zig (1)
28-29: Name collision risk: JetStreamError alias vs local error tag.This module returns
error.JetStreamError(Line 510) and also exportspub const JetStreamError = …. It compiles but is confusing. Consider renaming the local error toerror.ApiError(or similar) to avoid ambiguity.src/jetstream_message.zig (1)
4-14: Dead constants: token position values unused.Either use these in parsers or remove them to reduce noise.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
src/jetstream.zig(1 hunks)src/jetstream_message.zig(3 hunks)tests/all_tests.zig(1 hunks)tests/jetstream_duplicate_ack_test.zig(1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-25T14:51:33.542Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.542Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/all_tests.zigtests/jetstream_duplicate_ack_test.zig
📚 Learning: 2025-08-23T20:26:30.660Z
Learnt from: CR
PR: lalinsky/nats.zig#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-23T20:26:30.660Z
Learning: Run tests (zig build test) to confirm your changes are working
Applied to files:
tests/jetstream_duplicate_ack_test.zig
🔇 Additional comments (7)
tests/all_tests.zig (1)
19-19: Test module registered — good.New test suite is correctly wired into the aggregator.
tests/jetstream_duplicate_ack_test.zig (4)
8-86: LGTM: first-ack path covered.Solid coverage of
ack()success andacked()introspection.
88-176: LGTM with robust duplicate-ack assertion.Second
ack()error path is validated againstMessageAlreadyAcknowledged.
177-265: LGTM: nak-after-ack path covered.Validates that
nak()is blocked post-ack.
266-361: LGTM: inProgress idempotence verified.Exercises multiple
inProgress()calls followed by a finalack().src/jetstream_message.zig (2)
40-44: Error type naming looks good.
MessageAlreadyAcknowledgedis explicit and clear.
100-104: Visibility helper is fine.
acked()with.load(.acquire)matches the CAS changes suggested above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
src/jetstream_message.zig (2)
40-44: Name the error set more specifically (aligns with prior maintainer note).JetStreamError is broad; consider scoping to the message/ack domain to avoid namespace collisions as more JetStream errors appear.
-pub const JetStreamError = error{ +pub const JetStreamMessageError = error{ MessageAlreadyAcknowledged, };
106-110: acked() name + acquire load look good.Matches earlier feedback (“call this acked”) and uses appropriate ordering for readers.
🧹 Nitpick comments (1)
src/jetstream_message.zig (1)
65-74: Reduce duplication: extract the CAS+send pattern into a tiny helper.Keeps the three methods consistent and easier to maintain.
@@ - /// Acknowledge successful processing - pub fn ack(self: *JetStreamMessage) !void { - // Check if already acknowledged using atomic compare-and-swap - const was_acknowledged = self.acknowledged.cmpxchgStrong(false, true, .acq_rel, .acquire); - if (was_acknowledged != null) { - return JetStreamError.MessageAlreadyAcknowledged; - } - // If sendAck fails, revert the acknowledged flag to allow retry - errdefer self.acknowledged.store(false, .release); - try self.sendAck(.ack); - } + fn ackOnce(self: *JetStreamMessage, ack_type: AckType) !void { + const was_acknowledged = self.acknowledged.cmpxchgStrong(false, true, .acq_rel, .acquire); + if (was_acknowledged != null) return JetStreamError.MessageAlreadyAcknowledged; + errdefer self.acknowledged.store(false, .release); + try self.sendAck(ack_type); + } + + /// Acknowledge successful processing + pub fn ack(self: *JetStreamMessage) !void { + try self.ackOnce(.ack); + } @@ - pub fn nak(self: *JetStreamMessage) !void { - // Check if already acknowledged using atomic compare-and-swap - const was_acknowledged = self.acknowledged.cmpxchgStrong(false, true, .acq_rel, .acquire); - if (was_acknowledged != null) { - return JetStreamError.MessageAlreadyAcknowledged; - } - // If sendAck fails, revert the acknowledged flag to allow retry - errdefer self.acknowledged.store(false, .release); - try self.sendAck(.nak); - } + pub fn nak(self: *JetStreamMessage) !void { + try self.ackOnce(.nak); + } @@ - pub fn term(self: *JetStreamMessage) !void { - // Check if already acknowledged using atomic compare-and-swap - const was_acknowledged = self.acknowledged.cmpxchgStrong(false, true, .acq_rel, .acquire); - if (was_acknowledged != null) { - return JetStreamError.MessageAlreadyAcknowledged; - } - // If sendAck fails, revert the acknowledged flag to allow retry - errdefer self.acknowledged.store(false, .release); - try self.sendAck(.term); - } + pub fn term(self: *JetStreamMessage) !void { + try self.ackOnce(.term); + }Also applies to: 77-86, 89-98
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
src/jetstream_message.zig(3 hunks)
🔇 Additional comments (4)
src/jetstream_message.zig (4)
57-58: LGTM: atomic flag is a simple, memory-efficient guard.Good choice using std.atomic.Value(bool) with a safe default (false).
66-74: LGTM: correct CAS ordering and rollback semantics across ack/nak/term.
- Success: .acq_rel publishes the store; failure uses .acquire.
- errdefer store(false, .release) avoids “false-acked” on publish failure.
Also applies to: 78-86, 90-98
101-101: Doc note is correct.inProgress being multi-callable matches JetStream semantics.
192-192: No comment needed on this hunk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (1)
tests/jetstream_duplicate_ack_test.zig (1)
1-6: Register tests in aggregator.Ensure
tests/all_tests.zigincludes these tests.#!/bin/bash rg -nP 'jetstream_duplicate_ack_test|jetstream_duplicate_ack_tests' tests/all_tests.zig
🧹 Nitpick comments (2)
tests/jetstream_duplicate_ack_test.zig (1)
148-151: Unsubscribe + deinit is likely redundant.Given subscriptions are refcounted,
deinit()should suffice; consider dropping explicitunsubscribe()to save an extra server op in tests. Optional.- defer push_sub.deinit(); - defer push_sub.unsubscribe() catch {}; + defer push_sub.deinit();Also applies to: 237-240, 333-336
src/jetstream_message.zig (1)
175-208: Wrapper creation + header parsing looks good. One suggestion: guard header parse cost.
ensureHeadersParsed()is unconditional; if headers are absent often, consider lazily checkingmsg.has_headers(if available) before parsing. Optional.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
src/jetstream.zig(3 hunks)src/jetstream_message.zig(4 hunks)src/root.zig(7 hunks)tests/jetstream_duplicate_ack_test.zig(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/jetstream.zig
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-25T14:51:33.542Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.542Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/jetstream_duplicate_ack_test.zig
📚 Learning: 2025-08-23T20:26:30.660Z
Learnt from: CR
PR: lalinsky/nats.zig#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-23T20:26:30.660Z
Learning: Run tests (zig build test) to confirm your changes are working
Applied to files:
tests/jetstream_duplicate_ack_test.zig
📚 Learning: 2025-08-25T02:49:59.082Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/jetstream_message.zig
🔇 Additional comments (4)
src/root.zig (2)
57-94: Tests block touched only by whitespace/comments — OK.No behavioral changes detected.
Also applies to: 96-124, 125-165
42-42: AckError Re-export Verified: JetStreamError Removal is SafeI ran a repository-wide search for the
JetStreamErrorsymbol and confirmed:• No references to
nats.JetStreamErrorremain in the public API surface (i.e.src/root.zig)
• All remaining occurrences are either in internal implementation (src/jetstream.zig) or in commented-out test snippets (tests undertests/jetstream*.zig)Since there are no live external dependencies on
nats.JetStreamError, it’s safe to remove that re-export from the root. Be sure to document this breaking change in your changelog and migration guide so downstream users know to importJetStreamErrordirectly fromjetstream.zig(e.g.@import("jetstream.zig").JetStreamError).src/jetstream_message.zig (2)
93-109: ACK CAS + rollback pattern looks correct.
- Uses
.acq_relon success and.acquireon failure.- Reverts
ackedon publish error viaerrdefer.- Applies only to final acks (ack/nak/term). Good.
66-86: Method semantics align with PR goals.
ack/nak/termdedupe viasendAckCAS.inProgressallowed multiple times.isAcked()uses.acquire. LGTM.Also applies to: 87-91
| // First ack should succeed | ||
| if (js_msg.ack()) { | ||
| data.first_ack_success = true; | ||
| } else |_| { | ||
| return; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix Zig error handling: if (js_msg.ack()) is invalid for !void.
Use catch with error unions returning void. Also simplify second-ack failure check.
Apply:
// First ack should succeed
- if (js_msg.ack()) {
- data.first_ack_success = true;
- } else |_| {
- return;
- }
+ js_msg.ack() catch |_| { return; };
+ data.first_ack_success = true;
// Second ack should fail with AlreadyAcked
- if (js_msg.ack()) {
- // Should not reach here
- } else |err| {
- if (err == nats.AckError.AlreadyAcked) {
- data.second_ack_failed = true;
- }
- }
+ js_msg.ack() catch |err| {
+ if (err == nats.AckError.AlreadyAcked) {
+ data.second_ack_failed = true;
+ }
+ };Also applies to: 131-137
🤖 Prompt for AI Agents
In tests/jetstream_duplicate_ack_test.zig around lines 123-129 and 131-137, the
code uses `if (js_msg.ack())` which is invalid because `ack()` returns an error
union (`!void`); replace the first `if` with a direct call that returns on
error: call `js_msg.ack() catch { return; };` then set `data.first_ack_success =
true;`. For the second-ack check, expect an error and set the failure flag
inside the `catch`: call `js_msg.ack() catch |_| { data.second_ack_failed =
true; } else { return; }` (i.e., if the second `ack()` unexpectedly succeeds,
return/fail the test), so both acknowledges use `catch` to handle the `!void`
correctly and set the appropriate flags.
| // First ack should succeed | ||
| if (js_msg.ack()) { | ||
| data.ack_success = true; | ||
| } else |_| { | ||
| return; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix Zig error handling in ack-then-nak test.
Replace if (...) over !void with catch blocks.
// First ack should succeed
- if (js_msg.ack()) {
- data.ack_success = true;
- } else |_| {
- return;
- }
+ js_msg.ack() catch |_| { return; };
+ data.ack_success = true;
// NAK after ack should fail
- if (js_msg.nak()) {
- // Should not reach here
- } else |err| {
- if (err == nats.AckError.AlreadyAcked) {
- data.nak_failed = true;
- }
- }
+ js_msg.nak() catch |err| {
+ if (err == nats.AckError.AlreadyAcked) {
+ data.nak_failed = true;
+ }
+ };Also applies to: 220-226
🤖 Prompt for AI Agents
In tests/jetstream_duplicate_ack_test.zig around lines 212-218 (and likewise for
220-226), the current pattern uses "if (js_msg.ack()) { ... } else |_| { return;
}" which is improper Zig error handling; replace these if/else blocks with
direct catch usage: call js_msg.ack() and use "catch { return; }" to
early-return on error, then set data.ack_success = true after the call succeeds;
do the same for the subsequent ack/nak calls (use js_msg.nak() or the
appropriate method) replacing their if/else over !void with direct calls
followed by catch to handle errors.
| var i: u32 = 0; | ||
| while (i < 3) : (i += 1) { | ||
| if (js_msg.inProgress()) { | ||
| data.progress_calls += 1; | ||
| } else |_| { | ||
| return; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix Zig error handling in inProgress/ack flow.
Use catch for inProgress() and ack().
while (i < 3) : (i += 1) {
- if (js_msg.inProgress()) {
- data.progress_calls += 1;
- } else |_| {
- return;
- }
+ js_msg.inProgress() catch { return; };
+ data.progress_calls += 1;
}
// Final ack should still work
- if (js_msg.ack()) {
- data.final_ack = true;
- } else |_| {
- return;
- }
+ js_msg.ack() catch { return; };
+ data.final_ack = true;Also applies to: 318-323
🤖 Prompt for AI Agents
In tests/jetstream_duplicate_ack_test.zig around lines 303-310 (and similarly
for 318-323), the calls to js_msg.inProgress() and js_msg.ack() currently ignore
Zig error returns; change them to use catch to handle errors. For inProgress(),
assign the result using `var in_prog = js_msg.inProgress() catch { return; }`
and then check `in_prog` to increment data.progress_calls; for ack(), call
`js_msg.ack() catch { return; }` (or `catch |err| { /* handle or return */ }`)
so any error is handled instead of propagating silently. Ensure progress_calls
is only incremented when inProgress() succeeds and ack() is called with its
error handled via catch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/jetstream_message.zig (1)
161-165: Off-by-one: v2 ACK subjects have 12 tokens, not 11.Branch should detect 12 tokens for v2 (
$JS.ACK.<domain>.<account>.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>.<token>). With 11, valid v2 subjects fall through toInvalidAckSubject.- } else if (token_count == 11) { + } else if (token_count == 12) { parseAckV2(subject, metadata);Consider adding a unit test with a real v2 subject (including the token) to prevent regressions.
🧹 Nitpick comments (2)
src/jetstream_message.zig (2)
107-127: Parsing returns success even on header mismatches; consider surfacing errors.
parseAckV1/V2early-return on token mismatches but do not signal failure.parseAckSubjectthen returnsvoid, so callers may see partially filled metadata without an error. If you want stricter behavior, haveparseAckV1/V2returnbool(parsed) or!voidand bubbleerror.InvalidAckSubject.Example minimal change:
- Make
parseAckV1/parseAckV2returnbool(true if fully parsed).- In
parseAckSubject, call and verify; otherwise returnerror.InvalidAckSubject.Also applies to: 129-155
88-104: No-op whenreplyis null — confirm desired API.Currently, if
msg.replyis absent,sendAcksilently no-ops and returns success. IfJetStreamMessagecan exist without a reply subject, consider returning a specific error (e.g.,error.NoAckSubject) to make failures explicit. If wrappers guarantee JetStream messages always have reply subjects, ignore.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
src/jetstream_message.zig(4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T02:49:59.082Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/jetstream_message.zig
🔇 Additional comments (4)
src/jetstream_message.zig (4)
20-27: AckType mapping looks correct and future-proof.
isFinal()andtoString()match JetStream semantics (+ACK,-NAK,+TERM,+WPI). Centralizing payload strings here is a good call.
54-56: Approach to duplicate-prevention via atomic flag is solid.
std.atomic.Value(bool)with defaultfalseplusisAcked()using.acquireis appropriate. Pairs well with.acq_relon the successful CAS.
61-75: Public ack APIs are clean and align with spec.Final acks (
ack/nak/term) route through the guarded path;inProgressskips the finality guard, allowing multiple calls. Nice separation.Also applies to: 76-80
170-185: Factory initialization looks correct.Allocating on the message arena and populating metadata from
replyis consistent and avoids extra ownership complexity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/jetstream_message.zig (1)
159-169: Off-by-one fix for v2 token count looks good.The check now uses 12 tokens for v2. This addresses the prior feedback.
🧹 Nitpick comments (3)
src/jetstream_message.zig (3)
5-7: Error type naming: align with public API/docs (or re-export).PR text mentions
JetStreamError.MessageAlreadyAcknowledged, while the code exposesAckError.AlreadyAcked. Confirm the intended public surface and either (a) rename to match docs or (b) re-export/alias at the package root to avoid churn for users.
89-103: CAS + errdefer placement correctly prevents false-acked and allows retry.
.cmpxchgStrong(false, true, .acq_rel, .acquire)is the right ordering.errdeferis registered only after a successful CAS (due to placement), so duplicate-ack errors won’t clear the flag.- Store rollback uses
.release, which is fine.Tiny readability nit: rename
was_ackedtoprevto reflect Zig’s return semantics (null on success, previous value on failure).Apply:
- const was_acked = self.acked.cmpxchgStrong(false, true, .acq_rel, .acquire); - if (was_acked != null) { + const prev = self.acked.cmpxchgStrong(false, true, .acq_rel, .acquire); + if (prev != null) { return AckError.AlreadyAcked; }
212-226: Creation path is clean; confirm ack behavior when reply is absent.
ack/nak/termbecome no-ops ifmsg.replyis null. If that’s intentional, great; otherwise consider returning a specific error to catch misuse.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
src/jetstream_message.zig(2 hunks)src/root.zig(6 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- src/root.zig
🔇 Additional comments (12)
src/jetstream_message.zig (12)
10-27: AckType + helpers look solid.Enum values and
isFinal()/toString()mapping match JetStream semantics (+ACK/-NAK/+TERM/+WPI).
32-34: SequencePair defaults are reasonable.Optional fields default to null; parsing fills them from the subject.
38-45: MsgMetadata defaults: OK.Defaults are sensible; values are populated by subject parsing.
50-56: Atomic ack flag choice is correct and lightweight.Using
std.atomic.Value(bool)initialized to false meets the duplicate-prevention goal with minimal memory.
61-74: ack/nak/term delegate cleanly to a single path.Thin wrappers keep the API tidy.
76-80: inProgress correctly remains idempotent and non-final.No mutation of the ack flag — matches spec.
82-85: isAcked memory ordering is appropriate.
.load(.acquire)pairs well with.cmpxchgStrong(..., .acq_rel, ...).
108-127: V1 subject parsing is correct.Prefix validation and numeric fields map to metadata as expected.
130-156: V2 subject parsing is correct, including domain and token handling.Skips account hash and token; accepts "_" as “no domain”.
171-177: Invalid-subject tests cover key failure modes.Covers empty/malformed subjects and invalid numerics.
179-193: V1 parsing test LGTM.Asserts all parsed fields, including sequences and counts.
195-209: V2 parsing test LGTM.Validates domain and all numeric fields.
Remove incorrect orelse operators on non-optional string fields in JetStream metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
tests/jetstream_nak_test.zig (3)
61-88: Avoid holding the mutex while performing network ops (ACK/NAK) and loggingKeep the critical section small: capture/record state under lock, then release it before calling
nak()/ack()and doing logs. This prevents unnecessary contention and potential head-of-line blocking in concurrent handlers.- data.mutex.lock(); - defer data.mutex.unlock(); - - // Store message data copy for comparison - const msg_copy = data.allocator.dupe(u8, js_msg.msg.data) catch return; - data.messages.append(msg_copy) catch return; - - // Get delivery count from JetStream message metadata - const delivery_count = js_msg.metadata.num_delivered; - data.delivery_counts.append(delivery_count) catch return; - - log.info("Received message (delivery #{}): {s}", .{ delivery_count, js_msg.msg.data }); - - if (delivery_count == 1) { - // First delivery - store the data and NAK it - data.first_delivery_data = data.allocator.dupe(u8, js_msg.msg.data) catch return; - data.nak_count += 1; - js_msg.nak() catch |err| { - log.err("Failed to NAK message: {}", .{err}); - }; - log.info("NAK'd message on first delivery", .{}); - } else { - // Second or later delivery - ACK it - js_msg.ack() catch |err| { - log.err("Failed to ACK message: {}", .{err}); - }; - log.info("ACK'd message on delivery #{}", .{delivery_count}); - } + var delivery_count: u64 = 0; + var do_nak = false; + { + data.mutex.lock(); + defer data.mutex.unlock(); + + // Store message data copy for comparison + const msg_copy = data.allocator.dupe(u8, js_msg.msg.data) catch return; + data.messages.append(msg_copy) catch return; + + // Get delivery count from JetStream message metadata + delivery_count = js_msg.metadata.num_delivered; + data.delivery_counts.append(delivery_count) catch return; + + if (delivery_count == 1) { + // First delivery - store the data; decide NAK outside lock + data.first_delivery_data = data.allocator.dupe(u8, js_msg.msg.data) catch return; + data.nak_count += 1; + do_nak = true; + } + } + + log.info("Received message (delivery #{}): {s}", .{ delivery_count, js_msg.msg.data }); + if (do_nak) { + js_msg.nak() catch |err| { + log.err("Failed to NAK message: {}", .{err}); + }; + log.info("NAK'd message on first delivery", .{}); + } else { + js_msg.ack() catch |err| { + log.err("Failed to ACK message: {}", .{err}); + }; + log.info("ACK'd message on delivery #{}", .{delivery_count}); + }
180-197: Same here: release the mutex before NAK to avoid blocking other deliveriesCapture the delivery number and update shared state under lock; perform
nak()after unlocking.- data.mutex.lock(); - defer data.mutex.unlock(); - - // Get delivery count from JetStream message metadata - const delivery_num = js_msg.metadata.num_delivered; - - log.info("Received message delivery #{}", .{delivery_num}); - - if (data.delivery_count < data.received_deliveries.len) { - data.received_deliveries[data.delivery_count] = delivery_num; - data.delivery_count += 1; - } - - // Always NAK to trigger redelivery (up to max_deliver limit) - js_msg.nak() catch |err| { - log.err("Failed to NAK: {}", .{err}); - }; + var delivery_num: u64 = 0; + { + data.mutex.lock(); + defer data.mutex.unlock(); + + // Get delivery count from JetStream message metadata + delivery_num = js_msg.metadata.num_delivered; + + if (data.delivery_count < data.received_deliveries.len) { + data.received_deliveries[data.delivery_count] = delivery_num; + data.delivery_count += 1; + } + } + + log.info("Received message delivery #{}", .{delivery_num}); + + // Always NAK to trigger redelivery (up to max_deliver limit) + js_msg.nak() catch |err| { + log.err("Failed to NAK: {}", .{err}); + };
281-283: Logging: show a placeholder when stream/consumer are emptyWith metadata fields now defaulting to empty strings, printing a visible placeholder helps diagnose parsing/propagation issues in CI logs.
- log.info("- Stream: {s}", .{js_msg.metadata.stream}); - log.info("- Consumer: {s}", .{js_msg.metadata.consumer}); + log.info("- Stream: {s}", .{ if (js_msg.metadata.stream.len > 0) js_msg.metadata.stream else "unknown" }); + log.info("- Consumer: {s}", .{ if (js_msg.metadata.consumer.len > 0) js_msg.metadata.consumer else "unknown" });
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
tests/jetstream_nak_test.zig(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:51:33.542Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.542Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
tests/jetstream_nak_test.zig
🔇 Additional comments (1)
tests/jetstream_nak_test.zig (1)
289-291: LGTM: direct access to non-optional metadata fieldsUsing
js_msg.metadata.stream/consumerdirectly aligns with the updated non-optional fields and simplifies the test.
Implements duplicate ack/nak prevention for JetStream messages using atomic bool approach as requested in #42.
Changes
ack_donefield to track acknowledgment statusJetStreamError.MessageAlreadyAcknowledgederror typeack(),nak(), andterm()methodsinProgress()behavior (can be called multiple times)isAcknowledged()introspection methodImplementation Details
std.atomic.Value(bool)with compare-and-swap operationsCloses #42
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests