-
Notifications
You must be signed in to change notification settings - Fork 1
Refactor readerLoop: extract readAndProcess() function #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Extract readAndProcess() function that returns specific error types - Simplify readerLoop to use Zig's native error handling - Centralize reconnection logic - reduce trigger points from 3 to 2 - Separate I/O operations from reconnection decisions - Eliminate complex mutex unlock/relock patterns - Improve testability by separating concerns This is Phase 1 of the readerLoop refactoring plan. All unit tests pass (25/25). Co-authored-by: Lukáš Lalinský <[email protected]>
|
Claude finished @lalinsky's task —— View job Code Review CompleteReviewing the readerLoop refactoring changes in this PR. Here's my todo list:
Review SummaryThis PR successfully extracts the I/O logic from ✅ Strengths
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@claude we are now accessing self.stream without holding a lock
|
Claude finished @lalinsky's task —— View job Analyzing the threading concern with Todo List
Analysis of the Threading IssueYou're absolutely correct about the threading concern. I found the specific problem: Problem Location: In the fn readAndProcess(self: *Self, buffer: []u8) !usize {
const stream = self.stream orelse return ConnectionError.ConnectionClosed; // ⚠️ RACE CONDITION
// Simple blocking read - shutdown() will wake us up
const bytes_read = try stream.read(buffer); // ⚠️ Using stream without lock
// ... rest of function
}The Race Condition:
Root Cause AnalysisLooking at the fn readerLoop(self: *Self) void {
// ...
self.mutex.lock();
defer self.mutex.unlock(); // Final cleanup
while (!self.should_stop.load(.acquire)) {
// Check status and stream under lock
if (self.status == .closed or self.status == .reconnecting) {
break;
}
// Unlock before I/O like C _readLoop
self.mutex.unlock();
defer self.mutex.lock(); // Re-lock at end of iteration
// Perform I/O without holding mutex ⚠️ PROBLEM HERE
if (self.readAndProcess(&buffer)) |bytes_read| {
// ...
}
}
}The intent is good (following the C library pattern of unlocking before I/O), but the implementation creates the race condition because Recommended SolutionsOption 1: Snapshot the stream under lock (Safest) fn readerLoop(self: *Self) void {
// ... existing code ...
while (!self.should_stop.load(.acquire)) {
// Get stream reference under lock
var stream_snapshot: ?net.Stream = null;
if (self.status == .closed or self.status == .reconnecting) {
break;
}
stream_snapshot = self.stream; // Snapshot under lock
// Unlock before I/O like C _readLoop
self.mutex.unlock();
defer self.mutex.lock();
// Use snapshots stream safely
if (self.readAndProcessWithStream(stream_snapshot, &buffer)) |bytes_read| {
// ...
}
}
}
fn readAndProcessWithStream(self: *Self, stream: ?net.Stream, buffer: []u8) !usize {
const s = stream orelse return ConnectionError.ConnectionClosed;
const bytes_read = try s.read(buffer);
// ... rest of existing logic
}Option 2: Check stream validity after I/O (Less ideal but simpler) fn readAndProcess(self: *Self, buffer: []u8) !usize {
const stream = self.stream orelse return ConnectionError.ConnectionClosed;
const bytes_read = stream.read(buffer) catch |err| {
// After I/O error, check if stream was closed during read
if (self.stream == null) {
return ConnectionError.ConnectionClosed;
}
return err;
};
// ... rest
}AssessmentThis is a critical threading bug that could cause:
The refactor successfully separated concerns but introduced this subtle concurrency issue. Option 1 (stream snapshotting) is the safest approach that maintains the C library's I/O-outside-mutex pattern while ensuring thread safety. |
|
@claude if we codify that the read thread is responsible for the lifetime of the stream, can we simplify the whole code somehow? Think harder |
|
Claude finished @lalinsky's task —— View job Analyzing the stream ownership model and potential simplifications Todo List
Stream Ownership AnalysisCurrent Model (Shared Ownership):
Reader Thread Ownership Model: If we codify that the read thread owns the stream lifetime, several simplifications become possible:
Key Insight: Separate Read and Write StreamsThe most elegant solution is to separate concerns completely: // Reader owns the read stream
reader_stream: ?net.Stream = null, // Only touched by reader thread
// Writers own their write streams/channels
write_channel: Channel(WriteRequest), // Thread-safe channelThis eliminates the race condition because:
Proposed ArchitectureOption A: Single Stream, Reader Ownership pub const Connection = struct {
// Remove: stream: ?net.Stream = null,
// Add: Let reader thread manage its own stream
// Reader thread creates/owns stream
fn readerLoop(self: *Self) void {
var my_stream: ?net.Stream = null;
defer if (my_stream) |s| s.close();
while (!self.should_stop.load(.acquire)) {
// Reader creates connection when needed
if (my_stream == null) {
my_stream = self.connectSocket() catch continue;
}
// Reader owns stream - no locking needed!
if (self.readAndProcess(my_stream.?, &buffer)) |_| {
// Success
} else |err| {
my_stream.?.close();
my_stream = null;
self.notifyReconnect(err);
}
}
}
// Other threads signal reconnect but don't touch stream
fn publish(self: *Self, data: []const u8) !void {
// Queue write request instead of direct stream access
try self.write_queue.put(WriteRequest{ .data = data });
}
};Option B: Dual Streams (Read + Write) pub const Connection = struct {
// Reader thread's private stream
reader_stream: std.atomic.Value(?*net.Stream) = std.atomic.Value(?*net.Stream).init(null),
// Writer thread's private stream
writer_stream: std.atomic.Value(?*net.Stream) = std.atomic.Value(?*net.Stream).init(null),
// Coordination
write_queue: Channel(WriteRequest),
reconnect_signal: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
};Simplifications Enabled
Recommended ApproachPhase 1: Reader ownership (simplest change)
Phase 2: Full separation (more ambitious)
Implementation StrategyFor immediate fix of the race condition: fn readerLoop(self: *Self) void {
var reader_stream: ?net.Stream = null;
defer if (reader_stream) |s| s.close();
while (!self.should_stop.load(.acquire)) {
// Create stream if needed (reader owns it)
if (reader_stream == null) {
reader_stream = self.createReaderSocket() catch continue;
}
// No mutex needed - reader owns this stream!
if (self.readAndProcessWithStream(reader_stream.?, &buffer)) |_| {
// Success - continue reading
} else |err| {
reader_stream.?.close();
reader_stream = null;
self.signalReconnect(err); // Signal, don't manage
continue;
}
}
}
fn readAndProcessWithStream(self: *Self, stream: net.Stream, buffer: []u8) !usize {
// No stream null checks needed - reader guarantees validity
const bytes_read = try stream.read(buffer);
// ... existing parsing logic
}This approach:
The key insight is that stream ownership should follow thread boundaries rather than object boundaries. This is a common pattern in systems programming where resources are owned by the threads that use them. |
|
@claude in the single ownership, who would handle write buffer flushes? |
|
Claude finished @lalinsky's task —— View job Analyzing write buffer flush handling in single stream ownership model Todo List
Current Write/Flush Architecture AnalysisAfter analyzing the codebase, here's how writes currently work: Current Multi-Threaded Write Model:
Write Flow (lines 807-822): fn bufferWrite(self: *Self, data: []const u8) !void {
// Buffer data (mutex held by caller)
try self.write_buffer.appendSlice(self.allocator, data);
// Signal flusher thread
if (!self.flusher_signaled) {
self.flusher_signaled = true;
self.flusher_condition.signal();
}
}Flusher Thread (lines 759-805):
Write Handling Options for Reader Stream OwnershipOption 1: Reader Handles Both Read and Write (Simplest) fn readerLoop(self: *Self) void {
var my_stream: ?net.Stream = null;
defer if (my_stream) |s| s.close();
while (!self.should_stop.load(.acquire)) {
// Reader creates and owns the stream
if (my_stream == null) {
my_stream = self.createReaderSocket() catch continue;
}
// Check for pending writes first
self.mutex.lock();
const has_writes = self.write_buffer.items.len > 0;
if (has_writes) {
// Copy buffer under lock
var write_data = try self.allocator.dupe(u8, self.write_buffer.items);
defer self.allocator.free(write_data);
self.write_buffer.clearRetainingCapacity();
self.mutex.unlock();
// Write without holding lock
my_stream.?.writeAll(write_data) catch |err| {
self.signalReconnect(err);
my_stream.?.close();
my_stream = null;
continue;
};
} else {
self.mutex.unlock();
}
// Then read
if (self.readAndProcessWithStream(my_stream.?, &buffer)) |_| {
// Success
} else |err| {
self.signalReconnect(err);
my_stream.?.close();
my_stream = null;
}
}
}Benefits:
Drawbacks:
Option 2: Dual Stream Model (More Complex) // Reader thread owns read stream
fn readerLoop(self: *Self) void {
var read_stream = self.createReadSocket() catch return;
defer read_stream.close();
// ... only reads
}
// Writer thread owns write stream
fn writerLoop(self: *Self) void {
var write_stream = self.createWriteSocket() catch return;
defer write_stream.close();
// ... only writes, similar to current flusher
}Benefits:
Drawbacks:
Option 3: Write Queue with Reader Ownership (Recommended) // Reader owns stream, but processes write queue
fn readerLoop(self: *Self) void {
var my_stream: ?net.Stream = null;
var write_queue = std.fifo.LinearFifo(WriteRequest, .Dynamic).init(self.allocator);
defer write_queue.deinit();
while (!self.should_stop.load(.acquire)) {
// Process writes first (drain queue)
while (write_queue.readItem()) |write_req| {
my_stream.?.writeAll(write_req.data) catch |err| {
// Handle write error
self.signalReconnect(err);
break;
};
}
// Then read with timeout
const read_result = my_stream.?.reader().readTimeout(&buffer, 1_000_000); // 1ms timeout
if (read_result) |bytes| {
if (bytes > 0) self.processBuffer(buffer[0..bytes]);
} else |err| switch (err) {
error.Timeout => continue, // Check write queue again
else => self.signalReconnect(err),
}
}
}Recommendation: Option 1 (Reader Handles Both)For immediate fix: Use Option 1 (Reader handles both read/write) because:
The write latency impact is minimal for most NATS use cases since:
Implementation plan:
This creates a clean, single-responsibility model where the reader thread manages the complete I/O lifecycle. |
|
@claude try to implement option 1 but keep in mind, that stream.read blocks indefinitely. You will need to change the strategy. |
|
Claude finished @lalinsky's task —— View job Claude finished implementing Option 1 with reader thread ownership! ✅ Todo List
✅ Solution ImplementedFixed the Race Condition: The critical threading bug where Key Changes:
Technical Details:
Benefits:
The implementation successfully addresses @lalinsky's concern about |
… I/O - Eliminate race condition where self.stream was accessed without locks - Reader thread now owns stream completely, eliminating TOCTTOU issues - Replace blocking stream.read() with std.posix.poll() and 10ms timeouts - Remove flusher thread - reader handles both reads and writes efficiently - Maintain all existing functionality while fixing threading safety - All 36 tests pass (25 unit + 11 integration) Benefits: - Thread-safe: No shared stream access between threads - Responsive: Writes processed immediately when no incoming data - Efficient: Poll-based I/O prevents indefinite blocking - Simplified: Single thread handles complete I/O lifecycle 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Lukáš Lalinský <[email protected]>
WalkthroughReplaced the separate flusher thread with a single reader thread that handles both reads and writes using posix poll. Updated buffering and publish paths to rely on reader-driven flushing. Adjusted connect/close/reconnect lifecycle accordingly. Added a readAndProcessWithStream helper and removed flusher-related public fields from Connection. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant App
participant Connection
participant ReaderThread
participant Stream
participant Parser
participant WriteBuffer
App->>Connection: connectToServer()
activate Connection
Connection->>ReaderThread: start()
deactivate Connection
loop Poll loop
ReaderThread->>Stream: poll(read|write)
alt Readable
ReaderThread->>Stream: read()
ReaderThread->>Parser: feed(data)
note right of Parser: On parse error<br/>reset parser state
end
alt Pending writes
ReaderThread->>WriteBuffer: copy pending data
ReaderThread->>Stream: write(copied data)
end
end
App->>Connection: publish()/bufferWrite()
Connection->>WriteBuffer: enqueue
note right of ReaderThread: Flush occurs on next poll iteration
alt Reconnect needed
ReaderThread-->>Connection: signal failure/exit
Connection->>ReaderThread: restart()
Connection->>Stream: re-establish + resubscribe
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Assessment against linked issues
Assessment against linked issues: Out-of-scope changes
Poem
✨ Finishing Touches🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/connection.zig (1)
1072-1092: Avoid concurrent writes during reconnection: reorder operations so only the reader thread writes to the stream.After a successful handshake,
doReconnectcurrently:
- starts
reader_thread,- then resends subscriptions (direct
writeAll),- then flushes
pending_buffer(directwriteAll).This violates the “reader owns the stream” invariant and can interleave bytes with the reader’s own flushes, corrupting the protocol. Move re-subscription and pending flush before starting the reader, or enqueue them into
write_bufferand let the reader flush.Apply this diff to perform the writes before spawning the reader:
- // Restart reader thread for the new connection - self.reader_thread = std.Thread.spawn(.{}, readerLoop, .{self}) catch |err| { - log.err("Failed to restart reader thread: {}", .{err}); - self.triggerReconnect(err); - continue; // Try next server - }; - - // Reader thread handles both reads and writes - no flusher needed - - // Re-establish subscriptions (outside mutex like C library) - self.resendSubscriptions() catch |err| { - log.err("Failed to re-establish subscriptions: {}", .{err}); - // Continue anyway, connection is established - }; - - // Flush pending messages (outside mutex like C library) - self.pending_buffer.flush(self.stream.?) catch |err| { - log.warn("Failed to flush pending messages: {}", .{err}); - // Continue anyway, connection is established - }; + // Re-establish subscriptions before starting the reader to avoid concurrent writes + self.resendSubscriptions() catch |err| { + log.err("Failed to re-establish subscriptions: {}", .{err}); + // Continue anyway, connection is established + }; + + // Flush pending messages before starting the reader + self.pending_buffer.flush(self.stream.?) catch |err| { + log.warn("Failed to flush pending messages: {}", .{err}); + // Continue anyway, connection is established + }; + + // Now restart reader thread for the new connection + self.reader_thread = std.Thread.spawn(.{}, readerLoop, .{self}) catch |err| { + log.err("Failed to restart reader thread: {}", .{err}); + self.triggerReconnect(err); + continue; // Try next server + };Alternatively (arguably cleaner): build SUB commands and pending data under
self.mutexintoself.write_bufferand let the reader flush everything once started. I can draft that if you prefer.
🧹 Nitpick comments (8)
src/connection.zig (8)
717-726: Handle HUP/ERR/NVAL poll revents and treat EINTR specially.
- Currently only
POLL.INdrives reads. If the socket hangs up or errors,reventswill containHUP/ERR/NVAL; ignoring those can delay reconnects until another event occurs.- Also,
pollmay be interrupted by signals (EINTR). Treat that as a harmless retry, not as a reconnect trigger.Apply this diff (names assume Zig’s
std.posix.POLL.*constants; please adjust if your Zig version uses different symbols):- const poll_result = std.posix.poll(&pollfd, POLL_TIMEOUT_MS) catch |err| { - log.err("Poll error: {}", .{err}); - self.triggerReconnect(err); - break; - }; + const poll_result = std.posix.poll(&pollfd, POLL_TIMEOUT_MS) catch |err| { + // Retry on EINTR instead of reconnecting. + switch (err) { + error.Interrupted => continue, + else => { + log.err("Poll error: {}", .{err}); + self.triggerReconnect(err); + break; + }, + } + }; + + // Fast-path reconnect on HUP/ERR/NVAL + if (poll_result > 0 and + (pollfd[0].revents & (std.posix.POLL.ERR | std.posix.POLL.HUP | std.posix.POLL.NVAL)) != 0) + { + log.debug("Poll hangup/error revents=0x{x}", .{pollfd[0].revents}); + self.triggerReconnect(ConnectionError.ConnectionClosed); + break; + }If you’d like, I can cross-check the exact
std.posixerror and flag names for your Zig version.Also applies to: 744-745
762-785: Avoid per-iteration copy of the entire write buffer; use toOwnedSlice to transfer ownership.
dupecopies the entirewrite_bufferon each flush, doubling memory traffic.ArrayListUnmanaged.toOwnedSlice(allocator)avoids the copy and hands you the backing slice while clearing the list.Apply this diff:
- if (has_writes) { - // Copy buffer under lock - write_data = self.allocator.dupe(u8, self.write_buffer.items) catch { - self.mutex.unlock(); - continue; - }; - self.write_buffer.clearRetainingCapacity(); - } + if (has_writes) { + // Transfer ownership of the buffer under lock (no copy) + write_data = self.write_buffer.toOwnedSlice(self.allocator) catch { + self.mutex.unlock(); + continue; + }; + }
672-691: Reduce log noise and potential PII leakage; truncate raw read dumps.
log.debug("Read {} bytes: {s}", .{ bytes_read, buffer[0..bytes_read] });prints the entire payload verbatim. For large/binary frames this is noisy and may leak data.Apply this diff:
- log.debug("Read {} bytes: {s}", .{ bytes_read, buffer[0..bytes_read] }); + const preview: usize = @min(bytes_read, 256); + if (bytes_read <= preview) { + log.debug("Read {} bytes: {s}", .{ bytes_read, buffer[0..bytes_read] }); + } else { + log.debug("Read {} bytes (truncated to 256): {s}...", .{ bytes_read, buffer[0..preview] }); + }
717-726: Minor: event set could include OUT if you later switch to non-blocking writes.Right now you flush writes on the 10ms poll tick regardless of socket writability. If you move to non-blocking
write()(to avoid long blockingwriteAll()that can starve reads), consider addingPOLL.OUTand only writing when set.If you want, I can sketch a small state machine for read-then-write with
POLL.IN|OUT.
725-726: Nit: makePOLL_TIMEOUT_MSexplicitly i32 to matchposix.pollsignature.Clarity/readability only.
Apply this diff:
- const POLL_TIMEOUT_MS = 10; // Check for writes every 10ms + const POLL_TIMEOUT_MS: i32 = 10; // Check for writes every 10ms
530-532: Flush semantics rely on poll tick — confirm acceptable latency.Since the reader flushes every ~10ms,
flush()-> PING will be enqueued and sent on the next tick. This adds up to 10ms latency to every flush. Likely fine, but if you need tighter SLAs, we can either shorten the poll timeout whenpending_pongs > 0or temporarily spin until the write_buffer drains.Would you like me to draft a small “flush-fast-path” that shrinks
POLL_TIMEOUT_MSwhile a flush is pending?
801-803: Consider bounding write_buffer to prevent unbounded memory growth under backpressure.
appendSlicegrows without limit when the peer is slow or the network is congested. You have backpressure limits forpending_bufferduring reconnects, but not while connected. Consider amax_write_buffer_bytesoption (or reuseserver_info.max_payloadheuristically) and either:
- block producers (
publish,subscribe, etc.) until drained, or- return
ConnectionError.OutOfMemory/Timeout.I can add a simple size check here and propagate a
Timeout/OutOfMemorydepending on your preference.
3-3: Remove unusedposixalias (keep usingstd.posixor the alias consistently).Right now
const posix = std.posix;isn’t used; keeping it will trigger an unused-constant lint in some setups.Apply this diff:
-const posix = std.posix;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
src/connection.zig(9 hunks)
🔇 Additional comments (5)
src/connection.zig (5)
695-705: Reader takes ownership of stream and clears shared handle — good pattern.Grabbing
self.streamunder lock, deferringclose(), and clearingself.streamunder the mutex avoids double-closing and races. Nice.
384-386: PUB path correctly funnels writes through the shared buffer.This keeps a single writer (reader thread) touching the socket. Good.
577-579: HPUB/PUB-request path also buffers — consistent with the new model.Consistent with
publish(); looks good.
791-792: Comment clarity: flusher removed — reader owns I/O.Accurately reflects the refactor intent.
308-311: Starting only the reader thread after handshake — good separation of concerns.Handshake completes first, then the reader starts and owns the stream lifecycle. Matches the objectives of issue #4.
| if (self.readAndProcessWithStream(stream, &buffer)) |bytes_read| { | ||
| log.debug("Processed {} bytes", .{bytes_read}); | ||
| } else |err| switch (err) { | ||
| ConnectionError.ConnectionClosed => { | ||
| log.debug("Connection closed by server", .{}); | ||
| self.triggerReconnect(err); | ||
| break; | ||
| }, | ||
| else => { | ||
| log.err("Read/parse error: {}", .{err}); | ||
| self.triggerReconnect(err); | ||
| break; | ||
| }, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix slice vs pointer when passing buffer to readAndProcessWithStream (compile-time type mismatch).
readAndProcessWithStream expects []u8, but the call site passes &buffer (*[4096]u8). This won’t coerce implicitly and will fail to compile. Pass a slice instead.
Apply this diff:
- if (self.readAndProcessWithStream(stream, &buffer)) |bytes_read| {
+ if (self.readAndProcessWithStream(stream, buffer[0..])) |bytes_read| {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (self.readAndProcessWithStream(stream, &buffer)) |bytes_read| { | |
| log.debug("Processed {} bytes", .{bytes_read}); | |
| } else |err| switch (err) { | |
| ConnectionError.ConnectionClosed => { | |
| log.debug("Connection closed by server", .{}); | |
| self.triggerReconnect(err); | |
| break; | |
| }, | |
| else => { | |
| log.err("Read/parse error: {}", .{err}); | |
| self.triggerReconnect(err); | |
| break; | |
| }, | |
| } | |
| if (self.readAndProcessWithStream(stream, buffer[0..])) |bytes_read| { | |
| log.debug("Processed {} bytes", .{bytes_read}); | |
| } else |err| switch (err) { | |
| ConnectionError.ConnectionClosed => { | |
| log.debug("Connection closed by server", .{}); | |
| self.triggerReconnect(err); | |
| break; | |
| }, | |
| else => { | |
| log.err("Read/parse error: {}", .{err}); | |
| self.triggerReconnect(err); | |
| break; | |
| }, | |
| } |
🤖 Prompt for AI Agents
In src/connection.zig around lines 746 to 759, the call passes &buffer (type
*[4096]u8) to readAndProcessWithStream which expects a []u8 slice; change the
call to pass a slice of the buffer instead (e.g., buffer[0..] or
buffer[0..buffer.len]) so the types match and compilation succeeds.
Implements Phase 1 of the readerLoop refactoring plan from issue #4.
Changes
Benefits
All unit tests pass (25/25).
Closes #4
Generated with Claude Code
Summary by CodeRabbit
Refactor
Performance
Reliability