-
Notifications
You must be signed in to change notification settings - Fork 1
Implement nakWithDelay for JetStream message acknowledgments #115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Add support for delayed negative acknowledgments in JetStream messages,
allowing consumers to specify a redelivery delay in milliseconds.
Key features:
- nakWithDelay(delay_ms) method for delayed NAK with millisecond precision
- Automatic conversion to nanoseconds for NATS protocol compliance
- Zero delay falls back to regular NAK behavior
- Thread-safe implementation using atomic operations
- Comprehensive test coverage including timing verification
Implementation details:
- Refactored sendAck to accept optional delay parameter
- Eliminated code duplication between regular and delayed ACK methods
- Protocol message format: -NAK {"delay": <nanoseconds>}
- Consistent with existing timeout patterns using milliseconds
Tests verify:
- 500ms delay timing accuracy (400ms minimum tolerance)
- Zero delay behaves identically to regular NAK
- Proper message redelivery after specified delays
- No regressions in existing NAK functionality
WalkthroughImplements delay-aware JetStream acknowledgments by extending the ack send path to accept an optional delay, adds nakWithDelay(delay_ms), and updates ack/nak/term/inProgress to route through it. The payload switches between plain strings and structured messages with delay (ns). Root re-exports nuid. New tests cover delayed and zero-delay NAK behavior. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/jetstream_message.zig (1)
71-74: Check for accidental non-NAK usage of delay.
sendAck(.nak, delay)is the only current path, but a defensive assertion keeps future call sites honest.Also applies to: 93-121
tests/jetstream_nak_test.zig (2)
339-457: Prefer monotonic time for delay assertions.Wall-clock jumps can flake the 400ms check. Consider using a monotonic source (e.g., nano timestamp divided by
ns_per_ms) forcurrent_time.
420-438: Optional: add a small grace wait after hitting the target count.A brief extra sleep (e.g., 100–200ms) after
count >= 2helps ensure no unexpected extra deliveries when timing is tight.Also applies to: 520-531
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
src/jetstream_message.zig(3 hunks)src/root.zig(1 hunks)tests/jetstream_nak_test.zig(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.
Applied to files:
src/root.zigtests/jetstream_nak_test.zig
🔇 Additional comments (6)
src/root.zig (1)
32-32: Re-export looks good.
pub const nuid = @import("nuid.zig");mirrors the existinginboxpattern and cleanly exposes the module.src/jetstream_message.zig (4)
61-69: Unifying ACK paths viasendAck(..., null)is clean.This removes duplication and keeps behavior consistent across
ack,nak,term, andinProgress.Also applies to: 76-85
71-74: API fornakWithDelay(ms)is clear; zero-as-regular NAK is intuitive.Nice, matches server expectations and keeps call sites simple.
93-105: Atomic final-ACK guard with errdefer rollback is correct.Prevents duplicate final acks and restores state on publish failure.
Also applies to: 119-121
61-69: NAK-as-final semantics are intentional — tests enforce a single terminal signal per delivery.tests/jetstream_nak_test.zig uses NAK to trigger redelivery; tests/jetstream_duplicate_ack_test.zig asserts NAK after ACK fails. Behavior matches "one terminal signal per delivery" and needs no change.
tests/jetstream_nak_test.zig (1)
459-542: Zero-delay NAK behavior test looks solid.Clear two-delivery assertion with explicit ACK on redelivery.
| // Format the acknowledgment message with optional delay | ||
| var ack_message: [256]u8 = undefined; | ||
| const ack_body = if (delay_ms) |delay| blk: { | ||
| if (delay > 0) { | ||
| // Convert milliseconds to nanoseconds for the protocol message | ||
| const delay_ns = delay * std.time.ns_per_ms; | ||
| const formatted = try std.fmt.bufPrint(&ack_message, "{s} {{\"delay\": {}}}", .{ ack_type.toString(), delay_ns }); | ||
| break :blk formatted; | ||
| } else { | ||
| break :blk ack_type.toString(); | ||
| } | ||
| } else ack_type.toString(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Guard against delay overflow (ms → ns) and cap to i64 range.
delay * ns_per_ms can overflow u64; the server expects an int64 duration. Clamp to math.maxInt(i64) in ns and format as i64. Optionally assert delay-with-json is only used for NAK.
Apply this diff inside sendAck:
- // Format the acknowledgment message with optional delay
- var ack_message: [256]u8 = undefined;
- const ack_body = if (delay_ms) |delay| blk: {
- if (delay > 0) {
- // Convert milliseconds to nanoseconds for the protocol message
- const delay_ns = delay * std.time.ns_per_ms;
- const formatted = try std.fmt.bufPrint(&ack_message, "{s} {{\"delay\": {}}}", .{ ack_type.toString(), delay_ns });
- break :blk formatted;
- } else {
- break :blk ack_type.toString();
- }
- } else ack_type.toString();
+ // Format the acknowledgment message with optional delay
+ var ack_message: [256]u8 = undefined;
+ const ack_body = if (delay_ms) |delay| blk: {
+ if (delay > 0) {
+ // ms -> ns with clamping to i64 range expected by the server
+ const max_ms_for_i64: u64 = @as(u64, std.math.maxInt(i64)) / std.time.ns_per_ms;
+ const clamped_ms: u64 = if (delay > max_ms_for_i64) max_ms_for_i64 else delay;
+ const delay_ns_i64: i64 = @intCast(clamped_ms * std.time.ns_per_ms);
+ // Only NAK supports a JSON delay payload
+ std.debug.assert(ack_type == .nak);
+ const formatted = try std.fmt.bufPrint(&ack_message, "{s} {{\"delay\": {}}}", .{ ack_type.toString(), delay_ns_i64 });
+ break :blk formatted;
+ } else {
+ break :blk ack_type.toString();
+ }
+ } else ack_type.toString();Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/jetstream_message.zig around lines 106-118, the conversion delay_ms *
std.time.ns_per_ms can overflow u64 and the server expects an int64 nanosecond
value; change the logic to compute delay_ns with checked/saturating arithmetic
and clamp to @as(i64, std.math.maxInt(i64)) when overflow would occur, then
format the JSON using the i64 value; additionally, ensure (or assert) that the
delay-with-json branch is only used for the NAK ack_type if that constraint
applies.
Summary
Adds support for delayed negative acknowledgments in JetStream messages, allowing consumers to specify a redelivery delay to prevent rapid redelivery of failed messages.
Key Features
nakWithDelay(delay_ms)method with millisecond precisionImplementation Details
sendAckto accept optional delay parameter-NAK {"delay": <nanoseconds>}sendAckWithDelaymethodTesting
Protocol Compliance
Follows NATS JetStream protocol specification for delayed NAK acknowledgments:
Test Plan