-
Notifications
You must be signed in to change notification settings - Fork 1
Add publishMsg() and requestMsg() functions #37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Add PublishError with specific error variants (MaxPayload, InsufficientBuffer, InvalidSubject) - Consolidate publish methods using internal publishMsgInternal() - Add publishMsg() and publishRequestMsg() for message-based publishing - Implement payload size validation against server limits - Add requestMsg() method for message-based requests - Improve Message struct with better defaults and simplified initialization - Add ASAP flush mode for time-sensitive operations - Implement "no responders" detection in request/reply - Add proper subject validation and protocol compliance
WalkthroughAdds PublishError and ConnectionClosedError, integrates PublishError into ConnectionError, exposes publish/request APIs and flusher_asap, makes buffering/flush asap-aware, revises publish path (headers, size checks, reply handling), updates reconnect/resend workflow, modifies Message defaults and header parsing, re-exports PublishError, and adds requestMsg tests. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant App
participant Connection
participant RespMgr as ResponseManager
participant Server
App->>Connection: requestMsg(msg, timeout_ms)
activate Connection
Connection->>RespMgr: registerInbox(timeout)
RespMgr-->>Connection: replyInbox
Connection->>Server: HPUB/PUB (subject, replyInbox, headers?, data) [asap flag]
Server-->>RespMgr: MSG (replyInbox, data/headers)
RespMgr-->>App: deliver response Message
deactivate Connection
rect rgba(230,250,230,0.25)
note over Connection,RespMgr: If no responders detected → NoResponders error
end
sequenceDiagram
autonumber
participant App
participant Connection
participant Flusher as FlusherLoop
participant Socket
App->>Connection: publishMsg(msg)
activate Connection
Connection->>Connection: publishMsgInternal(..., asap)
Connection->>Connection: bufferWrite(bytes, asap)
alt asap = true
Connection->>Flusher: wake (set flusher_asap)
else asap = false
note over Connection,Flusher: may defer wake
end
Flusher->>Socket: write buffered bytes
deactivate Connection
sequenceDiagram
autonumber
participant Connection
participant ServerPool
participant Socket
participant SubMgr as Subscriptions
Connection->>Connection: DoReconnect()
Connection->>ServerPool: choose next server
Connection->>Socket: open & handshake (INFO/CONNECT)
alt handshake success
Connection->>SubMgr: resend subscriptions
Connection->>Socket: flush pending buffer (asap)
else handshake failure
Connection->>ServerPool: try next
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
Status, Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (8)
src/message.zig (2)
106-111: Avoid repeated parsing attempts when raw_headers is null.If needs_header_parsing were ever true while raw_headers is null, ensureHeadersParsed() would keep returning early without flipping the flag. Make it idempotent by clearing the flag when there’s nothing to parse.
- const raw = self.raw_headers orelse return; + const raw = self.raw_headers orelse { + // Nothing to parse; avoid retrying on every header access. + self.needs_header_parsing = false; + return; + };
177-193: Header key case handling (consider canonicalization).NATS header field names are conventionally treated case-insensitively. Using StringHashMapUnmanaged with raw keys makes lookups case-sensitive. Consider canonicalizing keys (e.g., lower-case) on set/get/delete to avoid surprises when users vary casing.
- pub fn headerSet(self: *Self, key: []const u8, value: []const u8) !void { + pub fn headerSet(self: *Self, key: []const u8, value: []const u8) !void { try self.ensureHeadersParsed(); const arena_allocator = self.arena.allocator(); // Remove existing values (arena will clean up memory automatically) - _ = self.headers.fetchRemove(key); + const canon_key = try toLowerDup(arena_allocator, key); + _ = self.headers.fetchRemove(canon_key); // Add new value - const owned_key = try arena_allocator.dupe(u8, key); + const owned_key = canon_key; // already duplicated above const owned_value = try arena_allocator.dupe(u8, value); var values: ArrayListUnmanaged([]const u8) = .{}; try values.append(arena_allocator, owned_value); try self.headers.put(arena_allocator, owned_key, values); }Add helper (outside this range, for clarity):
fn toLowerDup(allocator: Allocator, s: []const u8) ![]u8 { var out = try allocator.dupe(u8, s); for (out) |*c| c.* = std.ascii.toLower(c.*); return out; }If you adopt canonicalization, mirror it in headerGet/headerGetAll/headerDelete to lower-case the query key before lookup.
tests/core_request_reply_test.zig (3)
158-178: Prefer conn.flush() over sleeping to reduce flakiness.Rely on protocol-level flush (PING/PONG) instead of timing assumptions. This improves determinism in CI.
- std.time.sleep(10_000_000); // 10ms + try conn.flush();Recommend making the same change in earlier tests in this file too.
180-205: Use flush instead of sleep before issuing request.Same rationale as above; avoids brittle timing.
- std.time.sleep(10_000_000); // 10ms + try conn.flush();
206-216: Validation error coverage looks good.The InvalidSubject case is exercised correctly for requestMsg(). Consider adding a complementary MaxPayload overflow test when feasible.
If you’d like, I can sketch a MaxPayload test harness that fakes a small server_info.max_payload.
src/connection.zig (3)
1061-1076: Replace direct std.debug.print with structured logging or gate behind trace.Direct prints in library code are noisy. Use log.debug and/or guard by options.trace.
- std.debug.print("processPing: status={}, stream={}\n", .{ self.status, self.stream != null }); + if (self.options.trace) log.debug("processPing: status={}, stream={}", .{ self.status, self.stream != null }); ... - std.debug.print("processPing: about to writeAll PONG\n", .{}); + if (self.options.trace) log.debug("processPing: about to writeAll PONG", .{}); ... - std.debug.print("processPing: writeAll failed: {}\n", .{err}); + if (self.options.trace) log.debug("processPing: writeAll failed: {}", .{err}); ... - std.debug.print("processPing: PONG write took {d}ms\n", .{@divTrunc(std.time.nanoTimestamp() - ping_start, std.time.ns_per_ms)}); + if (self.options.trace) log.debug("processPing: PONG write took {d}ms", .{@divTrunc(std.time.nanoTimestamp() - ping_start, std.time.ns_per_ms)}); ... - std.debug.print("processPing: skipped (not connected or no stream)\n", .{}); + if (self.options.trace) log.debug("processPing: skipped (not connected or no stream)", .{});
553-577: Comment nit: “Lock asaply like C library”.Minor typo; consider “Lock early like the C library” or drop the adverb.
- // Lock asaply like C library + // Lock early like the C libraryAlso applies to: 583-614, 619-640
477-505: Subject validation could be stricter (optional).Currently only empty subject is rejected. Consider adding a validateSubject() helper to reject whitespace and invalid wildcard placement to align with protocol expectations. Hook it right after the empty check.
I can draft a lightweight validator (no regex) if you want to tighten this in a follow-up.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
src/connection.zig(13 hunks)src/message.zig(5 hunks)src/root.zig(1 hunks)tests/core_request_reply_test.zig(1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-24T20:23:43.415Z
Learnt from: lalinsky
PR: lalinsky/nats.zig#28
File: tests/headers_test.zig:158-159
Timestamp: 2025-08-24T20:23:43.415Z
Learning: In NATS header parsing, when processing status lines like "NATS/1.0 503 No Responders", the 3-digit status code (e.g., "503") gets stored in the Status header, while any additional descriptive text (e.g., "No Responders") gets stored in a separate Description header. The Status header should contain only the numeric code, not the full line.
Applied to files:
src/message.zig
🔇 Additional comments (8)
src/message.zig (4)
29-41: Good defaults and simplified Message struct.The addition of sensible defaults for reply/headers/raw_headers and the new sid field look solid. This reduces boilerplate and aligns well with arena-backed lifetime management.
72-99: initWithHeaders(): solid API and memory ownership.Switching raw_headers to a non-null []const u8 and duplicating into the arena with needs_header_parsing=true is correct and makes ownership explicit.
118-151: Status/Description extraction matches spec and past learnings.Parsing “NATS/1.0 503 No Responders” into Status: "503" and Description: "No Responders" is correct and consistent with prior guidance. The Status header contains only the numeric code; any trailing text goes to Description.
Also applies to: 143-149
233-251: Header encoding first line (interoperability check).encodeHeaders() emits "NATS/1.0\r\n" followed by header lines (including Status/Description). That is accepted by servers and mirrors common client behavior, but some implementations also allow inline status on the first line. If cross-impl interop is a concern, consider adding a unit test that round-trips both forms through ensureHeadersParsed()/encodeHeaders().
Do you want me to add such tests under tests/headers_test.zig to lock this down?
src/root.zig (1)
21-21: Re-exporting PublishError at the root is a good API move.This makes error handling ergonomics better for users without reaching into connection.zig.
src/connection.zig (3)
446-455: publish()/publishRequest(): ephemeral Message construction is fine.Stack-allocating a lightweight Message with arena=undefined is a nice touch, given no headers are present. Guarding against accidental header usage is handled by the early has_headers check.
Also applies to: 461-470, 472-476
644-679: ASAP flush on PING makes sense for flush().Using bufferWrite(..., true) minimizes latency while waiting for PONG. This matches the PR’s “ASAP flush mode” goal.
875-882: ASAP-aware flusher loop is sound.The 1ms batching delay is skipped in ASAP mode, and flags are reset under the connection mutex. Concurrency looks safe here.
Also applies to: 920-938
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/connection.zig (1)
477-547: Address the reconnection buffering issue.The current implementation still has the early-return issue identified in the previous review. Lines 535-539 will prevent publishes during reconnection from being buffered, contradicting the intended behavior described in Line 920-922.
Apply the suggested fix from the previous review:
- switch (self.status) { - .connected, .reconnecting => {}, - .closed, .connecting, .disconnected, .draining_subs, .draining_pubs => { - return ConnectionError.ConnectionClosed; - }, - } + // Allow publishes when connected or reconnecting (buffered) + // Reject when not usable for sending + switch (self.status) { + .connected, .reconnecting => {}, + else => return ConnectionError.ConnectionClosed, + }However, the logic in lines 920-922 correctly handles reconnection buffering, so the main issue is that this early check prevents reaching that logic.
🧹 Nitpick comments (1)
src/connection.zig (1)
550-610: Fix typo in comments.Lines 550, 579, 616: "asaply" should be "ASAP" or "early".
- // Lock asaply like C library + // Lock early like C library
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
src/connection.zig(13 hunks)
🔇 Additional comments (13)
src/connection.zig (13)
107-112: LGTM: Proper error type alignment.The change from
ConnectionError.OutOfMemorytoPublishError.InsufficientBuffercorrectly aligns with the new error handling structure wherePublishErroris the appropriate type for buffering issues.
126-145: LGTM: Well-structured error hierarchy.The new error types follow a logical hierarchy:
ConnectionClosedErroras a specific base typePublishErrorcomposing multiple error types appropriatelyConnectionErrorproperly includingPublishErrorThis provides good granularity for error handling while maintaining backward compatibility.
219-219: LGTM: Support for ASAP flushing.Adding the
flusher_asapfield enables time-sensitive operations like flush and requests to bypass the normal flusher delay, which is essential for proper request/reply functionality.
447-459: LGTM: Clean API consolidation.The refactoring to use
publishMsgInternalprovides good code reuse while maintaining the existingpublishand newpublishMsgAPIs. The temporaryMessagestruct construction is appropriate for the legacy API.
462-475: LGTM: Consistent API design.The
publishRequestandpublishRequestMsgfunctions follow the same pattern as the regular publish methods, providing both data-based and message-based variants with proper parameter handling.
478-480: Subject validation logic is correct.Empty subject validation using
error.InvalidSubjectis appropriate and follows NATS protocol requirements.
486-501: LGTM: Proper payload size validation.The implementation correctly:
- Calculates total payload size including headers
- Validates against server limits
- Returns appropriate
PublishError.MaxPayloaderror
508-527: LGTM: Proper HPUB/PUB protocol implementation.The code correctly implements both HPUB (with headers) and PUB (without headers) message formats according to the NATS protocol, with proper size calculations and formatting.
641-654: LGTM: ASAP flushing for flush operations.Using
asap=truein Line 654 is correct for flush operations since they need immediate processing to ensure proper PING/PONG synchronization.
677-715: LGTM: Well-implemented request/reply functionality.The
requestandrequestMsgimplementations properly:
- Handle request lifecycle with cleanup
- Use ASAP publishing for time-sensitive requests
- Implement "no responders" detection
- Provide both data-based and message-based APIs
871-877: LGTM: Proper ASAP flushing logic.The flusher correctly bypasses the 1ms delay when
flusher_asapis set, and properly resets the flag after processing. This ensures time-sensitive operations get immediate attention.
916-934: LGTM: Improved flusher signaling.The enhanced signaling logic properly handles ASAP mode by allowing multiple signals to wake the flusher from its delay, ensuring responsive handling of time-sensitive operations.
904-911: LGTM: Proper vectored I/O implementation.The use of
writevAllwith proper error handling and buffer consumption provides efficient network I/O for the flusher.
Summary
PublishErrorwith specific error variants (MaxPayload,InsufficientBuffer,InvalidSubject)publishMsgInternal()publishMsg()andpublishRequestMsg()for message-based publishingrequestMsg()method for message-based requestsTest plan
requestMsg()functionalitySummary by CodeRabbit