-
Notifications
You must be signed in to change notification settings - Fork 1
Implement shared response subscription for request/reply operations #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Add ResponseManager class implementing future-based request/reply pattern - Uses single wildcard subscription instead of per-request subscriptions - Implements incremental numeric token generation (like C library) - Thread-safe response routing with condition variables - Matches Go/C reference library efficiency for high-throughput scenarios - ResponseManager unit tests passing Replaces old per-request subscription approach with shared subscription pattern used by Go and C NATS libraries for better performance.
- Fix mutex deadlock where request() held mutex while calling publishRequest() - Update request_reply_test.zig for new Subscription.init signature - Unit tests now pass, integration tests still have issue with JetStream The shared subscription response manager is working for basic functionality, but there may be an integration issue with JetStream request/reply operations.
This commit completes the shared subscription request/reply implementation by adding proper "No Responders" error detection that matches the behavior of the official Go and C NATS reference libraries. ## Key Changes ### Header Parsing Compatibility - Add header constants like C library: STATUS_HDR, DESCRIPTION_HDR, HDR_STATUS_NO_RESP_503 - Update header parsing to match Go library behavior: - Parse "NATS/1.0 503" → Status header = "503" - Parse "NATS/1.0 503 No Responders" → Status = "503", Description = "No Responders" - Eliminate unnecessary key copying by using string constants ### NoResponders Error Handling - Fix isNoResponders() to use parsed Status header like Go library - Add NoResponders error type to ConnectionError enum - Update ResponseManager to detect 503 status and convert to proper Zig error - Fix ResponseInfo.wait() to properly propagate errors instead of returning null ### Request/Reply Test Organization - Create dedicated tests/core_request_reply_test.zig with comprehensive coverage: - Basic request/reply functionality - Concurrent requests (validates token routing) - NoResponders error handling - Request timeout scenarios - Multiple subjects testing - Remove scattered request/reply tests from other files ### Bug Fixes - Fix dispatcher assignment for internal subscriptions - Fix memory leak in echo handler by adding proper msg.deinit() - Update test to use non-error returning isNoResponders() ## Validation - All request/reply tests pass with no memory leaks - Implementation now exactly matches Go/C reference library behavior - Shared subscription with token-based routing working correctly - Proper error propagation for NoResponders scenarios
Update test to use NATS protocol format 'NATS/1.0 503' instead of 'NATS/1.0\r\nStatus: 503 No Responders Available\r\n\r\n' to match how NATS actually sends NoResponders messages.
- Add NATS_STATUS_PREFIX constant and use throughout - Use ArrayListUnmanaged.initCapacity and appendAssumeCapacity for headers - Simplify ResponseInfo to store result as ?anyerror!?*Message - Use std.time.nanoTimestamp for timeout handling - Use StringHashMapUnmanaged for response map - Use statically sized array for inbox prefix instead of allocations - Remove subscribeInternal API, use regular subscribe instead - Fix ResponseInfo lifecycle management - Add proper mutex synchronization for thread safety 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Lukáš Lalinský <[email protected]>
- Simplify ResponseInfo to use ?anyerror!*Message instead of separate fields - Remove redundant completed field, use result == null check instead - Fix segmentation fault in response_manager deinit - Fix memory leaks in token cleanup - All requested optimizations confirmed implemented 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Lukáš Lalinský <[email protected]>
- Fix NUID next() function calls to provide buffer arguments - Fix ResponseInfo test to use timedWait() instead of missing wait() method - Fix optional type errors in request/response handling - request() returns !*Message not ?*Message - Fix use-after-free bug in token cleanup causing segfaults - Fix test expectations for simple request reply and timeout scenarios - Update response manager to use proper error handling and memory management
Use a boolean flag to track message ownership and only free the message if it wasn't transferred to the response handler. This prevents memory leaks when NoResponders errors occur.
The connection mutex was being used around createRequest() call, but this is not needed since: - createRequest() doesn't modify any connection-level state - It has its own internal resp_mutex for thread safety - Token generation uses atomic operations - Memory allocation is thread-safe This improves performance by reducing lock contention.
Remove unused getNextSubId helper function that was not being called anywhere in the codebase.
Add error handling to complete() and completeWithError() methods to prevent race conditions from double completion attempts. Use proper error propagation with catch return in responseHandler.
- Set no_responders default from false to true in ConnectionOptions - Change logic to only enable no_responders when both option is true AND server supports headers - Ensures no_responders feature is only used when server can handle it properly
…-request sync primitives - Replace per-request ResponseInfo with shared synchronization primitives - Use u64 request IDs (rid) instead of string tokens for better performance - Add RequestHandle type for clean API design with separate getReplySubject method - Implement proper shutdown handling with is_closed flag and broadcast notification - Reduce memory usage from ~80+ bytes per request to ~8 bytes per request - Eliminate atomic operations by generating RIDs under mutex protection - Add comprehensive cleanup of pending responses during shutdown - Update tests to use new API with proper defer cleanup - All existing functionality preserved - no breaking changes to public API Performance improvements: - 10x less memory per concurrent request - No string allocations for token management - Single condition variable instead of N scattered ones - Simple u64 comparisons instead of string operations
WalkthroughCentralizes request/reply correlation via a new ResponseManager and wires it into Connection; changes request API and no_responders handshake default; parses inline NATS status into headers; updates NUID API to write into caller buffers; refines JetStream error propagation; adds tests and docs for request/reply. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant App as Application
participant Conn as Connection
participant RM as ResponseManager
participant Serv as NATS Server
App->>Conn: request(subject, data, timeout_ms)
Conn->>RM: ensureInitialized(connection)
RM-->>Conn: subscription ready (_INBOX.<prefix>.*)
Conn->>RM: createRequest() -> RequestHandle(rid)
Conn->>RM: getReplySubject(handle) -> _INBOX.<prefix>.<rid>
Conn->>Serv: PUB subject (reply=_INBOX...rid) + payload
Note over RM: resp mux subscribed to _INBOX.<prefix>.*
Serv-->>RM: MSG _INBOX.<prefix>.<rid> + payload or inline 503 status
RM->>RM: correlate rid -> store Message or NoResponders error
RM-->>Conn: signal waiters
Conn->>RM: waitForResponse(handle, timeout)
alt response received
RM-->>Conn: *Message
Conn-->>App: return *Message
else timeout / closed / no responders / unknown
RM-->>Conn: error (Timeout / ConnectionClosed / NoResponders / Unknown)
Conn-->>App: propagate error
end
Conn->>RM: cleanupRequest(handle)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
|
@claude review this, focus on concurrency correctness, memory management |
|
Claude finished @lalinsky's task —— View job Code Review: Concurrency Correctness & Memory ManagementTodo List:
SummaryThis PR introduces a centralized Critical Issues1. Race Condition in Response Completion (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/jetstream.zig (1)
233-238: Action Required: Update tests to expect underlying NATS errors instead oferror.NoResponseThe recent change to
sendRequestnow propagates more specific errors (e.g.ConnectionClosed,Timeout,NoResponders) rather than the genericerror.NoResponse. The only remaining references toerror.NoResponseare in your request–reply tests:
- tests/core_request_reply_test.zig (around line 106):
- return error.NoResponse;
- // Replace with the underlying NATS error you expect for “no responders”
- return error.NoResponders;
Please update this test (and any related expectations) to match the new error variant. </blockquote></details> <details> <summary>src/nuid.zig (1)</summary><blockquote> `47-61`: **Overflow risk after resetSequence(): sequence + increment may still exceed MAX_SEQUENCE on the same call.** After you randomize/reset, you unconditionally add increment without rechecking. If resetSequence picks sequence close to MAX_SEQUENCE and increment is large, sequence can wrap past 36^10 and encodeBase36 will silently truncate. Fix by bounding sequence given the chosen increment, or loop until safe. Apply one of these fixes (deterministic bounding preferred): ```diff @@ fn resetSequence(self: *Nuid) void { @@ - self.sequence = seq_random % MAX_SEQUENCE; - self.increment = MIN_INCREMENT + (inc_random % (MAX_INCREMENT - MIN_INCREMENT + 1)); + self.increment = MIN_INCREMENT + (inc_random % (MAX_INCREMENT - MIN_INCREMENT + 1)); + // Ensure next() can safely add increment without exceeding MAX_SEQUENCE + const limit = MAX_SEQUENCE - self.increment; + // limit is > 0 because MAX_INCREMENT << MAX_SEQUENCE + self.sequence = seq_random % limit; } @@ - fn next(self: *Nuid, buffer: *[NUID_TOTAL_LEN]u8) void { - // Check if sequence would overflow - if (self.sequence + self.increment >= MAX_SEQUENCE) { - self.randomizePrefix(); - self.resetSequence(); - } + fn next(self: *Nuid, buffer: *[NUID_TOTAL_LEN]u8) void { + // Check if sequence would overflow; after resetSequence() we guarantee room for one increment + if (self.sequence + self.increment >= MAX_SEQUENCE) { + self.randomizePrefix(); + self.resetSequence(); + }Alternatively, keep current resetSequence and guard with a loop:
- if (self.sequence + self.increment >= MAX_SEQUENCE) { - self.randomizePrefix(); - self.resetSequence(); - } + while (self.sequence + self.increment >= MAX_SEQUENCE) { + self.randomizePrefix(); + self.resetSequence(); + }I recommend the first approach to avoid rare spin retries.
🧹 Nitpick comments (5)
src/nuid.zig (1)
101-110: next(buffer) and nextString() adaptation looks good.The API shift to caller-provided buffers avoids copies; nextString() keeps a convenient path with a single dupe. Consider adding a slice-based helper that validates buffer length for ergonomics.
Example:
pub fn nextIntoSlice(buf: []u8) error{InvalidBuffer}!void { if (buf.len != NUID_TOTAL_LEN) return error.InvalidBuffer; var tmp: *[NUID_TOTAL_LEN]u8 = @ptrCast(buf.ptr); next(tmp); }src/message.zig (1)
110-150: Inline status parsing is solid; add a tiny guard to ensure status is exactly three digits.Today any 3 chars are accepted as status. Tighten it to digits to avoid false positives on malformed inputs.
Apply this minimal check:
@@ - if (status_part.len == status_len) { - status = status_part; + if (status_part.len == status_len) { + status = status_part; } else if (status_part.len > status_len) { status = status_part[0..status_len]; const desc_part = std.mem.trim(u8, status_part[status_len..], " \t"); if (desc_part.len > 0) { description = desc_part; } } else { status = status_part; // Less than 3 chars, use as-is } + + // Ensure status is numeric (e.g., "503") + if (status.len == 3) { + const d0 = status[0]; + const d1 = status[1]; + const d2 = status[2]; + if (!(d0 >= '0' and d0 <= '9' and d1 >= '0' and d1 <= '9' and d2 >= '0' and d2 <= '9')) { + // Not a valid numeric status; skip setting headers + status = ""; + description = null; + } + } @@ - var status_list = try ArrayListUnmanaged([]const u8).initCapacity(arena_allocator, 1); - status_list.appendAssumeCapacity(status); - try self.headers.put(arena_allocator, STATUS_HDR, status_list); + if (status.len != 0) { + var status_list = try ArrayListUnmanaged([]const u8).initCapacity(arena_allocator, 1); + status_list.appendAssumeCapacity(status); + try self.headers.put(arena_allocator, STATUS_HDR, status_list); + }tests/core_request_reply_test.zig (1)
49-49: Consider making sleep duration configurable.The fixed 10ms sleep might be too short in heavily loaded systems or too long in fast test environments.
Consider extracting this as a constant at the top of the file:
const SUBSCRIPTION_REGISTRATION_WAIT_NS = 10_000_000; // 10mssrc/response_manager.zig (2)
24-29: Uninitialized buffer usage before initialization.The
resp_sub_prefixfield is initialized to referenceresp_sub_prefix_buf, butresp_sub_prefix_bufis marked asundefined. While this works becauseresp_sub_prefixis properly set inensureInitializedbefore use, it's cleaner to initializeresp_sub_prefixto an empty slice.- resp_sub_prefix_buf: [INBOX_PREFIX_LEN]u8 = undefined, - resp_sub_prefix: []u8 = &.{}, + resp_sub_prefix_buf: [INBOX_PREFIX_LEN]u8 = undefined, + resp_sub_prefix: []const u8 = "",
102-102: Unused parameter in createRequest.The second parameter (data) is not used in the function. Consider removing it or documenting why it's needed for API consistency.
- pub fn createRequest(self: *ResponseManager, subject: []const u8, _: []const u8) !RequestHandle { + pub fn createRequest(self: *ResponseManager, subject: []const u8, data: []const u8) !RequestHandle { + _ = data; // Reserved for future use or API consistency
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (9)
src/connection.zig(10 hunks)src/jetstream.zig(1 hunks)src/message.zig(4 hunks)src/nuid.zig(5 hunks)src/request_reply_test.zig(2 hunks)src/response_manager.zig(1 hunks)tests/all_tests.zig(1 hunks)tests/core_request_reply_test.zig(1 hunks)tests/headers_test.zig(1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
📚 Learning: 2025-08-24T20:23:43.415Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: tests/headers_test.zig:158-159
Timestamp: 2025-08-24T20:23:43.415Z
Learning: In NATS header parsing, when processing status lines like "NATS/1.0 503 No Responders", the 3-digit status code (e.g., "503") gets stored in the Status header, while any additional descriptive text (e.g., "No Responders") gets stored in a separate Description header. The Status header should contain only the numeric code, not the full line.
Applied to files:
tests/headers_test.zigsrc/message.zig
📚 Learning: 2025-08-25T02:49:59.082Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/request_reply_test.zigsrc/response_manager.zig
🔇 Additional comments (20)
src/request_reply_test.zig (2)
52-54: API update to Subscription.init/deinit looks correct.Passing the extra null argument and switching to parameterless deinit() match the updated Subscription API. No functional issues spotted here.
86-88: Consistent use of new Subscription API.Same here—the init(…, null) and deinit() changes align with the public API shift. Given subscriptions are ref-counted in this codebase, this remains safe from double-frees even when deinit is called from multiple owners.
src/nuid.zig (4)
9-9: Making NUID_TOTAL_LEN public is reasonable.This improves reuse at call sites and in tests without leaking internals. No issues.
121-125: Unit test updates align with the new API.Stack-allocating [NUID_TOTAL_LEN]u8 and calling next(&arr) is correct.
168-170: Uniqueness test uses value keys—good coverage.Using HashMap([NUID_TOTAL_LEN]u8, …) avoids heap churn and validates equality/hashing of the entire identifier.
222-224: Thread-safety test writes disjoint indices—good; minor note on context passing.Each thread writes to a unique range in results; no races. Passing ThreadContext by value into spawn is safe; results backing storage outlives threads. All good.
src/message.zig (3)
5-10: Header constants and status prefix centralization: LGTM.Using STATUS_HDR, DESCRIPTION_HDR, and NATS_STATUS_PREFIX improves consistency and avoids magic strings. Matches the “Status numeric only” convention.
225-231: isNoResponders() simplification looks good.Bool return and 503 check against the parsed Status header matches the intended behavior and simplifies call sites.
239-239: Using NATS_STATUS_PREFIX in encodeHeaders is correct.Always emitting "NATS/1.0" as the first line when headers are present is per protocol; no issues.
tests/all_tests.zig (1)
9-9: Confirmed: core_request_reply_test.zig is present and importedThe script found
tests/core_request_reply_test.zig, andtests/all_tests.zigcorrectly imports it via:pub const core_request_reply_tests = @import("core_request_reply_test.zig");CI will exercise the new request/reply behavior as intended.
tests/headers_test.zig (2)
154-154: LGTM! The inline status format is correct.The test now correctly simulates a NATS-style inline status response format (
NATS/1.0 503) which gets parsed into separate Status and Description headers internally.
158-158: The non-try call aligns with the boolean return type.The
isNoResponders()method now returns a boolean directly instead of an error union, which simplifies the API.tests/core_request_reply_test.zig (2)
88-94: Verify proper cleanup in concurrent test.The test allocates memory in a loop but only defers the free once per iteration, which is correct. However, consider extracting the request creation into a helper function for better maintainability.
11-15: Potential memory leak in error scenarios.The handler allocates memory but may return early on errors without freeing it. While the test allocator might handle this, it's better to ensure proper cleanup.
fn echoHandler(msg: *nats.Message, connection: *nats.Connection) void { defer msg.deinit(); if (msg.reply) |reply_subject| { - const response = std.fmt.allocPrint(std.testing.allocator, "echo: {s}", .{msg.data}) catch return; + const response = std.fmt.allocPrint(std.testing.allocator, "echo: {s}", .{msg.data}) catch { + log.err("Failed to allocate response", .{}); + return; + }; defer std.testing.allocator.free(response); connection.publish(reply_subject, response) catch return;Likely an incorrect or invalid review comment.
src/connection.zig (2)
699-700: Good defensive programming with AND logic forno_responders.The code correctly enables no_responders only when both the client option is set AND the server supports headers. This prevents protocol errors with older servers.
655-669: No leak in pending map on publish failureThe
defer self.response_manager.cleanupRequest(handle);is registered immediately aftercreateRequest, so regardless of whetherpublishRequestsucceeds or errors,cleanupRequestwill execute and remove the request from the pending map. No additional error-handling is needed here.Likely an incorrect or invalid review comment.
src/response_manager.zig (4)
50-52: Reference counting safety confirmed.Based on the retrieved learning, subscriptions in this codebase are reference counted, so setting
resp_muxto null here is safe as the Connection properly manages the subscription lifecycle.
140-161: Robust timeout handling with proper synchronization.The implementation correctly handles the timeout case by checking elapsed time before and after waiting. The use of
timedWaitwith the remaining time ensures accurate timeout behavior.
185-191: Good message ownership management.The code correctly transfers message ownership when storing a valid response and retains ownership for cleanup when it's a NoResponders error. This prevents memory leaks.
244-245: Test validates timeout accuracy.Good test that verifies both the error condition and that the actual elapsed time meets the minimum expected duration.
Connection.request now returns !*Message instead of !?*Message. Updated examples/requestor.zig and tests to use direct assignment and removed optional checking patterns. Added request/reply example to README.md.
- Fix HashMap entry pointer validity issue in waitForResponse by looking up entry fresh each iteration after timedWait, since HashMap modifications can invalidate existing entry pointers - Use removeByPtr for efficient removal when transferring ownership - Change InvalidToken error to more descriptive UnknownRequest - Add is_closed check in responseHandler to prevent memory leaks during shutdown by avoiding processing of late-arriving responses - Remove entry from map when waitForResponse succeeds, making cleanupRequest a noop and eliminating double-free potential
Remove unused parameters from createRequest(): 1. Remove unused data parameter - was ignored with underscore 2. Remove subject parameter - only used for debug logging Move request logging to Connection.request() where subject context is meaningful. createRequest() now has a clean API focused solely on creating request tracking entries.
Allow callers to specify which allocator to use for reply subject allocation, providing more flexibility for memory management. This enables using arena allocators or other specialized allocators for request-scoped memory.
After resetSequence(), sequence + increment could exceed MAX_SEQUENCE if the randomized sequence was close to the maximum and increment was large. This would cause silent truncation in encodeBase36. - Calculate increment first, then limit sequence to MAX_SEQUENCE - increment - Add comprehensive test verifying sequence + increment < MAX_SEQUENCE - Ensures next() never overflows after resetSequence() Co-Authored-By: Claude <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (5)
examples/requestor.zig (1)
24-27: Optional: distinguish NoResponders from generic failures in the exampleSince request now surfaces NoResponders as an error, showing a tailored message improves DX and documents the new behavior.
Example tweak (outside the changed lines, for illustration):
const reply = conn.request("help", "really need some", std.time.ns_per_s * 5) catch |err| { if (err == error.NoResponders) { std.debug.print("No responders for subject 'help'\n", .{}); } else if (err == error.Timeout) { std.debug.print("Request timed out\n", .{}); } else { std.debug.print("Request failed: {}\n", .{err}); } std.process.exit(2); };src/response_manager.zig (4)
120-122: Guard against use before initializationIf getReplySubject is called before ensureInitialized, the prefix is empty and subjects will be malformed. Add an assertion to catch misuse in debug, or return an explicit error.
pub fn getReplySubject(self: *ResponseManager, allocator: std.mem.Allocator, handle: RequestHandle) ![]u8 { + std.debug.assert(self.resp_sub_prefix.len != 0); // ensureInitialized() should have been called return try std.fmt.allocPrint(allocator, "{s}{d}", .{ self.resp_sub_prefix, handle.rid }); }
142-169: Minor: handle timedWait errors explicitly for claritySemantics are correct, but being explicit about error.Timeout makes the intent clearer and avoids relying on try propagation for control flow.
- // After this call, any entry pointers become invalid due to potential HashMap modifications - try self.pending_condition.timedWait(&self.pending_mutex, timeout_ns - elapsed); + // After this call, any entry pointers become invalid due to potential HashMap modifications + if (self.pending_condition.timedWait(&self.pending_mutex, timeout_ns - elapsed)) |_| { + // woke up via broadcast/spurious; loop and re-check + } else |e| { + if (e == error.Timeout) return error.Timeout; + return e; + }
191-204: Demote “unknown rid” to debug to avoid noisy logs during cancellationscleanupRequest can legitimately remove entries while a response is in-flight. Logging this as warn will be noisy in cancellations/timeouts. Demote to debug and clarify context.
- const entry = self.pending_responses.getEntry(rid) orelse { - log.warn("Received response for unknown rid: {d}", .{rid}); + const entry = self.pending_responses.getEntry(rid) orelse { + log.debug("Received response for unknown or already-cleaned rid: {d}", .{rid}); return; };
221-238: Tests exercise RID sequencing; consider adding init/teardown & cancellation coverageThe current tests validate RID assignment and timeout. Adding a test for ensureInitialized/getReplySubject (prefix non-empty) and a cancellation test (cleanupRequest waking waiters) would lock in the intended behavior and prevent regressions.
Would you like me to draft these tests following the project’s testing patterns?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
README.md(1 hunks)examples/requestor.zig(1 hunks)src/connection.zig(10 hunks)src/response_manager.zig(1 hunks)tests/core_request_reply_test.zig(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- README.md
🚧 Files skipped from review as they are similar to previous changes (2)
- src/connection.zig
- tests/core_request_reply_test.zig
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
📚 Learning: 2025-08-25T02:49:59.082Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/response_manager.zig
🔇 Additional comments (1)
examples/requestor.zig (1)
28-31: Good: reply ownership is explicit and safely releasedAdding defer reply.deinit() and using reply.data directly reflects the new API where request returns a Message*. This avoids leaks and simplifies the happy path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/nuid.zig (1)
108-112: Consider buffer reuse optimization fornextString.While the current implementation is correct, you could optimize memory usage by offering an alternative that accepts a pre-allocated buffer for applications that generate many NUIDs as strings.
Consider adding an additional helper function:
+/// Generate the next NUID using a provided buffer and return it as a string slice +pub fn nextStringWithBuffer(buffer: *[NUID_TOTAL_LEN]u8) []u8 { + next(buffer); + return buffer[0..]; +}This would allow callers to reuse buffers when generating multiple NUID strings, reducing allocations.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
src/nuid.zig(7 hunks)
🔇 Additional comments (6)
src/nuid.zig (6)
9-9: LGTM! Good API design decision.Exposing
NUID_TOTAL_LENas a public constant allows callers to properly allocate buffers at compile time, which is essential for the new buffer-based API.
40-46: Excellent overflow prevention logic!The new sequence generation ensures
sequence + increment < MAX_SEQUENCE, preventing overflow issues that could compromise NUID uniqueness. The calculation correctly bounds the initial sequence value to guarantee safe increments.
49-63: Well-designed buffer-based API.The change from returning a fixed array to accepting a buffer parameter eliminates allocation overhead and gives callers control over memory management. This is particularly beneficial for hot paths like request/reply operations.
103-105: Clean public API transformation.The simplified public
next()function properly delegates to the thread-safe global instance while maintaining the buffer-based approach.
194-259: Comprehensive thread safety test coverage.The multi-threaded test properly validates thread safety by spawning multiple threads that generate NUIDs concurrently and verifies uniqueness across all generated values. The per-thread buffer allocation correctly follows the new API pattern.
261-278: Excellent overflow safety validation.The new test thoroughly validates the overflow prevention logic by testing many random combinations and ensuring the invariant
sequence + increment < MAX_SEQUENCEalways holds. This provides strong confidence in the correctness of the overflow prevention mechanism.
Previously, cleanupRequest would remove pending response entries without notifying waiting threads, causing them to hang until timeout. Now broadcasts to the condition variable after successful entry removal to immediately wake waiters so they can return error.UnknownRequest.
Previously, deinit would only null the resp_mux pointer without calling the subscription's deinit, potentially causing callback races. Now explicitly calls subscription.deinit() while holding the pending_mutex before nulling the pointer, ensuring proper cleanup and reducing race conditions. Double deinit is safe due to refcounting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (3)
src/response_manager.zig (3)
127-146: Broadcast on cleanup looks good.Thank you for waking waiters after
fetchRemove; this prevents hanging until timeout when the caller proactively cancels. Matches prior guidance.
59-63: Acknowledging proactive subscription release.Good call explicitly releasing
resp_muxin shutdown (subscriptions are ref-counted in this codebase). Combined with the lock-order fix suggested above, this minimizes callback races and avoids deadlocks.
87-107: Avoid deadlock: don’t hold pending_mutex across connection.subscribe; use double-checked init.
ensureInitializedlockspending_mutexand then callsconnection.subscribe(...). Ifsubscribetakesconnection.mutex(it typically does) while the dispatch path takesconnection.mutexthen callsresponseHandler(which lockspending_mutex), you can deadlock. Initialize the prefix under the lock, unlock, performsubscribe, then publish the result under the lock (dropping duplicates).pub fn ensureInitialized(self: *ResponseManager, connection: *Connection) !void { - self.pending_mutex.lock(); - defer self.pending_mutex.unlock(); - - // Already initialized by another thread - if (self.resp_mux != null) return; - - // Create an unique identifier for the response manager - var nuid_buf: [nuid.NUID_TOTAL_LEN]u8 = undefined; - nuid.next(&nuid_buf); - self.resp_sub_prefix = std.fmt.bufPrint(&self.resp_sub_prefix_buf, "_INBOX.{s}.", .{&nuid_buf}) catch unreachable; - - // Create wildcard subscription using a temporary buffer - var subject_buf: [INBOX_PREFIX_LEN + 1]u8 = undefined; - const subject = std.fmt.bufPrint(&subject_buf, "{s}*", .{self.resp_sub_prefix}) catch unreachable; - - // Subscribe to the wildcard subject - self.resp_mux = try connection.subscribe(subject, responseHandlerWrapper, .{self}); - - log.debug("Initialized response manager with prefix: {s}, wildcard: {s}", .{ self.resp_sub_prefix, subject }); + // Fast path + self.pending_mutex.lock(); + if (self.resp_mux != null) { + self.pending_mutex.unlock(); + return; + } + // Initialize prefix once + if (self.resp_sub_prefix.len == 0) { + var nuid_buf: [nuid.NUID_TOTAL_LEN]u8 = undefined; + nuid.next(&nuid_buf); + self.resp_sub_prefix = std.fmt.bufPrint(&self.resp_sub_prefix_buf, "_INBOX.{s}.", .{&nuid_buf}) catch unreachable; + } + const prefix = self.resp_sub_prefix; // snapshot for use outside the lock + self.pending_mutex.unlock(); + + // Subscribe outside the manager lock + var subject_buf: [INBOX_PREFIX_LEN + 1]u8 = undefined; + const subject = std.fmt.bufPrint(&subject_buf, "{s}*", .{prefix}) catch unreachable; + const sub = try connection.subscribe(subject, responseHandlerWrapper, .{self}); + + // Publish the subscription if still needed + self.pending_mutex.lock(); + defer self.pending_mutex.unlock(); + if (self.resp_mux == null) { + self.resp_mux = sub; + log.debug("Initialized response manager with prefix: {s}, wildcard: {s}", .{ prefix, subject }); + } else { + // Another thread won; drop duplicate (ref-counted). + sub.deinit(); + } }
🧹 Nitpick comments (6)
src/response_manager.zig (6)
6-6: Remove unused import (inbox.zig).The
inboximport is not referenced in this file.-const inbox = @import("inbox.zig");
33-40: Shared condition is fine now; consider per-request signaling if herd shows up.The single condition/broadcast approach is simple and correct. If you observe contention under high concurrency, consider storing a tiny per-request wait primitive (e.g., a boolean + Condition or a waiter token) to avoid waking unrelated waiters. Keep as-is unless profiling flags it.
123-126: Guard against uninitialized prefix and avoid heap allocation hot-path.
- If
getReplySubjectis called beforeensureInitialized,resp_sub_prefixis empty, yielding an invalid subject. Add a defensive debug assert.- This allocates every call; provide a zero-allocation variant that prints into a caller buffer to reduce pressure in hot paths.
pub fn getReplySubject(self: *ResponseManager, allocator: std.mem.Allocator, handle: RequestHandle) ![]u8 { - return try std.fmt.allocPrint(allocator, "{s}{d}", .{ self.resp_sub_prefix, handle.rid }); + std.debug.assert(self.resp_sub_prefix.len != 0); + return try std.fmt.allocPrint(allocator, "{s}{d}", .{ self.resp_sub_prefix, handle.rid }); } + +pub fn getReplySubjectInto(self: *ResponseManager, buf: []u8, handle: RequestHandle) ![]u8 { + std.debug.assert(self.resp_sub_prefix.len != 0); + return try std.fmt.bufPrint(buf, "{s}{d}", .{ self.resp_sub_prefix, handle.rid }); +}
197-200: Demote unknown RID log level; late replies after timeout are normal.Receiving a response for an unknown RID usually means the requester timed out and cleaned up before the reply arrived. This is expected under load and shouldn’t be a warn-level event.
- log.warn("Received response for unknown rid: {d}", .{rid}); + log.debug("Received response for unknown rid: {d} (likely late reply after timeout)", .{rid});
227-245: Basic handle test is fine; minor nit on RID expectations.Asserting specific RID values is fine for a fresh manager instance. If instantiation ever changes (e.g., RID seeding), consider loosening to monotonicity only.
256-263: Use std.time.Timer for monotonic timing in tests.
nanoTimestamp()can be platform-dependent. Since you already useTimerelsewhere, prefer it here for monotonic elapsed time measurement.- const start = std.time.nanoTimestamp(); - const result = manager.waitForResponse(handle, 1_000_000); // 1ms timeout - const duration = std.time.nanoTimestamp() - start; + var t = std.time.Timer.start() catch unreachable; + const result = manager.waitForResponse(handle, 1_000_000); // 1ms timeout + const duration = t.read();
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
src/response_manager.zig(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-25T02:49:59.082Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: src/response_manager.zig:89-105
Timestamp: 2025-08-25T02:49:59.082Z
Learning: In this NATS Zig codebase, subscriptions are reference counted, so calling deinit() on a subscription from multiple locations (like both Connection and ResponseManager) is safe and doesn't cause double-free issues.
Applied to files:
src/response_manager.zig
🔇 Additional comments (1)
src/response_manager.zig (1)
202-209: NoResponders path and message ownership transfer look correct.
- Mapping inline 503 to
error.NoRespondersis the right call.- Ownership semantics are correct (deinit on error path, transfer on success).
| pub fn deinit(self: *ResponseManager) void { | ||
| // Signal shutdown and wake up all waiters | ||
| self.pending_mutex.lock(); | ||
| defer self.pending_mutex.unlock(); | ||
|
|
||
| // Explicitly release the resp_mux subscription to reduce callback races | ||
| if (self.resp_mux) |subscription| { | ||
| subscription.deinit(); | ||
| } | ||
| self.resp_mux = null; | ||
|
|
||
| self.is_closed = true; | ||
| self.pending_condition.broadcast(); // Wake up all waiters | ||
|
|
||
| // Clean up any remaining pending responses | ||
| if (self.pending_responses.count() > 0) { | ||
| log.warn("Cleaning up {} remaining pending responses during shutdown", .{self.pending_responses.count()}); | ||
| var iterator = self.pending_responses.iterator(); | ||
| while (iterator.next()) |entry| { | ||
| // If there's a pending message, clean it up | ||
| if (entry.value_ptr.*) |result| { | ||
| if (result) |msg| { | ||
| msg.deinit(); | ||
| } else |_| { | ||
| // Error result, nothing to clean up | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| self.pending_responses.deinit(self.allocator); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix lock inversion in deinit: don’t call Subscription.deinit() while holding pending_mutex.
Current flow holds pending_mutex and then calls subscription.deinit(). If deinit() acquires connection.mutex (very likely) while the dispatch path holds connection.mutex and calls your response handler (which then tries to lock pending_mutex), you have a classic lock-order inversion and potential deadlock.
Detach the subscription and flip is_closed under pending_mutex, then release the lock before calling sub.deinit(). Finish map cleanup afterwards under the lock.
pub fn deinit(self: *ResponseManager) void {
- // Signal shutdown and wake up all waiters
- self.pending_mutex.lock();
- defer self.pending_mutex.unlock();
-
- // Explicitly release the resp_mux subscription to reduce callback races
- if (self.resp_mux) |subscription| {
- subscription.deinit();
- }
- self.resp_mux = null;
-
- self.is_closed = true;
- self.pending_condition.broadcast(); // Wake up all waiters
-
- // Clean up any remaining pending responses
- if (self.pending_responses.count() > 0) {
- log.warn("Cleaning up {} remaining pending responses during shutdown", .{self.pending_responses.count()});
- var iterator = self.pending_responses.iterator();
- while (iterator.next()) |entry| {
- // If there's a pending message, clean it up
- if (entry.value_ptr.*) |result| {
- if (result) |msg| {
- msg.deinit();
- } else |_| {
- // Error result, nothing to clean up
- }
- }
- }
- }
-
- self.pending_responses.deinit(self.allocator);
+ // Phase 1: flip closed and detach subscription under the manager lock
+ var sub: ?*Subscription = null;
+ self.pending_mutex.lock();
+ self.is_closed = true;
+ sub = self.resp_mux;
+ self.resp_mux = null;
+ // Wake waiters promptly
+ self.pending_condition.broadcast();
+ self.pending_mutex.unlock();
+
+ // Phase 2: release the subscription outside the manager lock
+ if (sub) |s| s.deinit();
+
+ // Phase 3: clean up remaining pending responses under the lock
+ self.pending_mutex.lock();
+ defer self.pending_mutex.unlock();
+ if (self.pending_responses.count() > 0) {
+ log.warn("Cleaning up {} remaining pending responses during shutdown", .{self.pending_responses.count()});
+ var it = self.pending_responses.iterator();
+ while (it.next()) |entry| {
+ if (entry.value_ptr.*) |result| {
+ if (result) |msg| msg.deinit() else |_| {}
+ }
+ }
+ }
+ self.pending_responses.deinit(self.allocator);
}Note: Subscriptions are ref-counted in this codebase, so releasing here and elsewhere is safe per previous learnings.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub fn deinit(self: *ResponseManager) void { | |
| // Signal shutdown and wake up all waiters | |
| self.pending_mutex.lock(); | |
| defer self.pending_mutex.unlock(); | |
| // Explicitly release the resp_mux subscription to reduce callback races | |
| if (self.resp_mux) |subscription| { | |
| subscription.deinit(); | |
| } | |
| self.resp_mux = null; | |
| self.is_closed = true; | |
| self.pending_condition.broadcast(); // Wake up all waiters | |
| // Clean up any remaining pending responses | |
| if (self.pending_responses.count() > 0) { | |
| log.warn("Cleaning up {} remaining pending responses during shutdown", .{self.pending_responses.count()}); | |
| var iterator = self.pending_responses.iterator(); | |
| while (iterator.next()) |entry| { | |
| // If there's a pending message, clean it up | |
| if (entry.value_ptr.*) |result| { | |
| if (result) |msg| { | |
| msg.deinit(); | |
| } else |_| { | |
| // Error result, nothing to clean up | |
| } | |
| } | |
| } | |
| } | |
| self.pending_responses.deinit(self.allocator); | |
| } | |
| pub fn deinit(self: *ResponseManager) void { | |
| // Phase 1: flip closed and detach subscription under the manager lock | |
| var sub: ?*Subscription = null; | |
| self.pending_mutex.lock(); | |
| self.is_closed = true; | |
| sub = self.resp_mux; | |
| self.resp_mux = null; | |
| // Wake waiters promptly | |
| self.pending_condition.broadcast(); | |
| self.pending_mutex.unlock(); | |
| // Phase 2: release the subscription outside the manager lock | |
| if (sub) |s| s.deinit(); | |
| // Phase 3: clean up remaining pending responses under the lock | |
| self.pending_mutex.lock(); | |
| defer self.pending_mutex.unlock(); | |
| if (self.pending_responses.count() > 0) { | |
| log.warn("Cleaning up {} remaining pending responses during shutdown", .{self.pending_responses.count()}); | |
| var it = self.pending_responses.iterator(); | |
| while (it.next()) |entry| { | |
| if (entry.value_ptr.*) |result| { | |
| if (result) |msg| msg.deinit() else |_| {} | |
| } | |
| } | |
| } | |
| self.pending_responses.deinit(self.allocator); | |
| } |
🤖 Prompt for AI Agents
In src/response_manager.zig around lines 54 to 85, avoid calling
subscription.deinit() while holding pending_mutex to prevent lock-order
inversion; under pending_mutex: capture and null out self.resp_mux (store it
into a local variable), set self.is_closed = true, and broadcast
pending_condition, then release pending_mutex; after unlocking, call
subscription.deinit() on the captured local if non-null; finally re-acquire
pending_mutex to iterate/clean pending_responses and deinit the map/allocator as
before.
Summary
This PR implements a centralized response management system that uses a single shared subscription for all request/reply operations instead of creating individual subscriptions per request.
Key Changes
ResponseManager
_INBOX.{nuid}.*pattern instead of N subscriptions for N requestsConnection.request() API
*Messagedirectly instead of?*MessageNoResponderserror instead of null for better error semanticsEnhanced Header Parsing
NUID Improvements
Benefits
Testing
Breaking Changes
Connection.request()now returns*Messageinstead of?*MessageMessage.isNoResponders()returnsboolinstead of!boolSummary by CodeRabbit
New Features
Behavior Changes
Tests
Documentation