Skip to content

Conversation

@lalinsky
Copy link
Owner

@lalinsky lalinsky commented Sep 6, 2025

No description provided.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 6, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

Refactors JetStream and KV APIs: adjusts nextMsg error signature, introduces a generic arena-backed Result(T), switches JSON parsing to return Result(T), adds timestamp parsing for messages, redesigns KVWatcher and watch/history/keys APIs, removes KV.destroy, updates message metadata types, and aligns tests with new behaviors.

Changes

Cohort / File(s) Summary
JetStream core
src/jetstream.zig
Imports parseTimestamp and Result(T); changes JetStreamSubscription.nextMsg to return !*JetStreamMessage; replaces pub const Result alias; parseResponse now returns Result(T) from std.json.parse; getMsgLegacy parses time via parseTimestamp.
KV module and watcher
src/jetstream_kv.zig
Switches to result.zig Result(T); uses newInbox; KVOperation.fromString now returns error union; adds KVEntry.isDeleted; redesigns KVWatcher (fields, init, and next with timeout returning !?KVEntry); watch/watchAll return KVWatcher (by value) and adds watchMulti; history/keys return arena-backed Result wrappers; removes KV.destroy; updates parsing helpers.
Message models
src/jetstream_message.zig, src/message.zig
Makes SequencePair.consumer/stream non-nullable u64 with default 0; adds time: u64 = 0 to Message.
Generic Result wrapper
src/result.zig
Adds public generic Result(T) with ArenaAllocator, init(allocator, value), and deinit().
Tests updated for new APIs
tests/jetstream_kv_history_test.zig, tests/jetstream_kv_test.zig, tests/jetstream_nak_test.zig
History now via Result wrapper with deinit; watcher.next uses timeout and returns optional; cleanup via kv_manager.deleteBucket; sequence assertions changed to > 0.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch jetstream-kv-ng

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@lalinsky
Copy link
Owner Author

lalinsky commented Sep 6, 2025

@coderabbitai review

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 6, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/jetstream_nak_test.zig (1)

281-285: Fix log format: using optional placeholder {?} with non-optional values

sequence.consumer, sequence.stream, and num_pending are u64 now; {?} will fail format-time. Use {}.

Apply:

-            log.info("- Consumer sequence: {?}", .{js_msg.metadata.sequence.consumer});
-            log.info("- Stream sequence: {?}", .{js_msg.metadata.sequence.stream});
+            log.info("- Consumer sequence: {}", .{js_msg.metadata.sequence.consumer});
+            log.info("- Stream sequence: {}", .{js_msg.metadata.sequence.stream});
-            log.info("- Pending count: {?}", .{js_msg.metadata.num_pending});
+            log.info("- Pending count: {}", .{js_msg.metadata.num_pending});
🧹 Nitpick comments (6)
tests/jetstream_kv_test.zig (1)

32-34: Prefer deferred cleanup for buckets

Defer deletion so the bucket is cleaned up even if the test bails early. This keeps resource lifecycle tight and avoids ordering footguns.

-    // Clean up
-    try kv_manager.deleteBucket(bucket_name);
+    // Clean up
+    defer kv_manager.deleteBucket(bucket_name) catch {};
src/result.zig (1)

27-37: Nit: init doesn’t actually fail anymore

After the change above, init has no fallible path. Consider returning Self instead of Allocator.Error!Self and drop try at call sites.

-        pub fn init(allocator: std.mem.Allocator, value: T) std.mem.Allocator.Error!Self {
+        pub fn init(allocator: std.mem.Allocator, value: T) Self {
             return Self{
                 .arena = std.heap.ArenaAllocator.init(allocator),
                 .value = value,
             };
         }
tests/jetstream_kv_history_test.zig (1)

180-199: Watcher.next() assertions: prefer checking the error branch explicitly.

Comparing an error-union variable to error tags can be brittle. Pattern-match the error branch to assert Timeout/QueueEmpty.

Apply:

-    const result = watcher.next(1000);
-    try testing.expect(result == error.Timeout or result == error.QueueEmpty);
+    const res = watcher.next(1000);
+    if (res) |_| {
+        try testing.expect(false); // expected a timeout/empty after completion marker
+    } else |err| {
+        try testing.expect(err == error.Timeout or err == error.QueueEmpty);
+    }
src/jetstream_kv.zig (3)

200-228: KVWatcher.init(): config is reasonable, but one nuance.

headers_only mirrors meta_only — good. Using filter_subjects is correct for multi-pattern. Note: init_pending from consumer_info is best-effort; you also derive it from first message which is fine as fallback.

If you want to reduce surprises, consider a specific initial_count strategy: prefer first-message delta when include_history=false, else consumer_info.num_pending.


632-639: Minor: drop the unconditional debug log of subjects in production builds.

Log spam in hot paths makes tests noisy. Consider gating behind a verbose flag or removing.

-        log.debug("subject: {s}", .{msg.subject});
+        if (@import("builtin").mode == .Debug) {
+            log.debug("subject: {s}", .{msg.subject});
+        }

686-729: history(): sensible arena ownership; one tiny improvement.

Using nc.options.timeout_ms as a total budget is fine. Optionally accept an override timeout for history to avoid coupling to connection defaults.

Happy to draft an overload if you want one.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 54016ec and d9da51a.

📒 Files selected for processing (8)
  • src/jetstream.zig (4 hunks)
  • src/jetstream_kv.zig (8 hunks)
  • src/jetstream_message.zig (1 hunks)
  • src/message.zig (1 hunks)
  • src/result.zig (1 hunks)
  • tests/jetstream_kv_history_test.zig (2 hunks)
  • tests/jetstream_kv_test.zig (1 hunks)
  • tests/jetstream_nak_test.zig (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T14:51:33.570Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#31
File: tests/jetstream_pull_test.zig:21-23
Timestamp: 2025-08-25T14:51:33.570Z
Learning: The nats.zig project uses a "tests:beforeEach" test in tests/all_tests.zig that runs before each test to clean up all JetStream streams and consumers, ensuring test isolation. This eliminates the need for unique test resource names as each test starts with a clean JetStream state.

Applied to files:

  • tests/jetstream_nak_test.zig
  • tests/jetstream_kv_test.zig
🔇 Additional comments (14)
tests/jetstream_nak_test.zig (1)

293-295: Good switch to numeric checks

> 0 is the right replacement after making sequences non-optional.

tests/jetstream_kv_history_test.zig (1)

46-50: Good move to arena-backed Result for history.

The wrapper + deinit() contract is clear, and per-entry cleanup via entry.deinit() is correct.

src/jetstream.zig (4)

27-29: Imports look right.

Pulling in parseTimestamp and the project-wide Result wrapper aligns this module with the new API.


654-667: parseResponse(): consistent Result(T) wrapping with arena hand-off.

Looks good. Callers must remember to result.deinit(); your tests reflect that.


939-946: Legacy GetMsg: timestamp parsing added.

Setting msg.time via parseTimestamp(stored_msg.time) is the right fix; downstream code can now rely on msg.time.


593-600: Restrict catch to only error.Timeout in examples/benchmarks
In examples/replier.zig (line 35), benchmarks/subscriber.zig (line 42) and benchmarks/echo_server.zig (line 41), you’re using a generic catch continue which silences all errors. Change to explicit matching—continue only on error.Timeout and propagate any other error.

⛔ Skipped due to learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#60
File: src/queue.zig:390-393
Timestamp: 2025-08-30T07:40:28.186Z
Learning: In the nats.zig project, the maintainer prefers to rely on Zig's built-in integer overflow detection rather than implementing manual saturated conversions for timeout calculations (timeout_ms * std.time.ns_per_ms). Zig automatically fails on integer overflow in Debug and ReleaseSafe modes.
src/jetstream_kv.zig (8)

21-21: Module imports updated to local Result and newInbox.

All good; this decouples KV from jetstream.zig’s old alias and avoids implicit Inbox creation paths.

Also applies to: 29-29


54-59: KVOperation.fromString(): fail-fast on unknown values.

Returning error.InvalidOperation is preferable to silently defaulting. Callers already “try” it.


118-124: isDeleted() helper improves readability.

This simplifies KeyNotFound checks elsewhere.


395-401: getRawEntry(): good mapping for MessageNotFound -> KeyNotFound.

errdefer msg.deinit() and parseMessage() ownership are consistent.


408-409: fromString() errors are propagated here correctly.

Keeps bad headers from masquerading as PUT.


586-593: watch/watchMulti/watchAll: APIs look good; lifetime of temporary subjects is safe.

Subjects are serialized into JSON during addConsumer; freeing the local copies after init is fine.

Also applies to: 596-617, 620-625


733-763: keys(): uses completion marker correctly and dedup via StringHashMap.

Allocation split (arena for keys, self.allocator for map) is reasonable and avoids double free.

Also applies to: 769-781


802-813: keysWithFilters(): mirrors keys() well.

Same notes as keys(); looks good.

Also applies to: 815-831, 839-850

- Use num_pending only for empty stream detection at subscription time
- Always start init_pending at 0, set from first message delta like C code
- Change completion check from >= to > to match C logic
- Handle empty streams immediately with init_done and return_marker flags
The time field was not being cleared when messages were returned to the pool,
causing pooled messages to retain stale timestamps between uses.
@lalinsky lalinsky merged commit b5a4877 into jetstream-kv Sep 6, 2025
3 checks passed
@lalinsky lalinsky deleted the jetstream-kv-ng branch September 6, 2025 21:19
@coderabbitai coderabbitai bot mentioned this pull request Sep 6, 2025
lalinsky added a commit that referenced this pull request Sep 7, 2025
* Fix pull requests

* Implement stall timeout in requestMany()

* Reformat

* WIP

* Change KV.init to return KV struct by value instead of pointer

This eliminates heap allocation for KV instances and simplifies memory
management by removing the need for destroy() calls.

Changes:
- KV.init now returns !KV instead of !*KV
- Removed heap allocation and destroy logic from init/deinit
- Updated createBucket and openBucket to return KV structs
- Fixed test to use var instead of const for mutable KV instance

* Change KV status() to return Result(KVStatus) following JetStream API pattern

This makes the KV API consistent with other JetStream methods that return
Result(T) for automatic memory management via arena allocation.

Changes:
- Remove allocator field and manual deinit from KVStatus struct
- Change status() return type from !*KVStatus to !Result(KVStatus)
- Reuse arena from getStreamInfo() response for allocations
- Use static string for backing_store field instead of duplicating
- Update test to access status.value fields

* Change KV get() and JetStream message APIs to return Result pattern

This refactoring makes the KV API consistent with other JetStream methods
that return Result(T) for automatic memory management via arena allocation.

Changes:
- Update getMsg/getLastMsg to return Result(StoredMessage) instead of *Message
- Move base64 decoding to getMsgInternal for better performance
- Make StoredMessage.data non-null (empty string instead of null)
- Change KV.get() to return Result(KVEntry) instead of *KVEntry
- Remove manual memory management from KVEntry struct
- Update parseEntry to work with StoredMessage and reuse arena
- Update all tests to use .value.field access pattern
- Export StoredMessage as public API

The KV API now follows the same Result pattern as other JetStream APIs,
providing consistent memory management and better ergonomics.

* Fix KV API after merge - update to new Message-based getMsg API

- Update KV.get() to use new getMsg API with GetMsgOptions and direct=true
- Modify KVEntry to hold *Message reference and handle its cleanup
- Update parseEntry() to return KVEntry directly (not Result(KVEntry))
- Use parsed headers via msg.headerGet() instead of raw header parsing
- Fix test assertions to work with new KVEntry structure
- Unit tests now pass (78/78), integration tests need NATS server

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Fix formatting

* Add copy parameter to Message setter functions

Modify setSubject, setReply, setPayload, and setRawHeaders to accept
a copy: bool parameter. When copy=false, the functions store direct
references to the data instead of copying to the message's arena.

This optimization allows avoiding unnecessary allocations when:
- Data is already allocated on the message's arena
- Using string constants like ""
- Working with temporary messages

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Fixes

* Enable allow_direct for KV streams to fix NoResponders error

KV get operations were failing with 503 NoResponders because the streams
weren't configured with allow_direct=true, which is required for the
direct message API ($JS.API.DIRECT.GET) to work.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Update src/message.zig

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Update tests/all_tests.zig

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Update src/jetstream_kv.zig

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Fix KV tests: compilation errors and runtime issues

- Fix const qualifier issues in jetstream_kv_test.zig by changing const to var for KV and KVEntry instances
- Fix KV create() error handling to properly map StreamWrongLastSequence to KeyExists
- Implement proper KV create() logic per ADR-8: allow create after delete/purge operations
- Add missing StreamConfig fields (allow_rollup_hdrs, deny_delete) required for KV buckets
- Enable rollup headers and deny delete settings for KV streams per ADR-8 specification

All KV tests now pass (9/9) and implementation follows NATS JetStream KV spec.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Fix KV get() method to properly handle deleted/purged keys per ADR-8

- Add getRawEntry() internal method to retrieve raw KV entries including delete/purge markers
- Refactor get() method to use getRawEntry() and filter out deleted entries
- Fix create() method to use getRawEntry() for internal delete detection logic
- Update tests to expect KeyNotFound for deleted/purged keys per ADR-8 specification
- Implementation now matches Go NATS library behavior exactly

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Add proper timestamp parsing for KV entries

- Create new timestamp.zig module with parseTimestamp() function
- Parse ISO 8601 timestamps with nanosecond precision from Nats-Time-Stamp header
- Add comprehensive unit tests for timestamp parsing including edge cases
- Update KV parseEntry() to use proper timestamps instead of sequence numbers
- Support full u64 range for nanosecond timestamps (valid until ~2554)
- Handle leap years, partial nanoseconds, and error cases correctly

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Implement KV watch, history and keys functionality (WIP) (#97)

* Implement KV watch, history and keys functionality (WIP)

This implements the missing ADR-8 Key-Value features:

- WatchOptions struct with filtering options (include_history, ignore_deletes, meta_only, updates_only)
- KVWatcher struct for managing live key updates with thread-safe entry management
- watch() method for monitoring specific keys or patterns
- watchAll() method for monitoring entire buckets
- history() method for retrieving historical values of a key
- keys() and keysWithFilters() methods for listing bucket keys
- Internal watchFiltered() method using push subscriptions
- Enhanced status() method with proper limit_marker_ttl extraction
- Nats-Marker-Reason header handling for TTL-based operations

Key implementation details:
- Uses JetStream push subscriptions with ephemeral consumers
- Implements end-of-initial-data detection via null entry markers
- Thread-safe entry management with mutexes and atomic flags
- Proper resource cleanup for deliver subjects and subscriptions
- Message parsing from JetStream messages to KV entries

Current status: Core functionality implemented but push subscription
approach needs refinement - encountering issues with ephemeral consumers
and deliver subject configuration.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Refactor KVWatcher to use async push subscription with ConcurrentQueue

- Replace ArrayList with ConcurrentQueue(KVEntry, 16) for thread-safe message passing
- Switch from subscribeSync() to async subscribe() with kvWatchHandler callback
- Handler extracts Message pointer without calling JetStreamMessage.deinit()
- Update next() API to return !KVEntry with timeout instead of ?KVEntry
- Add tryNext() for non-blocking entry retrieval
- Create parseJetStreamEntryAsync() for handling Message directly
- Update history() and keys() methods to work with new async API
- Fix test expectations for new timeout-based approach

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Lukáš Lalinský <[email protected]>

* Fix KV watcher memory management and consumer handling

- Add parseJetStreamEntryFromHandler() to safely extract Message pointers without use-after-free
- Fix TTL marker handling in parseEntry() for proper operation classification
- Wire subject_delete_marker_ttl in StreamConfig creation
- Use ephemeral consumers (name = null) for proper cleanup
- Address CodeRabbit memory management concerns

Note: ConsumerAlreadyExists errors in tests appear to be timing/environment related,
not fundamental code issues. Memory safety improvements are solid.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Lukáš Lalinský <[email protected]>

* Consolidate parseJetStreamEntry functions to avoid memory management issues

Remove duplicate parseJetStreamEntryFromHandler and consolidate into single
parseJetStreamEntry function that doesn't call js_msg.deinit() since we
reference memory inside the message.

Co-authored-by: Lukáš Lalinský <[email protected]>

---------

Co-authored-by: Claude <[email protected]>
Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com>
Co-authored-by: Lukáš Lalinský <[email protected]>

* Fix KV watcher memory management and consumer configuration

- Fix double-free error by ensuring clear ownership of deliver_subject
- Fix consumer configuration: set max_ack_pending=0 to allow ack_policy=none
- Fix memory leaks by making KVWatcher heap-allocated with self-destroy
- Fix wildcard filtering: change watchAll from '*' to '>' for multi-token keys
- All KV tests now pass (13/13)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Format code

* Add timestamp validation to parseTimestamp

Add daysInMonth helper function and validate numeric ranges in parseTimestamp:
- Year must be >= 1970
- Month must be 1-12
- Day must be 1-daysInMonth(year, month)
- Hour must be 0-23
- Minute and second must be 0-59

Also add defensive assertions in daysSinceEpoch and refactor to use daysInMonth helper.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>

* Experiment (#98)

* Experiment

* Fixed kv.history()

* Fixes

* Add result.zig

* Fix KV watcher initialization to match C implementation

- Use num_pending only for empty stream detection at subscription time
- Always start init_pending at 0, set from first message delta like C code
- Change completion check from >= to > to match C logic
- Handle empty streams immediately with init_done and return_marker flags

* Fix message pooling: reset time field in Message.reset()

The time field was not being cleared when messages were returned to the pool,
causing pooled messages to retain stale timestamps between uses.

---------

Co-authored-by: Claude <[email protected]>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com>
Co-authored-by: Lukáš Lalinský <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant