Skip to content

Conversation

@lalinsky
Copy link
Owner

@lalinsky lalinsky commented Aug 25, 2025

Summary

  • Implement comprehensive JetStream pull subscriptions following NATS naming patterns
  • Add pullSubscribe() method consistent with C (js_PullSubscribe) and Python (pull_subscribe) libraries
  • Provide PullSubscription type with fetch(), fetchNoWait(), and next() methods
  • Automatic deliver_subject=null handling for pull consumers

Implementation Details

  • Protocol: Uses $JS.API.CONSUMER.MSG.NEXT.{stream}.{consumer} for fetch requests
  • Status Messages: Handles 404 (no messages), 408 (timeout), 409 (sequence mismatch), 100 (heartbeat)
  • Thread Safety: Mutex protection for concurrent fetch operations
  • Resource Management: Proper cleanup of messages and network resources
  • Integration: Works with existing JetStream acknowledgment system

API Design

// Simple ephemeral pull subscription
var pull_sub = try js.pullSubscribe(stream_name, ConsumerConfig{
    .ack_policy = .explicit,
    .filter_subject = "orders.>",
});

// Fetch batch of messages
var batch = try pull_sub.fetch(FetchRequest{ 
    .batch = 10, 
    .expires = 30_000_000_000  // 30 seconds
});
defer batch.deinit();

// Process messages
for (batch.messages) |msg| {
    // Process message
    try msg.ack();
}

Test Coverage

  • ✅ Basic fetch operation with multiple messages
  • ✅ fetchNoWait() for immediate return behavior
  • ✅ next() convenience method for single messages
  • ✅ Proper error handling and resource cleanup
  • ✅ Integration with existing test infrastructure

Research & Compatibility

Based on analysis of C NATS library (js_PullSubscribe) and Go NATS patterns to ensure consistency across the NATS ecosystem. Follows established naming conventions and return types.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added pull-based JetStream consumption: batch fetches with timeouts, no-wait, idle-heartbeat support, and a pullSubscribe API returning message batches.
  • Improvements

    • Better handling of status messages (flow-control, idle heartbeats) and streamlined subscription reply routing.
  • Tests

    • Integration test added validating pull fetches, batching, acknowledgements, and ordering.
  • Chores

    • Re-exported pull-related types in the public API.

lalinsky and others added 11 commits August 24, 2025 13:20
- Implement getStreamInfo method in jetstream.zig for $JS.API.STREAM.INFO endpoint
- Add comprehensive test for STREAM.INFO functionality
- Reorganize JetStream tests by moving stream management tests to dedicated jetstream_stream_test.zig file
- Update test runner to include new test file
- All tests passing (27/27)

This addresses the missing STREAM.INFO endpoint identified in the JetStream API implementation.
…livery

This adds comprehensive support for JetStream push subscriptions following
established NATS library naming conventions.

## Core Features Implemented:

### Enhanced Consumer Configuration
- Extended ConsumerConfig with push-specific fields:
  - deliver_group for queue-based load balancing
  - flow_control for backpressure management
  - idle_heartbeat support (required with flow_control)
- Automatic filtering of pull-only fields for push consumers

### JetStream Message Enhancement
- JetStreamMessage wrapper with header parsing:
  - Nats-Stream, Nats-Sequence, Nats-Subject, Nats-Time-Stamp
  - Acknowledgment methods: ack(), nak(), term(), inProgress()
  - Automatic cleanup and memory management

### Push Subscription API
- subscribe() method following NATS.C/NATS.Go naming conventions
- Type-safe callback handler integration
- Consumer creation with deliver_subject configuration
- Automatic NATS to JetStream message conversion
- Proper configuration validation and error handling

### Advanced Features
- Flow control with idle heartbeats for high-throughput scenarios
- Queue group support for load-balanced processing
- Explicit acknowledgment handling (ACK/NAK/TERM/+WPI)
- JetStream-specific error validation and reporting

## Test Coverage
- Comprehensive test suite in jetstream_push_test.zig
- Basic push subscription with callback processing
- Flow control with heartbeat configuration
- Error handling and validation testing
- All 30 tests passing with working functionality

## API Example:
```zig
const consumer_config = nats.ConsumerConfig{
    .durable_name = "processor",
    .deliver_subject = "push.orders.process",
    .ack_policy = .explicit,
    .flow_control = true,
    .idle_heartbeat = 30_000_000_000,
};

var sub = try js.subscribe("ORDERS", consumer_config, handler, .{&state});
```

This implementation provides a solid foundation for production-ready
JetStream push subscriptions with proper callback handling, acknowledgments,
and flow control.
- Add handleStatusMessage() function to process Status: "100" messages
- Filter status messages from user callbacks in push subscriptions
- Handle "Idle Heartbeat" and "FlowControl Request" messages
- Automatically respond to flow control requests with empty messages
- Add debug logging to understand actual message header format

This implements the JetStream heartbeat protocol as used by NATS.Go
and NATS.C libraries, ensuring status messages are handled internally
rather than being passed to user message handlers.

Note: Threading issue identified - message handlers currently run
in network thread, blocking I/O. This will be addressed separately.
…h-subscriptions

# Conflicts:
#	tests/all_tests.zig
- Remove two-level context structure that was causing memory leaks
- Simplify handler to pass JetStream instance and args directly
- Add proper subscription cleanup in JetStreamSubscription.deinit()
- Eliminate JSContext allocation that wasn't being freed
- All tests now pass without memory leak warnings

This fixes the cleanup issue where js_context was allocated but never
properly freed during subscription teardown.
…d NAK testing

- Fix JetStream message metadata parsing to handle both v1 and v2 ACK subject formats
- Implement proper reply subject parsing following Go NATS library specifications
- Add comprehensive NAK redelivery tests verifying delivery count and message content
- Extract JetStream message code into separate jetstream_message.zig module
- Add MsgMetadata and SequencePair structs matching Go NATS library
- Support both legacy ($JS.ACK.<stream>.<consumer>...) and modern formats
- All 80 tests passing with full Go library compliance

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
… management

- Add js.subscribeSync() for manual message consumption with push consumers
- Add JetStreamSubscription.nextMsg() with configurable timeout support
- Improve memory management by allocating JetStreamMessage on message arena
- Simplify cleanup - users only need to call js_msg.deinit(), no manual destroy()
- Add comprehensive test coverage with 3 new tests for sync subscriptions
- All 83 tests passing with no memory leaks

Key features:
- Push-based synchronous consumption using deliver_subject
- Full JetStream message metadata and acknowledgment support
- Consistent API with existing push subscription infrastructure
- Proper timeout handling for message retrieval
- Arena-based allocation ensures automatic cleanup

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
Add comprehensive support for JetStream pull subscriptions following
the established NATS naming patterns and protocols:

Core Features:
- pullSubscribe() method following C/Go/Python naming conventions
- PullSubscription type with fetch(), fetchNoWait(), and next() methods
- FetchRequest configuration for batch size, timeouts, and no-wait behavior
- MessageBatch container with proper error handling and cleanup
- Automatic deliver_subject=null handling for pull consumers

Implementation:
- Uses $JS.API.CONSUMER.MSG.NEXT.{stream}.{consumer} protocol
- Handles JetStream status messages (404, 408, 409, 100 heartbeats)
- Thread-safe operations with mutex protection
- Proper resource management and cleanup
- Integration with existing JetStream message acknowledgment

Testing:
- Comprehensive test suite covering fetch, fetchNoWait, and next methods
- Integration with existing test utilities
- Proper stream and consumer lifecycle management

Based on research of C NATS (js_PullSubscribe) and Go NATS libraries
to ensure API consistency across the NATS ecosystem.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Aug 25, 2025

Walkthrough

Adds a pull-based JetStream API: new public types (FetchRequest, MessageBatch, PullSubscription), JetStream.pullSubscribe and PullSubscription.fetch, persistent inbox reply subscription, status handling, message batching/deallocation, root re-exports, an inbox nuid buffer change, and accompanying pull tests.

Changes

Cohort / File(s) Summary
JetStream pull implementation
src/jetstream.zig
Adds pull-based API: FetchRequest, MessageBatch (with deinit), PullSubscription (fields, deinit, fetch), and JetStream.pullSubscribe. Implements fetch flow: publish pull request to consumer API, use a persistent wildcard inbox subscription, aggregate replies into a MessageBatch, handle JetStream status headers (100/404/408/409), and manage mutexing and memory cleanup.
Root re-exports
src/root.zig
Re-exports PullSubscription, FetchRequest, and MessageBatch from jetstream.zig, extending public API surface.
Tests aggregation
tests/all_tests.zig
Adds pub const jetstream_pull_tests = @import("jetstream_pull_test.zig"); to include new pull tests in the suite.
Pull consumer tests
tests/jetstream_pull_test.zig
New test exercising pull consumer basic fetch: create stream/consumer, publish three messages, fetch two then one, validate payloads and ack, and cleanup.
Inbox NUID buffer change
src/inbox.zig
Replaces use of nuid.next() returned slice with a stack-allocated nuid_buf passed into nuid.next(&nuid_buf) before formatting; no external behavioral change.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant C as Client
  participant PS as PullSubscription
  participant JS as JetStream
  participant NS as NATS

  C->>PS: fetch(batch, timeout_ms)
  activate PS
  PS->>PS: mutex.lock
  PS->>JS: build pull request (batch/max_bytes/expires/no_wait/heartbeat)
  JS->>NS: Publish to $JS.API.CONSUMER.MSG.NEXT (reply to inbox-prefix)
  JS->>PS: ensure wildcard inbox subscription exists
  loop collect replies until batch complete / timeout / error
    NS-->>PS: Message or Status (100/404/408/409)
    PS->>PS: handle status or convert to JetStreamMessage, aggregate
  end
  PS->>PS: mutex.unlock
  PS-->>C: MessageBatch (messages, err)
  deactivate PS
Loading
sequenceDiagram
  participant C as Client
  participant PS as PullSubscription

  note over C,PS: convenience helpers
  C->>PS: next(timeout_ns)
  PS->>PS: fetch({batch:1, expires:timeout_ns})
  PS-->>C: ?JetStreamMessage

  C->>PS: fetchNoWait(batch)
  PS->>PS: fetch({batch:batch, no_wait:true})
  PS-->>C: MessageBatch (possibly empty)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

A rabbit waits by inbox light,
Pulls a batch with gentle bite.
Status hums, and messages hop,
I ack, I free — the queue stops not.
Hooray! My batch is tidy, bright 🐇📮

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/jetstream-pull-subscriptions

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Base automatically changed from feature/jetstream-push-subscriptions to main August 25, 2025 11:54
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (7)
src/jetstream.zig (5)

248-276: Clarify ownership and make MessageBatch.deinit more defensive.

deinit assumes ownership of every JetStreamMessage and frees the slice. Consider nulling out self.messages after free to avoid accidental double free and make deinit idempotent, which helps callers using defer in error paths.

     pub fn deinit(self: *MessageBatch) void {
         // Clean up each message individually
         for (self.messages) |js_msg| {
             js_msg.deinit();
         }
         // Free the messages array
-        self.allocator.free(self.messages);
+        self.allocator.free(self.messages);
+        self.messages = &[_]*JetStreamMessage{};
     }

328-338: Do not emit null optional fields in JSON request.

Some JetStream servers are strict; sending "max_bytes": null etc. is unnecessary. Ensure null optionals are omitted.

-        const request_json = try std.json.stringifyAlloc(self.js.allocator, request, .{});
+        const request_json = try std.json.stringifyAlloc(self.js.allocator, request, .{
+            .emit_null_optional_fields = false,
+        });

346-346: Use std.time for precise ns→ms conversion and avoid narrowing-cast pitfalls.

Current code divides first and then casts; prefer divTrunc for clarity and to prevent accidental overflow/rounding surprises.

-        const timeout_ms = @as(u64, @intCast(request.expires / 1_000_000)); // Convert nanoseconds to milliseconds
+        const timeout_ms: u64 = @divTrunc(request.expires, std.time.ns_per_ms);

354-383: Improve status handling: log unexpected status codes for diagnosability.

Unknown statuses are discarded silently. Add a debug log so field issues (e.g., non-standard statuses) are visible during troubleshooting.

                 } else {
                     // Unknown status code - clean up and continue
+                    log.debug("JetStream pull: unknown status code: {s}", .{status_code});
                     raw_msg.deinit();
                 }

406-435: next() memory/ownership path looks correct. Minor tightening optional.

The method returns the first message, deinit’s extras, and frees the slice; clean. Optionally, assert that batch.messages.len <= 1 after trimming for extra safety.

tests/jetstream_pull_test.zig (2)

120-124: No-wait empty fetch assertion is accurate; consider asserting no error too.

Strengthen the assertion to ensure a 404 “no messages” path wasn’t misclassified as another error.

-    try testing.expect(empty_batch.messages.len == 0);
+    try testing.expect(!empty_batch.hasError());
+    try testing.expect(empty_batch.messages.len == 0);

48-55: Ensure JetStream publishes are flushed to avoid CI timing flakiness

Replace the fixed sleep after publishing with a call to conn.flush(), which blocks until all pending writes are acknowledged by the server. This removes reliance on arbitrary delays and prevents intermittent test failures.

Locations to update:

  • tests/jetstream_pull_test.zig at:
    • Lines 48–55 in the “basic fetch” test
    • Lines 125–131 in the “fetchNoWait” test
    • Lines 186–191 in the “next()” test

Suggested diff (example for the basic fetch test):

     // Publish some test messages
     try conn.publish("test.pull.msg1", "Hello Pull 1");
     try conn.publish("test.pull.msg2", "Hello Pull 2");
     try conn.publish("test.pull.msg3", "Hello Pull 3");
-
-    // Wait a moment for messages to be stored
-    std.time.sleep(100 * std.time.ns_per_ms);
+    // Ensure all publish requests are sent and acknowledged
+    try conn.flush();

Apply the same pattern in the other two tests in this file. Since Connection.flush() is available (see src/connection.zig:601), no additional utilities are needed.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between f3fc2dc and ec3c4c1.

📒 Files selected for processing (4)
  • src/jetstream.zig (2 hunks)
  • src/root.zig (1 hunks)
  • tests/all_tests.zig (1 hunks)
  • tests/jetstream_pull_test.zig (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: CI
tests/jetstream_pull_test.zig

[error] 137-137: JetStream pull consumer fetchNoWait test failed: assertion 'batch.messages.len == 2' did not pass (TestUnexpectedResult).


[error] 1-1: JetStream API error: 404 stream not found (err_code=10059) encountered during test execution.

🔇 Additional comments (5)
tests/all_tests.zig (1)

17-17: Include pull tests: LGTM.

Importing jetstream_pull_test.zig into the aggregate is correct.

tests/jetstream_pull_test.zig (3)

66-79: Ack order and cleanup look correct.

You ack after inspecting and then deinit messages/slices. Looks good.

Also applies to: 136-146, 196-201


203-206: Good negative path check on next().

Validates timeout behavior cleanly.


1-12: Import surface is minimal and clear.

The tests consume the new public API cleanly. No action needed.

src/root.zig (1)

49-53: Re-exports for PullSubscription API: LGTM.

Publicly exposing PullSubscription, FetchRequest, and MessageBatch via root matches the rest of the API surface.

Comment on lines 300 to 341
pub fn fetch(self: *PullSubscription, request: FetchRequest) !MessageBatch {
self.mutex.lock();
defer self.mutex.unlock();

// Build the pull request subject
const pull_subject = try std.fmt.allocPrint(
self.js.allocator,
"CONSUMER.MSG.NEXT.{s}.{s}",
.{ self.stream_name, self.consumer_name }
);
defer self.js.allocator.free(pull_subject);

// Create unique inbox for responses
const nuid = @import("nuid.zig");
const nuid_str = try nuid.nextString(self.js.allocator);
defer self.js.allocator.free(nuid_str);
const inbox = try std.fmt.allocPrint(
self.js.allocator,
"_INBOX.{s}",
.{nuid_str}
);
defer self.js.allocator.free(inbox);

// Create synchronous subscription for the inbox
const inbox_sub = try self.js.nc.subscribeSync(inbox);
defer inbox_sub.deinit();

// Serialize the fetch request to JSON
const request_json = try std.json.stringifyAlloc(self.js.allocator, request, .{});
defer self.js.allocator.free(request_json);

// Build the full API subject
const api_subject = try std.fmt.allocPrint(
self.js.allocator,
"{s}{s}",
.{ default_api_prefix, pull_subject }
);
defer self.js.allocator.free(api_subject);

// Send the pull request with reply subject
try self.js.nc.publishRequest(api_subject, inbox, request_json);

Copy link
Contributor

@coderabbitai coderabbitai bot Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Avoid reply-inbox race: flush the SUB before publishing the pull request.

Publishing the fetch request immediately after subscribeSync risks the server replying before the SUB is processed, dropping replies. This can manifest as intermittent empty batches (observed in CI failure). Flush the connection after subscribing and before publishRequest.

If Connection exposes flush, insert it right after subscribeSync:

         // Create synchronous subscription for the inbox
         const inbox_sub = try self.js.nc.subscribeSync(inbox);
         defer inbox_sub.deinit();
+        // Ensure the server has processed the SUB before sending the request
+        try self.js.nc.flush();
 
         // Serialize the fetch request to JSON
         const request_json = try std.json.stringifyAlloc(self.js.allocator, request, .{});

If the flush API has a different signature, please adjust accordingly. To locate it:


🏁 Script executed:

#!/bin/bash
# Find flush-like APIs on Connection
rg -nP 'fn\s+flush\s*\(' src | sed -n '1,120p'
rg -nP 'flush' src/connection.zig -n -C3

Length of output: 6376


Ensure SUB is processed before publishing the fetch request
Insert a connection-level flush immediately after subscribeSync to avoid the server’s replies arriving before the SUB is registered (which can drop messages and lead to empty batches).

Locations to update:

  • File: src/jetstream.zig, inside pub fn fetch, right after the subscribeSync call.

Suggested diff:

         // Create synchronous subscription for the inbox
         const inbox_sub = try self.js.nc.subscribeSync(inbox);
         defer inbox_sub.deinit();
+        // Ensure the server has processed the SUB before sending the request
+        try self.js.nc.flush(); // uses Connection.flush(self: *Self) !void at src/connection.zig:601

         // Serialize the fetch request to JSON
         const request_json = try std.json.stringifyAlloc(self.js.allocator, request, .{});
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn fetch(self: *PullSubscription, request: FetchRequest) !MessageBatch {
self.mutex.lock();
defer self.mutex.unlock();
// Build the pull request subject
const pull_subject = try std.fmt.allocPrint(
self.js.allocator,
"CONSUMER.MSG.NEXT.{s}.{s}",
.{ self.stream_name, self.consumer_name }
);
defer self.js.allocator.free(pull_subject);
// Create unique inbox for responses
const nuid = @import("nuid.zig");
const nuid_str = try nuid.nextString(self.js.allocator);
defer self.js.allocator.free(nuid_str);
const inbox = try std.fmt.allocPrint(
self.js.allocator,
"_INBOX.{s}",
.{nuid_str}
);
defer self.js.allocator.free(inbox);
// Create synchronous subscription for the inbox
const inbox_sub = try self.js.nc.subscribeSync(inbox);
defer inbox_sub.deinit();
// Serialize the fetch request to JSON
const request_json = try std.json.stringifyAlloc(self.js.allocator, request, .{});
defer self.js.allocator.free(request_json);
// Build the full API subject
const api_subject = try std.fmt.allocPrint(
self.js.allocator,
"{s}{s}",
.{ default_api_prefix, pull_subject }
);
defer self.js.allocator.free(api_subject);
// Send the pull request with reply subject
try self.js.nc.publishRequest(api_subject, inbox, request_json);
pub fn fetch(self: *PullSubscription, request: FetchRequest) !MessageBatch {
self.mutex.lock();
defer self.mutex.unlock();
// Build the pull request subject
const pull_subject = try std.fmt.allocPrint(
self.js.allocator,
"CONSUMER.MSG.NEXT.{s}.{s}",
.{ self.stream_name, self.consumer_name }
);
defer self.js.allocator.free(pull_subject);
// Create unique inbox for responses
const nuid = @import("nuid.zig");
const nuid_str = try nuid.nextString(self.js.allocator);
defer self.js.allocator.free(nuid_str);
const inbox = try std.fmt.allocPrint(
self.js.allocator,
"_INBOX.{s}",
.{nuid_str}
);
defer self.js.allocator.free(inbox);
// Create synchronous subscription for the inbox
const inbox_sub = try self.js.nc.subscribeSync(inbox);
defer inbox_sub.deinit();
// Ensure the server has processed the SUB before sending the request
try self.js.nc.flush(); // uses Connection.flush(self: *Self) !void at src/connection.zig:601
// Serialize the fetch request to JSON
const request_json = try std.json.stringifyAlloc(self.js.allocator, request, .{});
defer self.js.allocator.free(request_json);
// Build the full API subject
const api_subject = try std.fmt.allocPrint(
self.js.allocator,
"{s}{s}",
.{ default_api_prefix, pull_subject }
);
defer self.js.allocator.free(api_subject);
// Send the pull request with reply subject
try self.js.nc.publishRequest(api_subject, inbox, request_json);
}
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 300 to 341, after creating the synchronous
inbox subscription with subscribeSync, insert a connection-level flush call on
the underlying NATS connection and handle its error (i.e., call the
nc.flush-equivalent and return/propagate the error if it fails) so the SUB is
processed by the server before publishing the fetch request; this prevents
replies from arriving before the subscription is registered and avoids
dropped/empty message batches.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Messages are written in order, so even if both are buffered and sent later, the server will see SUB first and MSG (request) later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lalinsky You're absolutely right about NATS message ordering guarantees. Since messages are written and processed in order over the TCP connection, the SUB command will always be seen by the server before the subsequent request message, even when both are buffered. This makes the flush() call unnecessary overhead.

Thank you for the clarification on the NATS protocol behavior!


✏️ Learnings added
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: src/jetstream.zig:0-0
Timestamp: 2025-08-25T14:49:05.827Z
Learning: In NATS protocol, messages are written in order over TCP and processed by the server in the order received. This means a SUB command will always be processed before a subsequent request message, even if both are buffered, making explicit flush() calls unnecessary for subscription-then-request patterns.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment on lines +21 to +23
const stream_name = "TEST_PULL_STREAM";
const consumer_name = "test_pull_consumer";

Copy link
Contributor

@coderabbitai coderabbitai bot Aug 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use unique stream/consumer names per test to avoid cross-test interference.

CI shows “404 stream not found” — likely from the global cleanup test racing other tests. Make names unique per test run to avoid collisions and accidental deletions by other tests running concurrently.

Apply (example for all three tests):

-    const stream_name = "TEST_PULL_STREAM";
-    const consumer_name = "test_pull_consumer";
+    var sbuf: [64]u8 = undefined;
+    var cbuf: [64]u8 = undefined;
+    const ts = @intCast(u64, std.time.milliTimestamp());
+    const stream_name = try std.fmt.bufPrint(&sbuf, "TEST_PULL_STREAM_{d}", .{ts});
+    const consumer_name = try std.fmt.bufPrint(&cbuf, "test_pull_consumer_{d}", .{ts});

Repeat analogous changes for TEST_PULL_NOWAIT_STREAM/test_pull_nowait_consumer and TEST_PULL_NEXT_STREAM/test_pull_next_consumer. If bufPrint’s lifetime is an issue, keep the buffers in scope for the test (as shown).

Also applies to: 92-94, 159-161

🤖 Prompt for AI Agents
In tests/jetstream_pull_test.zig around lines 21-23 (and similarly at 92-94 and
159-161), the stream and consumer names are global/static causing cross-test
collisions; update each test to generate unique stream_name and consumer_name
per run (e.g., append a timestamp/UUID or test-specific suffix) so names are not
shared across concurrent CI runs, and ensure any bufPrint or temporary buffers
used to build those names remain in scope for the duration of the test (declare
buffers and formatted name variables inside the test function or block) so the
generated strings remain valid when used.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a system for clearing streams/consumers before each test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Look for test cleanup systems and utilities
echo "=== Examining test utilities ==="
cat tests/utils.zig

echo -e "\n=== Looking for cleanup patterns in test files ==="
rg -nC3 --type=zig "cleanup|clear|delete|remove" tests/

echo -e "\n=== Searching for stream/consumer management patterns ==="
rg -nC3 --type=zig "stream.*delete|consumer.*delete|cleanup.*stream|cleanup.*consumer" .

echo -e "\n=== Looking for test setup/teardown patterns ==="
rg -nC3 --type=zig "setUp|tearDown|before.*test|after.*test" tests/

Length of output: 15492


@lalinsky Thanks for clarifying! I can see your cleanup system now in tests/all_tests.zig - the "tests:beforeEach" test that runs before each test to delete all existing streams (and their associated consumers). That's actually a more comprehensive approach than unique naming since it ensures complete test isolation regardless of previous test states or failures.

The CI "404 stream not found" error is likely due to timing or connection issues during the cleanup process rather than name collisions. Your cleanup system should handle cross-test interference effectively.


✏️ Learnings added
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.542Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

- Replace per-request sync subscriptions with persistent wildcard inbox subscription
- Use unique fetch IDs in reply subjects for message validation
- Follow NATS C library pattern: create _INBOX.{nuid}.* subscription once
- Generate unique reply subjects per fetch: _INBOX.{nuid}.{fetchID}
- Improve performance by reusing subscription across multiple fetch operations
- Fix inbox.zig to use correct nuid.next() API with buffer parameter
- Messages validated by timestamp uniqueness in JetStream ACK subjects

This change aligns with how the NATS C library handles pull subscriptions
and reduces the overhead of creating/destroying subscriptions per fetch.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
tests/jetstream_pull_test.zig (1)

21-23: Use unique stream/consumer names per test run to avoid collisions/races.

Static names can collide across concurrent tests or retries, causing flakiness (e.g., “404 stream not found” from another test’s cleanup). Generate names with a timestamp or nuid and keep buffers in scope.

-    const stream_name = "TEST_PULL_STREAM";
-    const consumer_name = "test_pull_consumer";
+    var sbuf: [64]u8 = undefined;
+    var cbuf: [64]u8 = undefined;
+    const ts = @as(u64, @intCast(std.time.milliTimestamp()));
+    const stream_name = try std.fmt.bufPrint(&sbuf, "TEST_PULL_STREAM_{d}", .{ts});
+    const consumer_name = try std.fmt.bufPrint(&cbuf, "test_pull_consumer_{d}", .{ts});
src/jetstream.zig (4)

849-853: Ephemeral consumers: consumer_name must come from ConsumerInfo.name, not config.

For ephemeral consumers, config.name and durable_name are null. The server-assigned name is in ConsumerInfo.name. Using config.* breaks the ephemeral path.

-        // Get the consumer name (use name first, then durable_name)
-        const consumer_name = consumer_info.value.config.name orelse
-            consumer_info.value.config.durable_name orelse
-            return error.MissingConsumerName;
+        // Use the actual server-assigned name (works for ephemeral); durable fallback remains available if needed
+        const consumer_name = consumer_info.value.name;

865-868: Flush after creating the wildcard inbox subscription to avoid reply-inbox race.

Without a flush, the server may send replies before it processes the SUB, dropping fetch responses intermittently.

         // Create the persistent wildcard inbox subscription
         const inbox_subscription = try self.nc.subscribeSync(wildcard_subject);
         errdefer inbox_subscription.deinit();
+        // Ensure SUB is processed before any fetch() publishes requests
+        try self.nc.flush();

235-247: Make FetchRequest fields unsigned and validate inputs (prevents negative values and usize mismatches).

batch and max_bytes cannot be negative. Using signed types introduces bugs and forces casts when comparing against usize lengths later.

 pub const FetchRequest = struct {
-    /// Maximum number of messages to fetch
-    batch: i32 = 1,
-    /// Maximum bytes to fetch (optional)
-    max_bytes: ?i64 = null,
+    /// Maximum number of messages to fetch (must be >= 1)
+    batch: usize = 1,
+    /// Maximum bytes to fetch (optional, non-negative)
+    max_bytes: ?u64 = null,
     /// Request timeout in nanoseconds (default: 30 seconds)
     expires: u64 = 30_000_000_000,
     /// Don't wait if no messages are available immediately
     no_wait: bool = false,
     /// Heartbeat interval in nanoseconds for long requests
     idle_heartbeat: ?u64 = null,
 };

Follow-up: Reject batch == 0 at the call site (see next comment).


329-333: Fix type mismatch in loop condition and avoid potential infinite wait patterns.

With batch as i32, comparing to messages.items.len (usize) is a mismatch; after switching to usize, the cast goes away. Also, note that each nextMsg call waits up to timeout_ms; with batch > 1 this can multiply the total wait. Consider reducing the per-iteration wait after receiving at least one message or honoring server heartbeats for earlier exit.

-        while (!batch_complete and messages.items.len < request.batch) {
+        while (!batch_complete and messages.items.len < request.batch) {
             if (self.inbox_subscription.nextMsg(timeout_ms)) |raw_msg| {
                 // ...

Optional improvement (not mandatory for this PR): switch to a remaining-time budget or shorter per-iteration waits after the first message arrives.

🧹 Nitpick comments (7)
src/inbox.zig (1)

23-25: Explicitly slice the NUID buffer when formatting.

Passing a fixed-size array to "{s}" can rely on coercions that aren’t always obvious. Slice explicitly for clarity and to avoid surprises with future changes.

-    return try std.fmt.allocPrint(allocator, "{s}{s}", .{ INBOX_PREFIX, nuid_buf });
+    return try std.fmt.allocPrint(allocator, "{s}{s}", .{ INBOX_PREFIX, nuid_buf[0..] });
tests/jetstream_pull_test.zig (2)

32-34: Add explicit server-side cleanup (delete consumer/stream) to reduce interference.

Deinit frees client-side allocations but the durable consumer and stream remain on the server. Deleting them prevents surprises in later tests or local runs.

     var stream_info = try js.addStream(stream_config);
     defer stream_info.deinit();
+    // Ensure server-side cleanup even on test failure
+    defer js.deleteStream(stream_name) catch {};

     // Create a pull consumer
@@
     var subscription = try js.pullSubscribe(stream_name, consumer_config);
     defer subscription.deinit();
+    // Delete the durable consumer at the end of the test
+    defer js.deleteConsumer(stream_name, consumer_name) catch {};

Also applies to: 42-44


73-76: Remove noisy debug logs in tests (or guard by a verbose flag).

These logs make CI output noisy and can slow runs with large volumes. Prefer assertions only.

-    std.log.debug("Received messages: {}", .{batch2.messages.len});
-    for (batch2.messages) |msg| {
-        std.log.debug("Message: {s}", .{msg.msg.data});
-    }
src/jetstream.zig (4)

170-171: Fix typo in comment (“mess” -> “message”).

Minor doc polish.

-    /// For push consumers this will regularly send an empty mess with Status header 100 and a reply subject
+    /// For push consumers this will regularly send an empty message with Status header 100 and a reply subject

337-363: Status handling is good, but treat unknown Status conservatively.

Unknown statuses are currently dropped silently after deinit. Consider logging at warn level with the code to aid troubleshooting.

-                    // Unknown status code - clean up and continue
-                    raw_msg.deinit();
+                    // Unknown status code - clean up and continue, but log for visibility
+                    log.warn("Unhandled JetStream status code: {s}", .{status_code});
+                    raw_msg.deinit();

331-333: Avoid logging message payloads at debug by default.

Logging full payloads can leak PII and bloat logs. Keep subject/reply; omit data unless explicitly opted-in.

-                log.debug("Message: subject={s}, reply={s}, data='{s}'", .{ raw_msg.subject, raw_msg.reply orelse "none", raw_msg.data });
+                log.debug("Message: subject={s}, reply={s}", .{ raw_msg.subject, raw_msg.reply orelse "none" });

287-292: Deinit order is fine; optional defensive nil-out after free.

Current cleanup is correct. If you want to prevent accidental reuse after deinit, you can nil-out pointers. Not required.

         self.consumer_info.deinit();
         self.inbox_subscription.deinit();
         self.js.allocator.free(self.inbox_prefix);
+        // Optional: defensive clear (not required)
+        // self.inbox_prefix = &[_]u8{};
         self.js.allocator.destroy(self);
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between ec3c4c1 and 182e26b.

📒 Files selected for processing (3)
  • src/inbox.zig (1 hunks)
  • src/jetstream.zig (11 hunks)
  • tests/jetstream_pull_test.zig (1 hunks)
🔇 Additional comments (1)
src/jetstream.zig (1)

743-778: Push-subscription status handling looks correct (heartbeats/flow control).

Good interception of 100-status heartbeats and flow control responses before invoking the user handler. This mirrors Go/C behavior and avoids surfacing protocol messages to the application.

- Change FetchRequest.batch from i32 to usize to match messages.items.len
- Change FetchRequest.max_bytes from ?i64 to ?u64 to prevent signed/unsigned arithmetic issues
- Update PullSubscription.fetch parameter from i32 to usize for type consistency
- Add validation for batch >= 1 with InvalidBatchSize error

Co-Authored-By: Claude <[email protected]>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
src/jetstream.zig (1)

851-855: Ephemeral consumer name resolution is incorrect; use ConsumerInfo.name.

For ephemeral consumers, config.name/durable_name are null. The server-assigned name is exposed as ConsumerInfo.name. Using config.* breaks the ephemeral path.

Apply:

-        // Get the consumer name (use name first, then durable_name)
-        const consumer_name = consumer_info.value.config.name orelse
-            consumer_info.value.config.durable_name orelse
-            return error.MissingConsumerName;
+        // Use the server-assigned consumer name (works for ephemeral and durable)
+        const consumer_name = consumer_info.value.name;
🧹 Nitpick comments (8)
src/jetstream.zig (8)

287-293: Guard deinit against concurrent fetch use.

If a caller drops PullSubscription while another thread is inside fetch(), deinit may race with the in-flight loop (destroying inbox_subscription or inbox_prefix). Consider either documenting “no concurrent deinit while fetch is active” or adding a simple in-flight counter/flag to block deinit until fetch returns.

Would you like a minimal patch that adds an atomic inflight counter and waits in deinit?


339-361: Parse Status as an integer and handle “No Messages” explicitly.

Comparing strings like "404"/"408" is brittle. Parse Status into a u16 and switch on it. Also consider setting a typed error for 404 when zero messages were collected if you want callers to distinguish “no messages” from “success with N>0”.

Example:

if (try raw_msg.headerGet("Status")) |status_str| {
    const code = std.fmt.parseUnsigned(u16, status_str, 10) catch {
        raw_msg.deinit();
        continue;
    };
    switch (code) {
        404 => { raw_msg.deinit(); batch_complete = true; break; },
        408 => { raw_msg.deinit(); fetch_error = error.RequestTimeout; batch_complete = true; break; },
        409 => { raw_msg.deinit(); fetch_error = error.ConsumerSequenceMismatch; batch_complete = true; break; },
        100 => { raw_msg.deinit(); continue; },
        else => { raw_msg.deinit(); }, // unknown status
    }
}

Optionally define:

const FetchError = error{ RequestTimeout, ConsumerSequenceMismatch, NoMessages };

and make MessageBatch.err: ?FetchError for stronger typing.


333-336: Avoid logging full message payloads at debug; log size instead.

Payloads can be large/binary and spam logs. Consider logging length or a truncated prefix.

Apply:

-                log.debug("Message: subject={s}, reply={s}, data='{s}'", .{ raw_msg.subject, raw_msg.reply orelse "none", raw_msg.data });
+                log.debug("Message: subject={s}, reply={s}, size={d}B", .{ raw_msg.subject, raw_msg.reply orelse "none", raw_msg.data.len });

332-375: Total wait time can exceed timeout_ms × batch; consider a deadline.

Using timeout_ms for each nextMsg() can extend overall wall time beyond the intended request.expires. Track a deadline and use the remaining time per iteration.

Example snippet:

const start_ns = std.time.nanoTimestamp();
const deadline_ns = start_ns + std.time.ns_per_ms * timeout_ms;
while (!batch_complete and messages.items.len < request.batch) {
    const now_ns = std.time.nanoTimestamp();
    if (now_ns >= deadline_ns) break;
    const remaining_ms = @intCast(u64, (deadline_ns - now_ns) / std.time.ns_per_ms);
    if (self.inbox_subscription.nextMsg(remaining_ms)) |raw_msg| { ... }
}

170-176: Typo in comment (“empty mess” → “empty message”).

Minor doc nit.

Apply:

-    /// For push consumers this will regularly send an empty mess with Status header 100 and a reply subject
+    /// For push consumers this will regularly send an empty message with Status header 100 and a reply subject

457-475: Use one logging pathway; drop std.debug.print duplication.

maybeParseErrorResponse both logs and prints. Prefer a single log.err/log.debug pair.

Apply:

-        // TODO: Handle specific error cases
-        std.debug.print("JetStream error: code={d} err_code={d} description={s}\n", .{ info.code, info.err_code, info.description });
+        // TODO: Handle specific error cases

308-315: Optional: avoid allocating JSON for every fetch.

To reduce alloc churn in hot paths, serialize into a small fixed buffer with std.json.stringify (writer API) or reuse a std.ArrayList(u8) arena.


295-312: Add missing fetchNoWait() and next() helpers to PullSubscription

A repository-wide search shows no fetchNoWait in src/jetstream.zig, and the only existing next() is in src/nuid.zig, unrelated to message fetching. To match the PR description and improve ergonomics, please add:

• In src/jetstream.zig, alongside pub fn fetch(...):

pub fn fetchNoWait(self: *PullSubscription, batch: usize) !MessageBatch {
    // Reuse fetch with zero timeout or set `no_wait` flag if FetchRequest supports it
    return self.fetch(batch, 0);
}

• Also in src/jetstream.zig:

pub fn next(self: *PullSubscription, timeout_ms: u64) !?Message {
    const mb = try self.fetch(1, timeout_ms);
    if (mb.items.len == 0) return null;
    return mb.items[0];
}

Implementing these will align the code with the documented API and offer a more convenient interface for single-message polling.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 182e26b and fdfa3d3.

📒 Files selected for processing (1)
  • src/jetstream.zig (11 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-25T14:49:05.827Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: src/jetstream.zig:0-0
Timestamp: 2025-08-25T14:49:05.827Z
Learning: In NATS protocol, messages are written in order over TCP and processed by the server in the order received. This means a SUB command will always be processed before a subsequent request message, even if both are buffered, making explicit flush() calls unnecessary for subscription-then-request patterns.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-08-25T02:49:59.082Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.

Applied to files:

  • src/jetstream.zig
🔇 Additional comments (5)
src/jetstream.zig (5)

235-247: Good type choices for FetchRequest (usize/u64) and sensible defaults.

Switching batch to usize and max_bytes to ?u64 prevents negative values and removes signed/unsigned friction elsewhere. The default expires and flags look reasonable.


249-266: MessageBatch cleanup is correct and safe.

Deinitializing each JetStreamMessage and freeing the slice via the stored allocator is the right ownership model. No leaks or double-frees evident.


745-805: Push status handling LGTM.

Centrally handling 100 heartbeats and flow-control replies keeps user handlers clean, and the reply publish on flow control is correct.


809-819: Push subscribeSync config sanitization is correct.

Zeroing pull-only fields (max_waiting/max_batch/max_expires) for push configs prevents server-side validation errors.


840-883: Persisted inbox wildcard subscription approach looks good; one naming fix needed.

Design (single wildcard sub + per-fetch reply subjects) is efficient. See next comment regarding consumer_name for ephemeral consumers.

Comment on lines +324 to +333
// Collect messages
var messages = std.ArrayList(*JetStreamMessage).init(self.js.allocator);
defer messages.deinit();

var batch_complete = false;
var fetch_error: ?anyerror = null;

// Collect messages until batch is complete or timeout
while (!batch_complete and messages.items.len < request.batch) {
if (self.inbox_subscription.nextMsg(timeout_ms)) |raw_msg| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Drain stale replies before starting a new fetch to avoid cross-fetch contamination.

A previous fetch can leave 100/408/404 status frames queued on the wildcard inbox. Draining those ensures the new fetch starts from a clean slate.

Apply:

         // Collect messages
         var messages = std.ArrayList(*JetStreamMessage).init(self.js.allocator);
         defer messages.deinit();
+
+        // Best-effort drain of any stale replies left from previous fetches
+        while (self.inbox_subscription.nextMsg(0)) |orphan| {
+            orphan.deinit();
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Collect messages
var messages = std.ArrayList(*JetStreamMessage).init(self.js.allocator);
defer messages.deinit();
var batch_complete = false;
var fetch_error: ?anyerror = null;
// Collect messages until batch is complete or timeout
while (!batch_complete and messages.items.len < request.batch) {
if (self.inbox_subscription.nextMsg(timeout_ms)) |raw_msg| {
// Collect messages
var messages = std.ArrayList(*JetStreamMessage).init(self.js.allocator);
defer messages.deinit();
// Best‐effort drain of any stale replies left from previous fetches
while (self.inbox_subscription.nextMsg(0)) |orphan| {
orphan.deinit();
}
var batch_complete = false;
var fetch_error: ?anyerror = null;
// Collect messages until batch is complete or timeout
while (!batch_complete and messages.items.len < request.batch) {
if (self.inbox_subscription.nextMsg(timeout_ms)) |raw_msg| {
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 324 to 333, the fetch loop can pick up
leftover status frames from a prior fetch on the wildcard inbox; before starting
to collect messages, drain any pending replies from self.inbox_subscription so
the new fetch starts clean. Implement a short non-blocking drain loop (e.g.,
call nextMsg with zero or minimal timeout) and discard any messages/status
frames (100/408/404) until nextMsg returns no message or timeout, then proceed
to initialize messages/batch variables and the normal collect loop.

Comment on lines +333 to +371
if (self.inbox_subscription.nextMsg(timeout_ms)) |raw_msg| {
log.debug("Message: subject={s}, reply={s}, data='{s}'", .{ raw_msg.subject, raw_msg.reply orelse "none", raw_msg.data });
// JetStream messages arrive with original subjects and ACK reply subjects
// The timestamp in the ACK subject ensures messages belong to this fetch request
// (timestamps are monotonically increasing and unique per message delivery)

if (try raw_msg.headerGet("Status")) |status_code| {
if (std.mem.eql(u8, status_code, "404")) {
// No messages available
raw_msg.deinit();
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "408")) {
// Request timeout
raw_msg.deinit();
fetch_error = error.RequestTimeout;
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "409")) {
// Consumer sequence mismatch
raw_msg.deinit();
fetch_error = error.ConsumerSequenceMismatch;
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "100")) {
// Heartbeat - continue waiting
raw_msg.deinit();
continue;
}
// Unknown status code - clean up and continue
raw_msg.deinit();
} else {
// This is a regular message - convert to JetStream message
const js_msg_ptr = try jetstream_message.createJetStreamMessage(self.js, raw_msg);
errdefer js_msg_ptr.deinit();

try messages.append(js_msg_ptr);
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Filter replies by the current reply_subject; otherwise stale frames can terminate a fresh fetch.

Because the inbox subscription uses a wildcard, replies from an older fetch (different subject) can appear first and cause false 404/408/409 handling. Filter by reply_subject to correlate correctly.

Apply:

             if (self.inbox_subscription.nextMsg(timeout_ms)) |raw_msg| {
-                log.debug("Message: subject={s}, reply={s}, data='{s}'", .{ raw_msg.subject, raw_msg.reply orelse "none", raw_msg.data });
+                // Ignore stray replies from previous fetches (different reply subject)
+                if (!std.mem.eql(u8, raw_msg.subject, reply_subject)) {
+                    log.debug("Skipping stray reply for previous fetch: {s}", .{raw_msg.subject});
+                    raw_msg.deinit();
+                    continue;
+                }
+                log.debug("Message: subject={s}, reply={s}, data='{s}'", .{ raw_msg.subject, raw_msg.reply orelse "none", raw_msg.data });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (self.inbox_subscription.nextMsg(timeout_ms)) |raw_msg| {
log.debug("Message: subject={s}, reply={s}, data='{s}'", .{ raw_msg.subject, raw_msg.reply orelse "none", raw_msg.data });
// JetStream messages arrive with original subjects and ACK reply subjects
// The timestamp in the ACK subject ensures messages belong to this fetch request
// (timestamps are monotonically increasing and unique per message delivery)
if (try raw_msg.headerGet("Status")) |status_code| {
if (std.mem.eql(u8, status_code, "404")) {
// No messages available
raw_msg.deinit();
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "408")) {
// Request timeout
raw_msg.deinit();
fetch_error = error.RequestTimeout;
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "409")) {
// Consumer sequence mismatch
raw_msg.deinit();
fetch_error = error.ConsumerSequenceMismatch;
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "100")) {
// Heartbeat - continue waiting
raw_msg.deinit();
continue;
}
// Unknown status code - clean up and continue
raw_msg.deinit();
} else {
// This is a regular message - convert to JetStream message
const js_msg_ptr = try jetstream_message.createJetStreamMessage(self.js, raw_msg);
errdefer js_msg_ptr.deinit();
try messages.append(js_msg_ptr);
}
} else {
if (self.inbox_subscription.nextMsg(timeout_ms)) |raw_msg| {
// Ignore stray replies from previous fetches (different reply subject)
if (!std.mem.eql(u8, raw_msg.subject, reply_subject)) {
log.debug("Skipping stray reply for previous fetch: {s}", .{raw_msg.subject});
raw_msg.deinit();
continue;
}
log.debug("Message: subject={s}, reply={s}, data='{s}'",
.{ raw_msg.subject, raw_msg.reply orelse "none", raw_msg.data });
// JetStream messages arrive with original subjects and ACK reply subjects
// The timestamp in the ACK subject ensures messages belong to this fetch request
// (timestamps are monotonically increasing and unique per message delivery)
if (try raw_msg.headerGet("Status")) |status_code| {
if (std.mem.eql(u8, status_code, "404")) {
// No messages available
raw_msg.deinit();
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "408")) {
// Request timeout
raw_msg.deinit();
fetch_error = error.RequestTimeout;
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "409")) {
// Consumer sequence mismatch
raw_msg.deinit();
fetch_error = error.ConsumerSequenceMismatch;
batch_complete = true;
break;
} else if (std.mem.eql(u8, status_code, "100")) {
// Heartbeat - continue waiting
raw_msg.deinit();
continue;
}
// Unknown status code - clean up and continue
raw_msg.deinit();
} else {
// This is a regular message - convert to JetStream message
const js_msg_ptr = try jetstream_message.createJetStreamMessage(self.js, raw_msg);
errdefer js_msg_ptr.deinit();
try messages.append(js_msg_ptr);
}
} else {
// ... existing else block unchanged ...
}
🤖 Prompt for AI Agents
In src/jetstream.zig around lines 333 to 371, replies from a wildcard inbox can
belong to prior fetches and must be ignored: before treating status headers
(404/408/409/100) or converting the raw message, check that raw_msg.reply is
present and equals the current reply_subject; if raw_msg.reply is missing or
does not match, call raw_msg.deinit() and continue the loop, otherwise proceed
with the existing status/header handling and message conversion so stale frames
no longer terminate or affect the current fetch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant