-
Notifications
You must be signed in to change notification settings - Fork 1
Fix queue closure handling in blocking operations and add fast path #27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Caution Review failedThe pull request is closed. WalkthroughAdds QueueClosed to PopError/PushError; introduces timed and non-blocking getSlice/pop semantics (including tryGetSlice), queue lifecycle APIs (close/isClosed/hasData/getItemsAvailable), and updates buffer wrapper and tests to handle closure and timeouts. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Caller
participant Q as ConcurrentQueue
participant B as CondVar/Wait
participant T as Timer
rect rgba(230,245,255,0.6)
Note over C,Q: Non-blocking pop / tryGetSlice (timeout_ms = 0)
C->>Q: pop(0) / tryGetSlice()
alt has item(s)
Q-->>C: return item / View
else closed AND empty
Q-->>C: error QueueClosed
else no items
Q-->>C: error QueueEmpty
end
end
rect rgba(235,255,235,0.6)
Note over C,Q: Timed wait (timeout_ms > 0)
C->>Q: pop(timeout_ms) / getSlice(timeout_ms)
loop wait until item, closed, or timeout
Q->>B: timedWait()
B->>T: start/track timer
T-->>B: wake on timeout or signal
B-->>Q: wake
end
alt item available
Q-->>C: return item / View
else closed AND empty
Q-->>C: error QueueClosed
else timeout
Q-->>C: error QueueEmpty
end
end
rect rgba(255,245,230,0.6)
Note over C,Q: Indefinite wait (timeout_ms = maxInt)
C->>Q: getSlice(maxInt)
loop until item or closed
Q->>B: wait()
B-->>Q: wake
end
alt item available
Q-->>C: return View
else closed AND empty
Q-->>C: error QueueClosed
end
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20–30 minutes Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 💡 Knowledge Base configuration:
You can enable these sources in your CodeRabbit configuration. 📒 Files selected for processing (1)
✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
src/queue.zig (4)
6-9: Adding QueueClosed to PopError is a sensible API improvement; confirm downstream impact and update docsThis changes the public error set of pop-related APIs. Callers that pattern-match on error sets (e.g., switch over PopError) may require updates. Please:
- Document the new error in pop() and waitAndGetSlice() doc comments.
- Note the change in the changelog and consider semver implications if this is a published library.
285-296: Bound total wait time and guard against overflow in timeout conversionCurrent loop reuses the same relative timeout on each iteration. With spurious wakeups, total wait can exceed timeout_ms. Also, timeout_ms * ns_per_ms can overflow u64. Recommend tracking a deadline and recomputing the remaining time each iteration, with overflow-safe multiplication.
Apply this diff:
- const timeout_ns = timeout_ms * std.time.ns_per_ms; - - while (self.items_available == 0 and !self.is_closed) { - self.data_cond.timedWait(&self.mutex, timeout_ns) catch { - return PopError.QueueEmpty; - }; - } + const timeout_ns = std.math.mul(u64, timeout_ms, std.time.ns_per_ms) catch std.math.maxInt(u64); + const deadline: i128 = std.time.nanoTimestamp() + @as(i128, timeout_ns); + + while (self.items_available == 0 and !self.is_closed) { + const now: i128 = std.time.nanoTimestamp(); + if (now >= deadline) return PopError.QueueEmpty; + const remaining_i128 = deadline - now; + const remaining_ns = std.math.cast(u64, remaining_i128) orelse 0; + self.data_cond.timedWait(&self.mutex, remaining_ns) catch { + return PopError.QueueEmpty; + }; + }
329-336: Constrain waitAndGetSlice error set to PopError for a tighter APIwaitAndGetSlice currently returns an inferred error set. Since it now returns PopError.QueueClosed, consider making the signature explicit to avoid leaking anyerror to callers:
pub fn waitAndGetSlice(self: *Self) PopError!View { ... }Also, update the doc comment to mention it may return QueueClosed when the queue is closed and empty.
720-743: Add a small test for the non-blocking fast path on closed+emptyTo fully cover the new fast path, add a test that calls pop(0) on a closed, empty queue and expects QueueClosed:
test "non-blocking pop returns QueueClosed on closed empty queue" { const allocator = std.testing.allocator; const Queue = ConcurrentQueue(i32, 4); var queue = Queue.init(allocator, .{}); defer queue.deinit(); queue.close(); const result = queue.pop(0); try std.testing.expectError(PopError.QueueClosed, result); }Also applies to: 744-767
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
src/queue.zig(4 hunks)
🔇 Additional comments (4)
src/queue.zig (4)
276-284: Non-blocking fast path is clean and correctGood early-exit logic: closed+empty returns QueueClosed; otherwise empty returns QueueEmpty; avoids timedWait overhead for timeout_ms == 0. Nice win for hot paths.
329-336: Correct closure handling in waitAndGetSliceThe loop condition and post-loop check are right: threads wake promptly on close and return QueueClosed when empty. Good fix.
720-743: Test: blocking pop handles queue closure — solid coverageThis exercises the wake-on-close path and asserts the new QueueClosed error. Looks good.
744-767: Test: waitAndGetSlice handles queue closure — solid coverageThis complements the pop() test and validates the slice path. Looks good.
- Add QueueClosed error to PopError enum - Fix pop() method to check closure state in wait loop and return QueueClosed when queue is closed with no items - Add fast path to pop() for non-blocking calls (timeout_ms == 0) to avoid timedWait overhead - Replace waitAndGetSlice() with unified getSlice(timeout_ms) API for consistency with pop() - Fix getSlice() method with same closure handling and fast path optimization - Update tryGetSlice() to use getSlice(0) implementation - Update ConcurrentWriteBuffer to use new getSlice API - Add comprehensive tests to verify blocking operations handle queue closure properly This prevents threads from hanging indefinitely when the queue is closed while waiting, optimizes non-blocking operations, and provides a cleaner, more consistent API.
467d32b to
0ae5d1f
Compare
Summary
Problem
Previously, when a queue was closed while threads were waiting in blocking operations (pop() with timeout or waitAndGetSlice()), the threads would either hang indefinitely or timeout instead of immediately returning with an appropriate error.
Additionally, non-blocking pop() calls (timeout_ms == 0) were unnecessarily calling timedWait which adds overhead.
Solution
items_available == 0and!is_closed, and returnPopError.QueueClosedwhen the queue is closed with no available itemsTest plan
Summary by CodeRabbit
New Features
Bug Fixes
Tests