@lalinsky
Owner

Summary

This PR refactors the JetStream API to follow a subscription-centric design similar to nats-py, nats.c, and legacy nats.go, making it more intuitive and consistent with other NATS client libraries.

Key Changes

New API Signatures: Subject-first, nullable parameter design

  • subscribe(subject, handler, handler_args, options)
  • pullSubscribe(subject, durable, options)

Stream Auto-Discovery: Automatically look up streams by subject pattern

Unified Options: New SubscribeOptions and PullSubscribeOptions structs

Fixed Consumer Configuration: Resolved ConsumerPushMaxWaiting errors for push consumers

Improved Memory Management: Eliminated double-free issues

Enhanced Documentation: Added JetStream subscription examples to README

Test Results

✅ All 141 tests passing (87 unit + 54 integration)
✅ No compilation errors
✅ Memory management issues resolved
✅ Push and pull subscription patterns working correctly

Breaking Changes

This is a breaking change for existing JetStream subscription code:

// Old API
js.subscribe(stream_name, consumer_config, handler, args)

// New API  
js.subscribe(subject, handler, args, .{.stream = stream_name, .durable = consumer_name})

Migration Benefits

  • More intuitive subject-first API design
  • Automatic stream discovery reduces boilerplate
  • Consistent with other NATS client libraries
  • Better error handling and memory management
  • Cleaner test code with sensible defaults

This refactoring modernizes the JetStream API to follow patterns from
nats-py, nats.c, and legacy nats.go, making it more intuitive and
consistent with other NATS client libraries.

## API Changes

### New Function Signatures
- `subscribe(subject, handler, handler_args, options)` - subject comes first and is nullable
- `subscribeSync(subject, options)` - simplified synchronous subscriptions
- `queueSubscribe(subject, queue, handler, handler_args, options)` - queue subscriptions
- `pullSubscribe(subject, durable, options)` - pull consumer subscriptions

### New Option Structs
- `SubscribeOptions` - unified options for push subscriptions
- `PullSubscribeOptions` - specialized options for pull subscriptions

### Stream Discovery
- Auto-lookup stream by subject pattern when stream not specified
- Supports both explicit stream specification and automatic discovery
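
The discovery rule can be sketched as follows: given the stream names the server reports for a subject, bind only when exactly one matches. `pickStream` is a hypothetical helper for illustration, not the library's function; the two error names are the ones mentioned elsewhere in this PR.

```zig
const std = @import("std");

/// Hypothetical helper: picks the single stream that covers a subject.
/// `names` stands for the stream names returned by a server-side lookup.
fn pickStream(names: []const []const u8) error{ StreamNotFound, MultipleStreamMatches }![]const u8 {
    // No stream covers the subject, so there is nothing to bind to.
    if (names.len == 0) return error.StreamNotFound;
    // More than one candidate: refuse to guess and let the caller pass .stream explicitly.
    if (names.len > 1) return error.MultipleStreamMatches;
    return names[0];
}
```

Passing `.stream` in the options bypasses this ambiguity entirely, which is why both paths are supported.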

### Consumer Management
- Automatic consumer creation with sensible defaults
- Support for both durable and ephemeral consumers
- Proper handling of existing consumer binding

## Implementation Improvements

### Memory Management
- Fixed double-free issues in subscription creation
- Simplified consumer info lifecycle management
- Proper cleanup of generated deliver subjects

### Push Consumer Configuration
- Fixed ConsumerPushMaxWaiting error by making max_waiting optional
- Push consumers omit max_waiting from JSON (set to null)
- Pull consumers default max_waiting to 512 when not specified
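
A minimal sketch of the push/pull rule above, assuming a push consumer always drops the field. `ConsumerKind` and `effectiveMaxWaiting` are hypothetical names used only for illustration; the 512 default is the one stated in this commit message.

```zig
const std = @import("std");

// Hypothetical enum for illustration; nats.zig may distinguish push and pull differently.
const ConsumerKind = enum { push, pull };

// Default taken from the PR description: pull consumers fall back to 512.
const default_pull_max_waiting: i64 = 512;

/// Returns the max_waiting value to send, or null when the field should be omitted.
fn effectiveMaxWaiting(kind: ConsumerKind, requested: ?i64) ?i64 {
    return switch (kind) {
        // Assumption: push consumers never carry max_waiting, so the field is left out.
        .push => null,
        // Pull consumers keep an explicit value and default to 512 otherwise.
        .pull => requested orelse default_pull_max_waiting,
    };
}
```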

### Client-side Auto-acknowledgment
- Follows nats-py pattern: only ack on successful callback execution
- Handles AlreadyAcked errors gracefully
- Manual ack mode properly controls auto-acknowledgment behavior
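
The auto-ack behavior described above can be sketched with a stand-in message type. `FakeMsg`, `Handler`, and `dispatch` are hypothetical names, not the wrapper in src/jetstream.zig; the sketch only models the three rules in the list.

```zig
const std = @import("std");

// Hypothetical stand-in for a JetStream message; only the ack bookkeeping is modeled.
const FakeMsg = struct {
    acked: bool = false,

    fn ack(self: *FakeMsg) error{AlreadyAcked}!void {
        if (self.acked) return error.AlreadyAcked;
        self.acked = true;
    }
};

const Handler = *const fn (msg: *FakeMsg) anyerror!void;

/// Runs the user handler and acks only if it succeeded and manual_ack is off.
fn dispatch(msg: *FakeMsg, handler: Handler, manual_ack: bool) void {
    // Handler failed: leave the message unacked so the server can redeliver it.
    handler(msg) catch return;
    // In manual ack mode the caller is responsible for acking.
    if (manual_ack) return;
    msg.ack() catch |err| switch (err) {
        // The handler already acked the message itself; nothing more to do.
        error.AlreadyAcked => {},
    };
}

fn okHandler(_: *FakeMsg) anyerror!void {}

fn failingHandler(_: *FakeMsg) anyerror!void {
    return error.Boom;
}

fn selfAckHandler(msg: *FakeMsg) anyerror!void {
    try msg.ack();
}
```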

## Test Updates
- Updated all JetStream tests to use new API signatures
- Cleaned up test configurations to use sensible defaults
- Removed redundant deliver_subject and ack_policy specifications
- All 141 tests now pass including integration tests

## Documentation
- Added JetStream subscription examples to README
- Separate sections for push and pull subscription patterns
- Demonstrates both durable consumer usage and proper message acknowledgment

## Breaking Changes
This is a breaking change for existing JetStream subscription code.
Migration guide:
- Old: `js.subscribe(stream, config, handler, args)`
- New: `js.subscribe(subject, handler, args, .{.stream = stream, .durable = name})`
@coderabbitai
Contributor

coderabbitai bot commented Sep 14, 2025

Walkthrough

Reworks JetStream to a subject-first, options-driven subscription API (push, pull, sync, queue); makes ConsumerConfig.max_waiting optional; adds consumer/stream lookup and status-message handling; updates KV/ObjectStore call sites and tests; adds README push/pull examples.

Changes

| Cohort / File(s) | Summary of changes |
|---|---|
| JetStream core API & internals: src/jetstream.zig | Introduces SubscribeOptions and PullSubscribeOptions; changes ConsumerConfig.max_waiting from i64 to ?i64; replaces old subscribe/pull APIs with subject-first signatures: subscribe, subscribeSync, queueSubscribe, queueSubscribeSync, pullSubscribe; adds lookupStreamBySubject, getOrCreateConsumer, handleStatusMessage; wraps user handlers to support auto-ack/manual-ack and status-message handling; adds deliver_subject_owned to JetStreamSubscription and inbox_prefix exposure on PullSubscription; changes parseResponse to a generic form. |
| KV call site: src/jetstream_kv.zig | Updated subscribeSync usages to the new subject-first form passing null subject and inline options: { .stream = kv.stream_name, .config = consumer_config }. |
| ObjectStore call sites: src/jetstream_objstore.zig | Removed filter_subjects usage; switched subscriptions to subject-specific calls and pass inline options { .stream = self.stream_name, .config = consumer_config }. |
| Tests: JetStream behavior: tests/jetstream_duplicate_ack_test.zig, tests/jetstream_nak_test.zig, tests/jetstream_pull_test.zig, tests/jetstream_push_test.zig, tests/jetstream_sync_test.zig | Migrated tests from ConsumerConfig-based API to subject-first subscribe/pullSubscribe signatures; replaced ConsumerConfig param with inline options (.stream, .durable, .config); adjusted subjects to patterns and updated durable/stream wiring. |
| Documentation examples: README.md | Added JetStream Push Subscriptions and Pull Subscriptions examples showing handler-based push subscribe and pull/fetch usage with ack and deinit. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title Check | ✅ Passed | The title concisely and accurately reflects the primary change in the diff (refactoring the JetStream API toward a subscription-centric, subject-first design) and is a clear, single-sentence summary suitable for history scanning. |
| Description Check | ✅ Passed | The PR description is directly related to the changeset and succinctly documents the intent, key API changes (subject-first signatures, new options structs), breaking changes with migration examples, and reported test results, aligning with the provided raw_summary and objectives. |
| Docstring Coverage | ✅ Passed | No functions found in the changes. Docstring coverage check skipped. |
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/jetstream.zig (1)

482-486: STATUS_MAX_BYTES (409) shouldn't be unconditionally mapped to ConsumerSequenceMismatch

409 is used for many JetStream statuses (Batch Completed, Message Size Exceeds MaxBytes, Consumer Deleted, Server Shutdown, etc.). src/jetstream.zig (≈481–486) currently treats any 409 as ConsumerSequenceMismatch — this will misclassify other 409 responses. Distinguish by the status text/headers (or the server-side error code in the response) and map accordingly: e.g. "Message Size Exceeds MaxBytes" → MaxBytes/ExceededMaxRequest error, explicit "Consumer sequence mismatch" → ConsumerSequenceMismatch (parsing any sequence fields), otherwise treat as a generic pull-status error. Update the conditional to inspect the status string/headers (or server err_code) before assigning error.ConsumerSequenceMismatch.
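
One way to act on this is to classify by description text rather than by status code. The sketch below is illustrative only: `PullStatusError` and `classify409` are hypothetical names, and the matched substrings are taken from the status texts quoted in the comment above, not verified against the server source.

```zig
const std = @import("std");

// Hypothetical error set for illustration only; the library's real error names may differ.
const PullStatusError = error{
    MaxBytesExceeded,
    ConsumerSequenceMismatch,
    ConsumerDeleted,
    PullStatus,
};

/// Classifies a 409 status by its description text instead of the code alone.
fn classify409(description: []const u8) PullStatusError {
    if (std.ascii.indexOfIgnoreCase(description, "exceeds maxbytes") != null)
        return error.MaxBytesExceeded;
    if (std.ascii.indexOfIgnoreCase(description, "sequence mismatch") != null)
        return error.ConsumerSequenceMismatch;
    if (std.ascii.indexOfIgnoreCase(description, "consumer deleted") != null)
        return error.ConsumerDeleted;
    // Anything else (for example "Server Shutdown") stays a generic pull-status error.
    return error.PullStatus;
}
```

Matching case-insensitively keeps the check tolerant of small wording differences between server versions, at the cost of relying on free-form text.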

🧹 Nitpick comments (9)
README.md (1)

176-191: Tiny doc tweak: show batch error check before iterating

Readers benefit from seeing the err branch.

Here’s a minimal addition:

var batch = try pull_sub.fetch(10, 5000);
defer batch.deinit();
if (batch.err) |e| {
    std.debug.print("fetch error: {}\n", .{e});
    return;
}
for (batch.messages) |js_msg| try js_msg.ack();
src/jetstream_kv.zig (1)

217-220: Approve — subject = null is correct; add a short inline comment

Passing null here is the intended JetStream usage: JetStream.subscribeSync accepts an optional subject and relies on ConsumerConfig.filter_subjects for multi-subject watches (same pattern in src/jetstream_objstore.zig:465 and :609).

  • Add one-line comment above the call: // subject = null — using ConsumerConfig.filter_subjects for multi-subject watch
tests/jetstream_nak_test.zig (3)

91-99: Push subscribe API usage looks correct (subject-first + options).

deliver_policy defaults to .all already; you can drop it unless you want to be explicit.


196-203: Max-deliver test setup is fine.

Same nit: deliver_policy = .all is redundant here.


301-307: Metadata verification path is good.

Explicit .deliver_policy = .all isn’t required but harmless.

tests/jetstream_push_test.zig (2)

41-48: Reduce flakiness: avoid fixed 100ms sleep; poll with timeout instead.

Fixed sleeps can be flaky under CI load. Poll until messages are seen or a short timeout elapses.

Apply this diff:

-    // Wait a bit for messages to be processed
-    std.time.sleep(100 * std.time.ns_per_ms);
-
-    // Verify messages were received
-    try testing.expect(message_count > 0);
+    // Wait up to ~2s for messages to be processed
+    var attempts: u32 = 0;
+    while (attempts < 20 and message_count == 0) {
+        std.time.sleep(100 * std.time.ns_per_ms);
+        attempts += 1;
+    }
+    try testing.expect(message_count > 0);

Also applies to: 56-61


96-106: Flow control test won’t exercise heartbeats at 30s.

If you intend to see control-path handling, set idle_heartbeat to ~1s for the test duration.

Apply this diff:

-            .idle_heartbeat = 30_000_000_000, // 30s - required when flow_control=true
+            .idle_heartbeat = 1_000_000_000, // 1s - keeps test snappy while exercising heartbeats
src/jetstream.zig (2)

201-214: max_waiting -> ?i64 is fine; ensure we don’t emit nulls in JSON.

Type change is good for push/pull separation. To avoid sending explicit nulls for many fields, prefer JSON options with emit_null_optional_fields = false in addConsumer().

Apply this diff near the JSON serialization in addConsumer():

-        const config_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{});
+        const config_json = try std.json.stringifyAlloc(self.nc.allocator, request_payload, .{
+            .emit_null_optional_fields = false,
+        });

1358-1463: subscribe()/queueSubscribe() duplicate substantial wrapping logic.

Extract a small internal helper to build the subscription (status handling + auto-ack wrapper). Keeps behavior in one place.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b1a27f6 and 616f782.

📒 Files selected for processing (9)
  • README.md (1 hunks)
  • src/jetstream.zig (7 hunks)
  • src/jetstream_kv.zig (1 hunks)
  • src/jetstream_objstore.zig (2 hunks)
  • tests/jetstream_duplicate_ack_test.zig (4 hunks)
  • tests/jetstream_nak_test.zig (5 hunks)
  • tests/jetstream_pull_test.zig (1 hunks)
  • tests/jetstream_push_test.zig (3 hunks)
  • tests/jetstream_sync_test.zig (3 hunks)
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • README.md
  • tests/jetstream_pull_test.zig
  • tests/jetstream_sync_test.zig
  • tests/jetstream_duplicate_ack_test.zig
  • tests/jetstream_nak_test.zig
📚 Learning: 2025-09-06T20:28:53.180Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#98
File: src/jetstream_kv.zig:235-279
Timestamp: 2025-09-06T20:28:53.180Z
Learning: JetStreamMessage wrappers in nats.zig are allocated on the Message's owned arena, so they are automatically freed when Message.deinit() is called and don't require separate cleanup.

Applied to files:

  • README.md
📚 Learning: 2025-09-09T19:51:02.201Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#108
File: src/dispatcher.zig:60-65
Timestamp: 2025-09-09T19:51:02.201Z
Learning: In the NATS Zig codebase dispatcher.zig, when decrementing pending_bytes (atomic u64) with message_data_len (usize) using fetchSub, explicit casting is not required as Zig handles the implicit conversion automatically and the code builds successfully.

Applied to files:

  • README.md
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.

Applied to files:

  • README.md
🔇 Additional comments (11)
README.md (1)

157-175: Push subscription example matches the new API

Subject-first + options with stream/durable look correct. Handler acks and deinit is shown. LGTM.

tests/jetstream_pull_test.zig (1)

34-37: API migration looks good

pullSubscribe(subject, durable, .{ .stream = … }) matches the new shape; flush before fetch is already in place.

tests/jetstream_duplicate_ack_test.zig (4)

52-58: Subject-first subscribe + options: looks correct

Durable + deliver_policy=.all are set; handler defers deinit; ack/isAcked checks are fine.


137-143: Second test subscribe call: OK

API usage and options align with the refactor.


223-229: Third test subscribe call: OK

Matches new API; config minimal and sufficient for the case.


316-322: Fourth test subscribe call: OK

Consistent with the migration; no concerns.

tests/jetstream_nak_test.zig (2)

409-416: NAK-with-delay scenario configured correctly.

.max_deliver = 3 gives headroom; the assertions check exactly 2 deliveries—good.


502-509: Zero-delay NAK case reads well.

Semantics align with “regular NAK” behavior.

tests/jetstream_push_test.zig (1)

135-141: Error-path assertion matches the new API.

Expecting error.StreamNotFound here is appropriate given options.stream is set.

src/jetstream.zig (2)

634-665: Stream auto-discovery by subject looks good.

Nice ownership discipline on the returned name and explicit multi-match errors.


1258-1279: Potential message leak on handler error.

wrappedHandler transfers ownership of js_msg to the user handler. If the handler returns an error, we log and return without deinit. That’s consistent with “handler owns cleanup,” but it means user handlers must always deinit even on error.

Please confirm the public docs explicitly state “handler always owns and must deinit js_msg, even on error.” If not, I can add doc text or change behavior to deinit on error.

- Remove hardcoded deliver_subject from sync tests to prevent parallel test collisions
- Remove unused fields from SubscribeOptions and PullSubscribeOptions
- Add emit_null_optional_fields=false to consumer JSON serialization to fix server errors
- Refactor subscribeSync to avoid double consumer creation by generating deliver_subject before getOrCreateConsumer
- Update ObjectStore to use subject-first subscribeSync API and remove redundant filter_subjects
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (4)
tests/jetstream_sync_test.zig (3)

24-27: Tests updated to subject-first API: good.

Removal of hardcoded deliver_subject avoids collisions and matches the new surface.


63-66: Timeout test updated correctly.

Subject-first call with stream linkage looks right.


96-99: Multi-message test aligned with new API.

Durable + subject pattern are set as expected.

src/jetstream.zig (1)

648-698: Durable reconciliation gap: potential silent mismatch.

If an existing durable’s filter_subject or deliver_group differs from the requested subject/queue, getOrCreateConsumer returns it unchanged, and callers will subscribe to the “wrong” consumer. Safer to detect and error (or update) instead of silently binding.

Apply this check after finding an existing consumer:

@@
-            if (self.getConsumerInfo(stream_name, consumer_name)) |existing_info| {
+            if (self.getConsumerInfo(stream_name, consumer_name)) |existing_info| {
                 log.debug("Found existing consumer: {s}", .{consumer_name});
-                return existing_info;
+                // Validate subject/queue match if caller specified them
+                if (subject) |req_subj| {
+                    const existing_subj = existing_info.value.config.filter_subject orelse "";
+                    if (!std.mem.eql(u8, existing_subj, req_subj)) {
+                        existing_info.deinit();
+                        return error.ConsumerConfigMismatch;
+                    }
+                }
+                if (queue) |req_group| {
+                    const existing_group = existing_info.value.config.deliver_group orelse "";
+                    if (!std.mem.eql(u8, existing_group, req_group)) {
+                        existing_info.deinit();
+                        return error.ConsumerConfigMismatch;
+                    }
+                }
+                return existing_info;
             } else |err| switch (err) {

Note: If you prefer auto-fixing, gate an update path behind an option and re-add the consumer with the merged, correct config.
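
The comparison at the heart of this check can be isolated in a small helper. This is a sketch under one assumption: a requested value that the existing consumer lacks counts as a mismatch (the diff above instead treats a missing value as an empty string). `matchesRequested` is a hypothetical name.

```zig
const std = @import("std");

/// Hypothetical helper: true when the existing consumer setting satisfies the request.
fn matchesRequested(requested: ?[]const u8, existing: ?[]const u8) bool {
    // Nothing was requested, so whatever the existing consumer has is acceptable.
    const want = requested orelse return true;
    // Assumption: a requested value with no existing counterpart is a mismatch.
    const have = existing orelse return false;
    return std.mem.eql(u8, want, have);
}
```

The same helper would apply to both filter_subject and deliver_group, so the two branches in the diff collapse into two calls.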

🧹 Nitpick comments (4)
src/jetstream_objstore.zig (2)

464-467: Correct shift to subject-first subscribeSync.

Good call passing chunk_subject directly and keeping stream linkage in options.

You can drop the preallocated inbox and .deliver_subject in consumer_config and let subscribeSync allocate the inbox. This removes a couple allocations without losing control (subscribeSync will still subscribe to the consumer’s actual deliver_subject).


607-610: Same here: meta listing uses subject-first correctly.

Looks clean and consistent with the new API.

As above, consider omitting .deliver_subject and the explicit inbox; subscribeSync will handle it and subscribe to the server-returned deliver subject.

src/jetstream.zig (2)

566-574: Consider convenience fields for pull tuning.

Optional: expose common pull knobs (max_waiting/max_batch/max_expires) on PullSubscribeOptions and merge into ConsumerConfig internally so callers don’t need ConsumerConfig details.


1291-1335: subscribeSync uses server-returned deliver_subject: correct.

This follows the “subscribe to what the server configured” rule and avoids mismatches.

To reduce drift, consider extracting a small helper that resolves stream, prepares config (optional deliver_subject), calls getOrCreateConsumer, and returns the effective deliver_subject for all push subscribe variants.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 616f782 and b60777c.

📒 Files selected for processing (3)
  • src/jetstream.zig (8 hunks)
  • src/jetstream_objstore.zig (2 hunks)
  • tests/jetstream_sync_test.zig (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#119
File: src/jetstream.zig:1217-1234
Timestamp: 2025-09-14T12:08:23.924Z
Learning: In JetStream subscribe() methods, always use the deliver_subject from the returned consumer info rather than pre-generating one, to avoid subscribing to subjects that don't match what the server actually configured for existing consumers.
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • tests/jetstream_sync_test.zig
📚 Learning: 2025-09-14T12:08:23.924Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#119
File: src/jetstream.zig:1217-1234
Timestamp: 2025-09-14T12:08:23.924Z
Learning: In JetStream subscribe() methods, always use the deliver_subject from the returned consumer info rather than pre-generating one, to avoid subscribing to subjects that don't match what the server actually configured for existing consumers.

Applied to files:

  • tests/jetstream_sync_test.zig
  • src/jetstream_objstore.zig
  • src/jetstream.zig
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test
🔇 Additional comments (5)
src/jetstream.zig (5)

201-201: Make max_waiting optional: correct for push/pull split.

Combined with omitting nulls in addConsumer, this avoids sending push-only noise.


551-565: SubscribeOptions shape looks good.

Fields map cleanly to stream discovery, durable binding, config merge, and ack behavior.


616-646: Stream lookup by subject: solid.

Validates subject, uses STREAM.NAMES with subject filter, and frees the duped name when autodiscovered.


904-907: Omit nulls when creating consumers: good.

Prevents sending push-only/pull-only fields unintentionally.


1450-1463: pullSubscribe: creation path looks fine.

Durable is required, stream is discovered when omitted, and inbox wildcarding is handled cleanly.

- Use deliver_subject from consumer_info for all nc.subscribe() calls for consistency
- Validate user-provided deliver_subject only (not server-returned or generated ones)
- Fix queueSubscribe to avoid double consumer creation pattern
- Ensure all subscription methods follow the same deliver_subject handling pattern
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/jetstream.zig (2)

1278-1290: Duplicate handler wrapper logic — extract once.

subscribe() and queueSubscribe() duplicate JSHandler.wrappedHandler. A small internal generic helper would reduce future drift.

I can send a follow-up patch to factor the wrapper into a private generic and reuse it in both sites.

Also applies to: 1397-1449


1-1592: Fix premature deinit of consumer_info in JetStream subscription creation

  • src/jetstream.zig — in subscribe(), subscribeSync(), queueSubscribe(): replace defer consumer_info.deinit(); with errdefer consumer_info.deinit();. Currently the unconditional defer frees the Result's arena on success after it was copied into the returned subscription, leaving a dangling/deallocated arena (pullSubscribe already uses errdefer correctly).
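
The difference between the two keywords is easiest to see in a reduced example. `Info`, `Sub`, and `createSub` below are hypothetical stand-ins for the consumer info result and the subscription wrapper, not the types in src/jetstream.zig.

```zig
const std = @import("std");

// Hypothetical stand-in for the consumer info result that owns allocated memory.
const Info = struct {
    name: []u8,
    allocator: std.mem.Allocator,

    fn deinit(self: *Info) void {
        self.allocator.free(self.name);
    }
};

// Hypothetical stand-in for the subscription wrapper that takes ownership of Info.
const Sub = struct {
    info: Info,

    fn deinit(self: *Sub) void {
        self.info.deinit();
    }
};

fn createSub(allocator: std.mem.Allocator, name: []const u8, fail: bool) !Sub {
    var info = Info{ .name = try allocator.dupe(u8, name), .allocator = allocator };
    // errdefer runs only when this function returns an error, so a successful
    // return hands `info` to the caller intact. A plain `defer` here would free
    // the name on success too, leaving the returned Sub with dangling memory.
    errdefer info.deinit();

    if (fail) return error.SubscribeFailed;
    return Sub{ .info = info };
}
```

On the success path the caller frees the memory exactly once through `Sub.deinit()`; on the error path the `errdefer` does it, so neither path leaks or double-frees.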
♻️ Duplicate comments (1)
src/jetstream.zig (1)

1189-1223: Good: now subscribing to server-returned deliver_subject (learning applied).

This addresses the dead-subscription bug with existing durables.

🧹 Nitpick comments (4)
src/jetstream.zig (4)

1162-1176: Handle flow control via Nats-Consumer-Stalled header too.

Some servers/versions signal FC by setting Status: 100 + Nats-Consumer-Stalled with a reply subject (not Description="FlowControl Request"). Support both.

-                } else if (std.mem.eql(u8, description, "FlowControl Request")) {
+                } else if (std.mem.eql(u8, description, "FlowControl Request")) {
                     // This is a flow control request - we need to respond
                     log.debug("Received flow control request from JetStream", .{});
@@
                     return;
                 }
+
+                // Fallback: FC via Nats-Consumer-Stalled header
+                if (msg.headers.get("Nats-Consumer-Stalled")) |stalled_vals| {
+                    if (stalled_vals.items.len > 0) {
+                        const fc_reply = stalled_vals.items[0];
+                        try nc.publish(fc_reply, "");
+                        log.debug("Sent flow control response (stalled) to: {s}", .{fc_reply});
+                        return;
+                    }
+                }

616-647: Stream lookup API: consider surfacing names on MultipleStreamMatches.

Not a blocker, but returning the candidate names helps callers resolve ambiguity.

Happy to sketch a Result variant carrying the list if you want it now.


1454-1501: Minor: derive and assert durable consistency.

You compute consumer_name from returned info or the durable param. Add a sanity check to warn if they diverge.

-        const consumer_name = consumer_info.value.config.name orelse
-            consumer_info.value.config.durable_name orelse
-            durable;
+        const consumer_name = consumer_info.value.config.name orelse
+            consumer_info.value.config.durable_name orelse
+            durable;
+        if (!std.mem.eql(u8, consumer_name, durable)) {
+            log.warn("pullSubscribe: server durable '{s}' differs from requested '{s}'", .{ consumer_name, durable });
+        }

461-486: STATUS_MAX_BYTES maps to ConsumerSequenceMismatch — name looks off.

Semantically it’s a size/limit condition. Consider a more accurate error (e.g., MaxBytesExceeded) to aid debugging.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between b60777c and d941e49.

📒 Files selected for processing (1)
  • src/jetstream.zig (8 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-14T12:08:23.924Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#119
File: src/jetstream.zig:1217-1234
Timestamp: 2025-09-14T12:08:23.924Z
Learning: In JetStream subscribe() methods, always use the deliver_subject from the returned consumer info rather than pre-generating one, to avoid subscribing to subjects that don't match what the server actually configured for existing consumers.

Applied to files:

  • src/jetstream.zig
🔇 Additional comments (3)
src/jetstream.zig (3)

1455-1468: Pull path resource ownership looks correct.

errdefer on consumer_info is used properly; ownership is transferred to PullSubscription. Nice.


904-907: Good: omit null optionals in Consumer CREATE payload.

This prevents server-side default overrides by explicit nulls.


551-565: SubscribeOptions shape looks sane for subject-first API.

Manual ack default matches nats.c behavior. No issues.

- Implement queueSubscribeSync for synchronous queue subscriptions with load balancing
- Follow same pattern as other subscription methods for consistency
- Add test to verify functionality
- Completes the JetStream subscription API with: subscribe, subscribeSync, queueSubscribe, queueSubscribeSync
Replace defer with errdefer for consumer_info.deinit() in subscription
methods to prevent double-free when ownership transfers to subscription
wrappers. Affected methods: subscribe, subscribeSync, queueSubscribe,
queueSubscribeSync.
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/jetstream.zig (1)

461-487: Map STATUS_MAX_BYTES to a "max bytes" JetStream error (not ConsumerSequenceMismatch)

src/jetstream.zig (around lines 481–485) sets fetch_error = error.ConsumerSequenceMismatch for STATUS_MAX_BYTES — replace it with the appropriate "max bytes" error (e.g. error.StreamMaxBytesRequired or the Stream‑max‑bytes‑exceeded variant from src/jetstream_errors.zig) and verify the server status→error mapping.

♻️ Duplicate comments (2)
tests/jetstream_sync_test.zig (1)

24-27: Good: dropped hardcoded deliver_subject; library will allocate inbox

This aligns with the subject-first API and avoids collisions in parallel runs.

Also applies to: 63-66, 96-99

src/jetstream.zig (1)

681-697: Detect durable config mismatches (filter/queue/type) instead of silently binding

If a durable exists with a different filter_subject, deliver_group, or push/pull type, return a clear error rather than binding to the wrong consumer.

Apply:

         if (durable) |consumer_name| {
             // Try to get existing consumer info
             if (self.getConsumerInfo(stream_name, consumer_name)) |existing_info| {
-                log.debug("Found existing consumer: {s}", .{consumer_name});
-                return existing_info;
+                log.debug("Found existing consumer: {s}", .{consumer_name});
+                const existing_cfg = existing_info.value.config;
+                if (subject) |s| {
+                    if (existing_cfg.filter_subject) |fs| {
+                        if (!std.mem.eql(u8, fs, s)) return error.ConsumerConfigMismatch;
+                    } else {
+                        return error.ConsumerConfigMismatch;
+                    }
+                }
+                if (queue) |q| {
+                    if (existing_cfg.deliver_group) |g| {
+                        if (!std.mem.eql(u8, g, q)) return error.ConsumerConfigMismatch;
+                    } else {
+                        return error.ConsumerConfigMismatch;
+                    }
+                }
+                const existing_is_pull = (existing_cfg.deliver_subject == null);
+                if (existing_is_pull != is_pull) return error.ConsumerTypeMismatch;
+                return existing_info;
             } else |err| switch (err) {
🧹 Nitpick comments (1)
tests/jetstream_sync_test.zig (1)

152-154: Ack the received message in the queue test to prevent redelivery

Queue test doesn’t ack; keep behavior consistent with other tests and avoid flaky redeliveries.

Apply:

     // Verify message content
     try testing.expectEqualStrings(test_message, js_msg.msg.data);
+    try js_msg.ack();
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between d941e49 and 209a718.

📒 Files selected for processing (2)
  • src/jetstream.zig (8 hunks)
  • tests/jetstream_sync_test.zig (4 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#119
File: src/jetstream.zig:1217-1234
Timestamp: 2025-09-14T12:08:23.924Z
Learning: In JetStream subscribe() methods, always use the deliver_subject from the returned consumer info rather than pre-generating one, to avoid subscribing to subjects that don't match what the server actually configured for existing consumers.
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • tests/jetstream_sync_test.zig
📚 Learning: 2025-09-14T12:08:23.924Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#119
File: src/jetstream.zig:1217-1234
Timestamp: 2025-09-14T12:08:23.924Z
Learning: In JetStream subscribe() methods, always use the deliver_subject from the returned consumer info rather than pre-generating one, to avoid subscribing to subjects that don't match what the server actually configured for existing consumers.

Applied to files:

  • tests/jetstream_sync_test.zig
  • src/jetstream.zig
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.

Applied to files:

  • src/jetstream.zig
🔇 Additional comments (3)
src/jetstream.zig (3)

1222-1223: Correct subject binding: using deliver_subject from ConsumerInfo is the right call

Prevents dead subscriptions when reusing an existing durable with a different deliver_subject.

Also applies to: 1329-1333, 1381-1393, 1488-1493


904-907: Good: omit null optionals in Consumer CREATE payload

Keeps payloads minimal and avoids server-side default stomping.


201-214: Sanity check: making max_waiting optional and defaulting to 512 for pull in getOrCreateConsumer

Matches common client behavior; ensure docs mention the implicit default.

Create JetStreamHandler() helper to eliminate duplication of message
handling logic between subscribe() and queueSubscribe() methods.
Reduces ~50 lines of duplicate code while maintaining functionality.
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/jetstream.zig (1)

1189-1243: Harden handler wrapper: enforce return type and normalize args.

Right now we assume non‑void handlers are error unions and that user_args is a tuple. Make both explicit at comptime and support single‑arg handlers.

Apply:

@@
-                comptime {
-                    const HandlerType = @TypeOf(handlerFn);
-                    const type_info = @typeInfo(HandlerType);
-                    if (type_info != .@"fn") {
-                        @compileError("Handler must be a function, got: " ++ @typeName(HandlerType));
-                    }
-                }
-                const ReturnType = @typeInfo(@TypeOf(handlerFn)).@"fn".return_type.?;
+                comptime {
+                    const HandlerType = @TypeOf(handlerFn);
+                    const fn_info = @typeInfo(HandlerType);
+                    if (fn_info != .@"fn") @compileError("Handler must be a function");
+                    const Ret = fn_info.@"fn".return_type.?;
+                    const rt_info = @typeInfo(Ret);
+                    const is_void = Ret == void;
+                    const is_err_void = rt_info == .error_union and rt_info.error_union.payload == void;
+                    if (!is_void and !is_err_void)
+                        @compileError("Handler must return void or an error union with a void payload");
+                }
+                const ReturnType = @typeInfo(@TypeOf(handlerFn)).@"fn".return_type.?;
+                const UA = @TypeOf(user_args);
+                const ua_info = @typeInfo(UA);
+                const ua_is_tuple = ua_info == .@"struct" and ua_info.@"struct".is_tuple;
+                const args = if (ua_is_tuple) user_args else .{ user_args };
@@
-                if (ReturnType == void) {
-                    @call(.auto, handlerFn, .{js_msg} ++ user_args);
+                if (ReturnType == void) {
+                    @call(.auto, handlerFn, .{js_msg} ++ args);
                     callback_success = true;
                 } else {
-                    @call(.auto, handlerFn, .{js_msg} ++ user_args) catch |err| {
+                    @call(.auto, handlerFn, .{js_msg} ++ args) catch |err| {
                         log.err("User handler failed: {}", .{err});
                         return; // Don't auto-ack on callback error
                     };
                     callback_success = true;
                 }
♻️ Duplicate comments (1)
src/jetstream.zig (1)

648-698: Fail fast on durable config mismatches (subject/queue/type).

If a durable exists but its filter_subject, deliver_group, or push/pull type differ from the requested ones, we silently bind to the existing config. That’s surprising and can route messages incorrectly. Validate and error out.

Apply:

@@
-            if (self.getConsumerInfo(stream_name, consumer_name)) |existing_info| {
-                log.debug("Found existing consumer: {s}", .{consumer_name});
-                return existing_info;
-            } else |err| switch (err) {
+            if (self.getConsumerInfo(stream_name, consumer_name)) |existing_info| {
+                log.debug("Found existing consumer: {s}", .{consumer_name});
+                // Validate requested attributes against existing durable
+                const existing_cfg = existing_info.value.config;
+                if (subject) |s| {
+                    if (existing_cfg.filter_subject) |fs| {
+                        if (!std.mem.eql(u8, fs, s)) return error.ConsumerConfigMismatch;
+                    } else {
+                        return error.ConsumerConfigMismatch;
+                    }
+                }
+                if (queue) |q| {
+                    if (existing_cfg.deliver_group) |eg| {
+                        if (!std.mem.eql(u8, eg, q)) return error.ConsumerConfigMismatch;
+                    } else {
+                        return error.ConsumerConfigMismatch;
+                    }
+                }
+                const existing_is_pull = (existing_cfg.deliver_subject == null);
+                if (existing_is_pull != is_pull) return error.ConsumerTypeMismatch;
+                return existing_info;
+            } else |err| switch (err) {
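
The same checks can be read in isolation as a small pure function. This is only a sketch of the validation order suggested above, not the code in src/jetstream.zig: the trimmed `ConsumerConfig` struct and the `matches` and `validateDurable` helpers are hypothetical names, and only the two error names come from the suggestion.

```zig
const std = @import("std");

// Hypothetical, trimmed-down stand-in for the library's consumer config.
const ConsumerConfig = struct {
    filter_subject: ?[]const u8 = null,
    deliver_group: ?[]const u8 = null,
    deliver_subject: ?[]const u8 = null,
};

const BindError = error{ ConsumerConfigMismatch, ConsumerTypeMismatch };

// A requested value matches when it is unset, or when the existing
// consumer has the same value. A requested value against an unset
// existing one counts as a mismatch.
fn matches(requested: ?[]const u8, existing: ?[]const u8) bool {
    const want = requested orelse return true;
    const have = existing orelse return false;
    return std.mem.eql(u8, want, have);
}

// Hypothetical helper: fail fast instead of silently binding to a durable
// whose subject filter, queue group, or push/pull type differs.
fn validateDurable(
    existing: ConsumerConfig,
    subject: ?[]const u8,
    queue: ?[]const u8,
    is_pull: bool,
) BindError!void {
    if (!matches(subject, existing.filter_subject)) return error.ConsumerConfigMismatch;
    if (!matches(queue, existing.deliver_group)) return error.ConsumerConfigMismatch;
    // A consumer without a deliver_subject is treated as a pull consumer.
    const existing_is_pull = existing.deliver_subject == null;
    if (existing_is_pull != is_pull) return error.ConsumerTypeMismatch;
}
```

Keeping the checks free of I/O like this would also make them easy to unit test without a running server.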
🧹 Nitpick comments (2)
src/jetstream.zig (2)

616-647: Stream auto‑discovery: solid, but add a hint on multi‑match.

Returning MultipleStreamMatches is fine. Consider logging matched names to help users diagnose subject patterns.
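
A minimal sketch of that hint, assuming the lookup has already produced a slice of matching stream names. `pickSingleStream` and `error.NoStreamMatch` are hypothetical; only `MultipleStreamMatches` appears in the PR.

```zig
const std = @import("std");

const log = std.log.scoped(.jetstream);

const LookupError = error{ NoStreamMatch, MultipleStreamMatches };

// Hypothetical helper: resolve a subject lookup to exactly one stream and
// log every candidate when the subject is ambiguous.
fn pickSingleStream(matched: []const []const u8) LookupError![]const u8 {
    if (matched.len == 0) return error.NoStreamMatch;
    if (matched.len > 1) {
        for (matched) |name| {
            log.warn("subject matches stream: {s}", .{name});
        }
        return error.MultipleStreamMatches;
    }
    return matched[0];
}
```

The candidates are logged at warn level here because the caller still receives the error and decides how to surface it.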


1461-1508: pullSubscribe(): durable+subject-first flow looks good.

Consumer creation, wildcard inbox, and owned prefix are handled cleanly. One nit: consider a brief comment that inbox_subscription is long‑lived and shared across fetch calls.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 209a718 and d187a10.

📒 Files selected for processing (1)
  • src/jetstream.zig (8 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#119
File: src/jetstream.zig:1217-1234
Timestamp: 2025-09-14T12:08:23.924Z
Learning: In JetStream subscribe() methods, always use the deliver_subject from the returned consumer info rather than pre-generating one, to avoid subscribing to subjects that don't match what the server actually configured for existing consumers.
📚 Learning: 2025-09-14T12:08:23.924Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#119
File: src/jetstream.zig:1217-1234
Timestamp: 2025-09-14T12:08:23.924Z
Learning: In JetStream subscribe() methods, always use the deliver_subject from the returned consumer info rather than pre-generating one, to avoid subscribing to subjects that don't match what the server actually configured for existing consumers.

Applied to files:

  • src/jetstream.zig
📚 Learning: 2025-08-25T02:49:59.119Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.119Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.

Applied to files:

  • src/jetstream.zig
🔇 Additional comments (8)
src/jetstream.zig (8)

201-214: Making max_waiting optional is correct for push/pull split.

Defaulting to 512 for pull and nulling it for push in getOrCreateConsumer() matches server expectations. LGTM.
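
For documentation purposes, the defaulting rule can be stated as a tiny function. This is a sketch under assumptions: `effectiveMaxWaiting` is a hypothetical name and the `?i64` field type is assumed, while the 512 default for pull and null for push come from the comment above.

```zig
const std = @import("std");

// Implicit default applied to pull consumers when the caller sets nothing.
const default_pull_max_waiting: i64 = 512;

// Hypothetical helper mirroring the rule described for getOrCreateConsumer():
// push consumers never send max_waiting, pull consumers fall back to 512.
fn effectiveMaxWaiting(is_pull: bool, requested: ?i64) ?i64 {
    if (!is_pull) return null;
    return requested orelse default_pull_max_waiting;
}
```

With this rule a push consumer drops any requested value, which is what avoids the ConsumerPushMaxWaiting error mentioned in the PR summary.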


551-565: SubscribeOptions shape looks right.

Subject-first, with optional stream/durable/config and manual_ack defaulting to auto‑ack. Good.


566-574: PullSubscribeOptions is lean and correct.

Let callers pass pull‑specific knobs via ConsumerConfig. No unused fields. Good.


904-907: JSON encoding: omit null optionals is the right call.

Prevents sending irrelevant fields to the server. Good.


1245-1297: subscribe(): correct deliver_subject ownership and binding.

You now subscribe using consumer_info.config.deliver_subject (per the learning), and keep ownership only when generated. Nice.


1299-1349: subscribeSync(): same good pattern as subscribe().

Uses server’s deliver_subject and fixes previous ownership bug via errdefer. LGTM.


1351-1405: queueSubscribe(): push/queue path looks correct.

deliver_group is set in config, deliver_subject generation is handled once, and you subscribe to the server’s subject. Good.


1407-1458: queueSubscribeSync(): mirrors queueSubscribe() correctly.

Ownership and subject selection are right. LGTM.

@lalinsky lalinsky merged commit 6937d6e into main Sep 14, 2025
4 checks passed
@lalinsky lalinsky deleted the jetstream-api-refactor branch September 14, 2025 12:56