
Conversation

@lalinsky (Owner) commented Aug 23, 2025

Summary

  • Implements generic Queue(T) with ArrayListUnmanaged backend and thread-safe operations
  • Replaces std.fifo.LinearFifo usage in subscriptions (removed in Zig 0.15)
  • Provides blocking/non-blocking pop operations with timeout support
  • Uses std.time.Timer for accurate elapsed time tracking

Key Features

  • Thread-safe: Protected by mutex and condition variable
  • Flexible access patterns: tryPop() (non-blocking), pop(timeout_ms) (blocking with timeout)
  • Memory efficient: Automatic compaction when consuming items
  • Closeable: Prevents new pushes and wakes up waiting consumers
  • Generic: Works with any type T

Changes

  • New file: src/queue.zig - Thread-safe queue implementation with comprehensive tests
  • Updated: src/subscription.zig - Use Queue(*Message) instead of LinearFifo
  • Updated: src/connection.zig - Use queue's push() method for message delivery

Test Plan

  • Unit tests for all queue operations (FIFO, timeout, compaction, thread safety)
  • Integration tests verify message passing works correctly
  • Existing NATS functionality preserved

The queue is ready for use in Connection.messages and provides a robust, idiomatic Zig solution for thread-safe message queuing.
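A minimal usage sketch of the API described above (method names taken from this summary; the exact signatures in src/queue.zig may differ):

```zig
const std = @import("std");
const Queue = @import("queue.zig").Queue;

test "queue usage sketch" {
    var q = Queue(u32).init(std.testing.allocator);
    defer q.deinit();

    try q.push(1);
    try q.push(2);

    // FIFO order; tryPop() is non-blocking and returns null when empty.
    try std.testing.expectEqual(@as(?u32, 1), q.tryPop());

    // pop(timeout_ms) blocks until an item arrives, the timeout elapses,
    // or the queue is closed; here an item is already available.
    try std.testing.expectEqual(@as(?u32, 2), q.pop(100));

    // close() rejects further pushes and wakes any blocked consumers.
    q.close();
    try std.testing.expectError(error.QueueClosed, q.push(3));
}
```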

Summary by CodeRabbit

  • New Features

    • Timeout-based waiting when retrieving subscription messages.
    • Subscriptions close gracefully and drain pending messages on shutdown.
  • Improvements

    • New thread-safe queue for message delivery with reduced per-subscription locking.
    • Simpler, more responsive message retrieval and improved throughput under load.
  • Bug Fixes

    • Fixed rare deadlocks and dropped/missed messages during high concurrency or shutdown.
  • Tests

    • Expanded tests for ordering, timeouts, close semantics, compaction, and multithreaded scenarios.

- Implement generic Queue(T) with ArrayListUnmanaged backend
- Thread-safe with mutex and condition variable
- Support for blocking/non-blocking pop operations with timeout
- Automatic memory compaction for efficiency
- Replace LinearFifo usage in subscriptions with new Queue
- Use std.time.Timer for accurate elapsed time tracking
- Close queue during cleanup to prevent new messages
- Comprehensive test coverage including multithreaded scenarios
coderabbitai bot (Contributor) commented Aug 23, 2025

Walkthrough

Replaces per-subscription LinearFifo and explicit mutexes with a new generic, thread-safe Queue(T). Subscriptions now use Queue.init/close/pop/tryPop; connections enqueue via s.messages.push(message) with push error handling; draining closes the queue then drains via tryPop.

Changes

Cohort / File(s) Summary of changes
Thread-safe Queue implementation
src/queue.zig
Added pub const Queue(comptime T: type) type with mutex, condvar, indexed ArrayListUnmanaged, head index, closed flag, compaction, push, tryPop, pop(timeout_ms), len, isEmpty, close, isClosed, init/deinit, and tests. Introduced QueueError (includes QueueClosed and allocator errors).
Subscription refactor to Queue
src/subscription.zig
Replaced std.fifo.LinearFifo(*Message, .Dynamic) + per-subscription mutex with Queue(*Message). Updated initSync/initAsync to use Queue.init, deinitInternal to close then drain via tryPop, and nextMsg to call messages.pop(timeout_ms), removing explicit locking and sleep-based polling.
Connection enqueue path update
src/connection.zig
For sync-subscription path, removed per-subscription lock and writeItem; replaced with s.messages.push(message) and added handling for QueueError.QueueClosed (drop message) and other push errors (log and drop). Connection control flow unchanged when enqueue fails.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Conn as Connection.processMsg
  participant Sub as Subscription
  participant Q as Queue(*Message)
  participant Consumer as Subscription.nextMsg

  Conn->>Q: push(message)
  note right of Q #e8f4ff: (lock) append/rotate/grow, signal waiters
  alt QueueClosed
    Q-->>Conn: QueueClosed (drop message)
  else PushError
    Q-->>Conn: error (log, drop message)
  else Success
    Q-->>Conn: ok
  end

  Consumer->>Q: pop(timeout_ms)
  alt item available
    Q-->>Consumer: *Message
  else timeout or closed
    Q-->>Consumer: null
  end

  rect rgba(230,240,255,0.45)
  Sub->>Q: close()
  Q-->>Consumer: wake waiters (closed)
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I hop through queues with tidy cheer,
Locks unspool and messages near.
Push then pop, timers hum along,
Close the gate — the drain goes on.
Rabbit rails keep order strong 🐇


📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 68c6d85 and f9a0a12.

📒 Files selected for processing (1)
  • src/queue.zig (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/queue.zig


coderabbitai bot (Contributor) left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/connection.zig (1)

847-853: Handle queue push errors locally to avoid parser teardown and to prevent message leaks

Pushing to the queue can fail (Allocator.Error or error.QueueClosed). Letting this bubble up via try turns it into a parser error in readerLoop, which triggers a reconnect and leaks message (no deinit on the error path). Handle errors in place: drop the message with a log and return success from processMsg.

Apply this diff:

             if (s.handler) |handler| {
                 // Execute callback without holding locks
                 handler.call(message);
             } else {
                 // Sync subscription - queue message
-                try s.messages.push(message);
+                s.messages.push(message) catch |err| {
+                    switch (err) {
+                        error.QueueClosed => {
+                            // Subscription is closing/closed; drop gracefully.
+                            log.debug("Queue closed for sid {d}; dropping message", .{ msg_arg.sid });
+                            message.deinit();
+                            return;
+                        },
+                        else => {
+                            // Allocation or unexpected push failure; drop but do not tear down the connection.
+                            log.err("Failed to enqueue message for sid {d}: {}", .{ msg_arg.sid, err });
+                            message.deinit();
+                            return;
+                        },
+                    }
+                };
             }
🧹 Nitpick comments (7)
src/connection.zig (1)

844-851: Confirm async handler owns message lifetime (no deinit here)

For the async path, handler.call(message) doesn't deinit message. Ensure handlers are responsible for destroying Message instances (and that this contract is documented), otherwise this will leak.

src/subscription.zig (1)

79-85: Close-then-drain is safe; verify intended semantics for pending items

Closing the queue wakes waiting consumers and prevents new pushes, then you drain and deinit pending messages. This means callers blocked in nextMsg() will get null (even if items remain) after close. If that's the intended unsubscribe semantics (drop remaining), this is correct. If you want pending items to be retrievable post-close, we’d need to adjust Queue.pop.
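For illustration, the close-then-drain sequence described here would look roughly like the following inside deinitInternal (field and type names assumed, not quoted from the diff):

```zig
// Sketch only: drop-on-unsubscribe semantics for a sync subscription.
fn deinitInternal(self: *Subscription) void {
    self.messages.close(); // wake blocked nextMsg() callers; reject new pushes
    while (self.messages.tryPop()) |msg| {
        msg.deinit(); // pending messages are dropped, not delivered
    }
    self.messages.deinit();
}
```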

src/queue.zig (5)

78-117: Clarify pop semantics on close (pending items vs early-null)

pop() returns null when closed == true even if items are still queued. That’s a valid design, but callers must know they may lose pending items after close. If you instead want “no new pushes, but still drainable,” keep the loop running until the queue is empty, then return null on close.

If you want “drain after close,” you could adjust the loop condition:

-            while (!self.closed) {
+            while (true) {
                 if (self.head < self.items.items.len) {
                     const item = self.items.items[self.head];
                     self.head += 1;
                     // compaction...
                     return item;
                 }
-                const elapsed_ns = timer.read();
-                if (elapsed_ns >= timeout_ns) {
-                    return null;
-                }
-                const remaining_ns = timeout_ns - elapsed_ns;
-                self.condition.timedWait(&self.mutex, remaining_ns) catch {};
+                if (self.closed) return null;
+                const elapsed_ns = timer.read();
+                if (elapsed_ns >= timeout_ns) return null;
+                const remaining_ns = timeout_ns - elapsed_ns;
+                self.condition.timedWait(&self.mutex, remaining_ns) catch {};
             }

119-129: len/isEmpty: minor simplification and consistency

Given internal invariants, self.items.items.len >= self.head should always hold. You can simplify len() and make isEmpty() call len() to avoid divergence (micro-style).

-        pub fn len(self: *Self) usize {
+        pub fn len(self: *Self) usize {
             self.mutex.lock();
             defer self.mutex.unlock();
-            return if (self.items.items.len >= self.head) self.items.items.len - self.head else 0;
+            return self.items.items.len - self.head;
         }
 
         pub fn isEmpty(self: *Self) bool {
-            self.mutex.lock();
-            defer self.mutex.unlock();
-            return self.head >= self.items.items.len;
+            return self.len() == 0;
         }

141-152: Expose a concrete error set on API for discoverability

You defined pub const QueueError = error{ QueueClosed } || Allocator.Error, but push returns an inferred error set. Consider annotating signatures with QueueError so callsites can import and match it without guessing.

-        pub fn push(self: *Self, item: T) !void {
+        pub fn push(self: *Self, item: T) QueueError!void {
             ...
         }

201-221: Reduce flakiness in timeout test

Asserting elapsed_ms >= 45 on a 50ms wait can be flaky under load. Consider widening tolerance or asserting upper bounds instead (e.g., 30–200ms) to accommodate scheduler variability.


243-261: Add a regression test for “push when head > 0 at capacity”

The rotate bug wouldn’t be caught by current tests. Add a test that fills to capacity, pops a prefix, then pushes again to trigger compaction-before-growth and verify no duplicates.

Proposed test to append to this file:

test "Queue push compacts when head > 0 at capacity (no duplication)" {
    const testing = std.testing;
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var q = Queue(i32).init(allocator);
    defer q.deinit();

    // Fill to initial capacity (8)
    for (0..8) |i| try q.push(@intCast(i));

    // Pop a few to advance head
    try testing.expectEqual(@as(?i32, 0), q.tryPop());
    try testing.expectEqual(@as(?i32, 1), q.tryPop());
    try testing.expectEqual(@as(?i32, 2), q.tryPop());
    try testing.expectEqual(@as(usize, 5), q.len());

    // Push enough to require space; should compact, not rotate duplicates
    for (8..12) |i| try q.push(@intCast(i));

    // We should see the remaining original items [3..7], then [8..11]
    for (3..12) |i| try testing.expectEqual(@as(?i32, @intCast(i)), q.tryPop());
    try testing.expect(q.isEmpty());
}
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 4171e87 and 5c89408.

📒 Files selected for processing (3)
  • src/connection.zig (1 hunks)
  • src/queue.zig (1 hunks)
  • src/subscription.zig (6 hunks)
🔇 Additional comments (3)
src/subscription.zig (2)

5-5: Switch to Queue(*Message) looks good

Replacing LinearFifo with Queue(*Message) simplifies synchronization and localizes concurrency concerns. Init sites are consistent for sync and async. LGTM.

Also applies to: 27-27, 41-41, 53-53


93-95: nextMsg: simple, race-free wrapper

Delegating to messages.pop(timeout_ms) is concise and removes manual polling. The return type ?*Message aligns with Queue.pop. LGTM.

src/queue.zig (1)

66-74: Compaction logic on pop/tryPop is correct and uses copy-forwards with shrink

The compaction threshold and copy/shrink pattern are sound and avoid quadratic behavior. Once the push-side rotate bug is fixed, the structure is consistent across push and pop paths.

Also applies to: 95-103

@lalinsky (Owner, Author) commented:

@claude see the review comments, are they relevant? what's the impact and what fixes would then need?

claude bot (Contributor) commented Aug 23, 2025

Claude finished @lalinsky's task.


Analysis of CodeRabbit Review Comments

Todo List

  • Read repository guidelines and understand the codebase
  • Analyze each CodeRabbit comment for severity and relevance
  • Assess impact of identified issues
  • Provide recommendations for fixes

Summary

The CodeRabbit review identified 1 critical bug and several important issues. Here's my assessment:

🚨 CRITICAL: Data Corruption Bug in queue.zig:34-47

Issue: The std.mem.rotate logic in the push() method causes data duplication when the queue is at capacity with head > 0.

Impact:

  • Messages will be delivered multiple times to subscribers
  • Data integrity is completely compromised
  • Could cause infinite loops or processing errors in consumers

Root Cause: When rotating consumed elements back into the active slice, the array length isn't adjusted, making previously consumed items visible again.

Fix Priority: IMMEDIATE - This breaks core functionality.
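The failure mode is easy to demonstrate with std.mem.rotate alone (illustrative state, not the actual queue code):

```zig
const std = @import("std");

test "rotate keeps consumed items in the slice" {
    // A capacity-full backing array after three pops: head == 3,
    // live items are 3..7.
    var buf = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7 };
    std.mem.rotate(u8, &buf, 3);
    // The consumed items 0, 1, 2 moved to the tail, but the length is
    // still 8; unless it is shrunk to 5, they reappear as "new" items.
    try std.testing.expectEqualSlices(u8, &[_]u8{ 3, 4, 5, 6, 7, 0, 1, 2 }, &buf);
}
```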

⚠️ HIGH: Connection Error Handling (connection.zig:847-853)

Issue: try s.messages.push(message) can fail with QueueClosed or allocation errors, causing parser teardown and message leaks.

Impact:

  • Connection reconnection on queue errors (unnecessary)
  • Memory leaks when message.deinit() is skipped on error paths
  • Poor error recovery behavior

Fix Priority: HIGH - Affects reliability and resource management.

📋 Medium Priority Issues

  1. Error handling for async handlers (connection.zig:844-851) - Need to verify message lifetime ownership
  2. Close semantics (subscription.zig:79-85) - Confirm intended behavior for pending items after close
  3. Pop timeout semantics (queue.zig:78-117) - Whether to drain pending items after close vs. return null immediately

🔧 Low Priority Improvements

  1. Test coverage gap - Missing test for the rotate bug scenario
  2. Timeout test flakiness - 45ms assertion could be more robust
  3. API consistency - Consider explicit error types for better discoverability
  4. Code simplification - Minor opportunities in len()/isEmpty() methods

Recommended Action Plan

  1. Fix the critical rotate bug immediately - Replace with compaction logic as CodeRabbit suggested
  2. Fix connection error handling - Add proper error handling with message cleanup
  3. Add regression test for the rotate scenario
  4. Run the test suite (zig build test) to verify fixes
  5. Address medium priority items based on intended semantics

The core threading and synchronization logic is sound, but the data corruption bug makes this PR unsafe to merge in its current state.

@lalinsky (Owner, Author) commented:

@claude fix the issue

claude bot (Contributor) commented Aug 23, 2025

Claude finished @lalinsky's task.


Fixed Critical Issues ✅

I've successfully fixed the critical issues identified in the CodeRabbit review:

Todo List

  • Read repository setup instructions and understand codebase
  • Fix critical data corruption bug in queue.zig (rotate logic)
  • Fix connection error handling to prevent message leaks
  • Add regression test for the rotate bug scenario
  • Run test suite to verify fixes
  • Commit and push changes

Changes Made

🚨 Critical Fix: Data Corruption Bug

  • Problem: std.mem.rotate in Queue.push() was reintroducing already-consumed items, causing message duplication
  • Solution: Replaced with proper compaction logic that copies remaining items to front and shrinks array length
  • File: src/queue.zig:34-47

⚠️ High Priority Fix: Connection Error Handling

  • Problem: queue.push() failures were causing parser teardown and message memory leaks
  • Solution: Added proper error handling that gracefully drops messages and logs errors without tearing down the connection
  • File: src/connection.zig:852-867

🧪 Regression Test Added

  • Added test "Queue push compacts when head > 0 at capacity (no duplication)"
  • Verifies the exact scenario that would trigger the data corruption bug
  • Ensures no message duplication occurs during compaction

Test Results

All unit tests pass (32/32), including the new regression test. The build completes successfully with no compilation errors.

Commit: 702fc82 - Fix critical queue data corruption and connection error handling

The queue implementation is now safe for production use. The core threading and synchronization logic was already sound - these fixes address the critical data integrity and error handling issues.


- Replace buggy std.mem.rotate logic with proper compaction in Queue.push()
  to prevent message duplication when head > 0 at capacity
- Add proper error handling for queue push failures in connection.zig
  to prevent connection teardown and memory leaks
- Add regression test for the compaction scenario

Co-authored-by: Lukáš Lalinský <[email protected]>

Keep graceful handling for QueueClosed but let allocation
and other unexpected push errors bubble up to trigger
connection recovery as intended.

Co-authored-by: Lukáš Lalinský <[email protected]>
coderabbitai bot (Contributor) left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
src/connection.zig (1)

852-867: Non-QueueClosed push errors now bubble up to tear down connection — LGTM

This matches the prior request to return on non-QueueClosed errors. Line 864 returning err ensures the error propagates through processMsg → parser.parse → readerLoop catch, which triggers triggerReconnect(err). Memory ownership is handled correctly: message.deinit() on both error paths, and defer s.release() guarantees the subscription ref is released. Good fix.

To double-check that this indeed results in a reconnect, verify that parser propagates callback errors and that readerLoop catches them:

#!/bin/bash
# 1) Confirm parser.parse returns an error union (so it can propagate callback errors)
rg -n -C2 -g 'src/**/*.zig' -P 'fn\s+parse\([^)]*\)\s*!'

# 2) Confirm parse errors are caught in readerLoop and lead to triggerReconnect
rg -n -C4 'parser\.parse\(' src/connection.zig

# 3) Optional: confirm the Queue push error set matches the handled cases
rg -n -C2 -g 'src/**/*.zig' -P 'pub\s+fn\s+push\([^)]*\)\s*!' 
🧹 Nitpick comments (1)
src/connection.zig (1)

845-846: Avoid logging full payload in hot path to reduce log volume

Suggest logging size (and maybe a prefix) instead of the whole payload to keep debug logs useful under load.

-            log.debug("Delivering message to subscription {d}: {s}", .{ msg_arg.sid, message.data });
+            log.debug("Delivering message to subscription {d}: data_len={d}", .{ msg_arg.sid, message.data.len });
+            // If you still want a peek, cap it:
+            // const preview_len = @min(message.data.len, 128);
+            // log.debug("Payload preview (first {} bytes): {s}", .{ preview_len, message.data[0..preview_len] });
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 702fc82 and 68c6d85.

📒 Files selected for processing (1)
  • src/connection.zig (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: test

Extract duplicated compaction code from push(), tryPop(), and pop()
methods into a private compact() helper. This improves code
maintainability and reduces duplication.

Co-authored-by: Lukáš Lalinský <[email protected]>
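Based on the copy-forwards-and-shrink pattern noted earlier in the review, the extracted helper is presumably shaped along these lines (field names assumed; the caller is expected to hold the mutex):

```zig
// Sketch of the shared compaction helper: move the live region
// [head..len) to the front of the backing array, then shrink.
fn compact(self: *Self) void {
    const remaining = self.items.items.len - self.head;
    std.mem.copyForwards(T, self.items.items[0..remaining], self.items.items[self.head..]);
    self.items.shrinkRetainingCapacity(remaining);
    self.head = 0;
}
```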