246 changes: 113 additions & 133 deletions src/connection.zig
@@ -1,5 +1,6 @@
const std = @import("std");
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
const Parser = @import("parser.zig").Parser;
@@ -173,17 +174,11 @@ pub const Connection = struct {

// Threading
reader_thread: ?std.Thread = null,
flusher_thread: ?std.Thread = null,
should_stop: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

// Main connection mutex (protects most fields)
mutex: std.Thread.Mutex = .{},

// Flusher synchronization (protected by main mutex)
flusher_stop: bool = false,
flusher_signaled: bool = false,
flusher_condition: std.Thread.Condition = .{},

// PONG waiting (protected by main mutex)
pending_pongs: u32 = 0,
pong_condition: std.Thread.Condition = .{},
@@ -310,12 +305,9 @@ pub const Connection = struct {

// Socket will be blocking - shutdown() will wake up reader

// Start reader thread
// Start reader thread (handles both reads and writes)
self.reader_thread = try std.Thread.spawn(.{}, readerLoop, .{self});

// Start flusher thread
self.flusher_thread = try std.Thread.spawn(.{}, flusherLoop, .{self});

log.info("Connected successfully to {s}", .{selected_server.parsed_url.full_url});
}

@@ -348,16 +340,7 @@ pub const Connection = struct {
thread.join();
}

// Stop flusher thread
if (self.flusher_thread) |thread| {
self.mutex.lock();
self.flusher_stop = true;
self.flusher_condition.signal();
self.mutex.unlock();

thread.join();
self.flusher_thread = null;
}
// Flusher thread no longer needed - reader handles writes

// Wait for reader thread to complete (it will handle stream cleanup)
if (self.reader_thread) |thread| {
@@ -398,7 +381,7 @@ pub const Connection = struct {
try buffer.appendSlice(data);
try buffer.appendSlice("\r\n");

// Send via buffer (either immediate or flusher thread)
// Send via buffer (reader thread will handle flushing)
try self.bufferWrite(buffer.items);

log.debug("Published to {s}: {s}", .{ subject, data });
@@ -544,12 +527,8 @@ pub const Connection = struct {
return ConnectionError.ConnectionClosed;
}

// Force flush any pending writes (mutex already held)
const has_pending = self.write_buffer.items.len > 0;
if (has_pending) {
self.flusher_signaled = true;
self.flusher_condition.signal();
}
// Reader thread handles flushing - no need to signal flusher
// Just ensure writes are buffered and reader will flush them

// Increment pending PONG counter before sending PING
self.pending_pongs += 1;
@@ -595,7 +574,7 @@ pub const Connection = struct {
try buffer.appendSlice(data);
try buffer.appendSlice("\r\n");

// Send via buffer (mutex held)
// Send via buffer (reader thread will handle flushing)
try self.bufferWrite(buffer.items);

log.debug("Published request to {s} with reply {s}: {s}", .{ subject, reply, data });
@@ -690,111 +669,127 @@ pub const Connection = struct {
log.debug("Handshake completed successfully", .{});
}

fn readAndProcessWithStream(self: *Self, stream: net.Stream, buffer: []u8) !usize {
// Reader owns stream - no race condition
const bytes_read = try stream.read(buffer);
if (bytes_read == 0) {
return ConnectionError.ConnectionClosed;
}

log.debug("Read {} bytes: {s}", .{ bytes_read, buffer[0..bytes_read] });

// Parse the received data
self.parser.parse(self, buffer[0..bytes_read]) catch |err| {
log.err("Parser error: {}", .{err});
// Reset parser state on error to prevent corruption
self.parser.reset();
return err;
};

return bytes_read;
}

fn readerLoop(self: *Self) void {
var buffer: [4096]u8 = undefined;

log.debug("Reader loop started", .{});

// Lock at start like C _readLoop

// Reader owns stream - get reference once
var reader_stream: ?net.Stream = null;
defer if (reader_stream) |stream| {
stream.close();
// Clear the shared stream reference
self.mutex.lock();
self.stream = null;
self.mutex.unlock();
};

// Get initial stream under lock
self.mutex.lock();
defer self.mutex.unlock(); // Final cleanup

// Simple while condition - check status inside loop under lock
reader_stream = self.stream;
self.mutex.unlock();

const stream = reader_stream orelse {
log.debug("No stream available, reader exiting", .{});
return;
};

// Set up poll for non-blocking reads with timeout
var pollfd = [1]std.posix.pollfd{
.{
.fd = stream.handle,
.events = std.posix.POLL.IN,
.revents = 0,
},
};

const POLL_TIMEOUT_MS = 10; // Check for writes every 10ms

while (!self.should_stop.load(.acquire)) {

// Check status and stream under lock
if (self.status == .closed or self.status == .reconnecting) {
break;
}

const stream = self.stream orelse break;

// Unlock before I/O like C _readLoop
// Check connection status
self.mutex.lock();
const should_exit = (self.status == .closed or self.status == .reconnecting);
self.mutex.unlock();
defer self.mutex.lock(); // Re-lock at end of iteration

// Simple blocking read - shutdown() will wake us up
const bytes_read = stream.read(&buffer) catch |err| {
log.err("Read error: {}", .{err});
self.triggerReconnect(err); // Handles its own locking
break;
};

if (bytes_read == 0) {
log.debug("Connection closed by server", .{});
self.triggerReconnect(ConnectionError.ConnectionClosed); // Handles its own locking

if (should_exit) {
break;
}

log.debug("Read {} bytes: {s}", .{ bytes_read, buffer[0..bytes_read] });

// Parse the received data
self.parser.parse(self, buffer[0..bytes_read]) catch |err| {
log.err("Parser error: {}", .{err});
// Reset parser state on error to prevent corruption
self.parser.reset();
self.triggerReconnect(err); // Handles its own locking

// Poll for incoming data with timeout
const poll_result = std.posix.poll(&pollfd, POLL_TIMEOUT_MS) catch |err| {
log.err("Poll error: {}", .{err});
self.triggerReconnect(err);
break;
};
}

// Cleanup stream when reader exits (like C _readLoop)
if (self.stream) |stream| {
stream.close();
self.stream = null;
}

log.debug("Reader loop exited", .{});
}

fn flusherLoop(self: *Self) void {
log.debug("Flusher loop started", .{});

self.mutex.lock();
defer self.mutex.unlock();

while (true) {
// Wait for signal or stop condition
while (!self.flusher_signaled and !self.flusher_stop) {
self.flusher_condition.wait(&self.mutex);
}

if (self.flusher_stop) {
log.debug("Flusher stopping...", .{});
break;

if (poll_result > 0 and (pollfd[0].revents & std.posix.POLL.IN) != 0) {
// Data available - read and process
if (self.readAndProcessWithStream(stream, &buffer)) |bytes_read| {
log.debug("Processed {} bytes", .{bytes_read});
} else |err| switch (err) {
ConnectionError.ConnectionClosed => {
log.debug("Connection closed by server", .{});
self.triggerReconnect(err);
break;
},
else => {
log.err("Read/parse error: {}", .{err});
self.triggerReconnect(err);
break;
},
}
Comment on lines +746 to +759

⚠️ Potential issue

Fix slice vs pointer when passing buffer to readAndProcessWithStream (compile-time type mismatch).

readAndProcessWithStream expects []u8, but the call site passes &buffer (*[4096]u8). This won’t coerce implicitly and will fail to compile. Pass a slice instead.

Apply this diff:

-                if (self.readAndProcessWithStream(stream, &buffer)) |bytes_read| {
+                if (self.readAndProcessWithStream(stream, buffer[0..])) |bytes_read| {
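For reference, a minimal standalone sketch of the array-to-slice call pattern the suggestion uses (the consume function and the 16-byte buffer are hypothetical, not taken from src/connection.zig):

const std = @import("std");

// Hypothetical stand-in for a function that, like readAndProcessWithStream,
// takes its input as a []u8 slice rather than a pointer to a fixed-size array.
fn consume(data: []u8) usize {
    return data.len;
}

test "pass a stack array as a slice" {
    var buffer: [16]u8 = undefined;
    @memset(buffer[0..], 'x');
    // buffer[0..] produces a slice over the whole array, matching the []u8 parameter.
    const n = consume(buffer[0..]);
    try std.testing.expectEqual(@as(usize, 16), n);
}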

}

// Give a chance to accumulate more requests (like C implementation)
self.flusher_condition.timedWait(&self.mutex, 1_000_000) catch {}; // 1ms in nanoseconds

self.flusher_signaled = false;

// Check if we should flush
if (self.status != .connected or self.write_buffer.items.len == 0) {
// No need to flush if not connected or no data to flush
continue;

// Check for pending writes to send
self.mutex.lock();
const has_writes = self.write_buffer.items.len > 0;
var write_data: []u8 = undefined;
if (has_writes) {
// Copy buffer under lock
write_data = self.allocator.dupe(u8, self.write_buffer.items) catch {
self.mutex.unlock();
continue;
};
self.write_buffer.clearRetainingCapacity();
}

const stream = self.stream orelse break;

// Unlock before I/O like C _readLoop
self.mutex.unlock();
defer self.mutex.lock(); // Re-lock at end of iteration

// Write all buffered data
if (stream.writeAll(self.write_buffer.items)) {
log.debug("Flushed {} bytes", .{self.write_buffer.items.len});
self.write_buffer.clearRetainingCapacity();
} else |err| {
log.err("Flush error: {}", .{err});
self.triggerReconnect(err);
break;

if (has_writes) {
defer self.allocator.free(write_data);
// Write without holding lock
stream.writeAll(write_data) catch |err| {
log.err("Write error: {}", .{err});
self.triggerReconnect(err);
break;
};
log.debug("Flushed {} bytes", .{write_data.len});
}
}

log.debug("Flusher loop exited", .{});
log.debug("Reader loop exited", .{});
}

// Flusher thread removed - reader handles both reads and writes
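To illustrate that design, here is a simplified, standalone sketch of the write path (WriteQueue, bufferWrite, and takePending are hypothetical names, not this library's API, and it assumes the unmanaged ArrayList interface the diff itself uses, where appendSlice takes an allocator). Publishers append under the mutex; the reader thread takes a private copy of the pending bytes under the lock and performs the socket write with the lock released, so the mutex is never held across blocking I/O.

const std = @import("std");

const WriteQueue = struct {
    mutex: std.Thread.Mutex = .{},
    write_buffer: std.ArrayList(u8) = .empty,

    // Publisher side: append under the mutex; no condition variable to signal.
    fn bufferWrite(self: *WriteQueue, allocator: std.mem.Allocator, data: []const u8) !void {
        self.mutex.lock();
        defer self.mutex.unlock();
        try self.write_buffer.appendSlice(allocator, data);
    }

    // Reader side: called after each poll() wakeup. Returns a private copy of the
    // pending bytes (caller frees it) so the actual socket write can happen with the
    // mutex released; returns null when there is nothing to flush.
    fn takePending(self: *WriteQueue, allocator: std.mem.Allocator) !?[]u8 {
        self.mutex.lock();
        defer self.mutex.unlock();
        if (self.write_buffer.items.len == 0) return null;
        const pending = try allocator.dupe(u8, self.write_buffer.items);
        self.write_buffer.clearRetainingCapacity();
        return pending;
    }
};

test "buffer then take pending bytes" {
    var q = WriteQueue{};
    defer q.write_buffer.deinit(std.testing.allocator);
    try q.bufferWrite(std.testing.allocator, "PING\r\n");
    const pending = (try q.takePending(std.testing.allocator)).?;
    defer std.testing.allocator.free(pending);
    try std.testing.expectEqualStrings("PING\r\n", pending);
}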

fn bufferWrite(self: *Self, data: []const u8) !void {
// Assume mutex is already held by caller

@@ -803,13 +798,8 @@ pub const Connection = struct {
return self.pending_buffer.addMessage(data);
}

// Buffer and signal flusher (mutex already held)
// Buffer for reader thread to flush (no signaling needed)
try self.write_buffer.appendSlice(self.allocator, data);

if (!self.flusher_signaled) {
self.flusher_signaled = true;
self.flusher_condition.signal();
}
}

// Parser callback methods
@@ -1086,17 +1076,7 @@ pub const Connection = struct {
continue; // Try next server
};

// Restart flusher thread for the new connection
self.mutex.lock();
self.flusher_stop = false;
self.flusher_signaled = false;
self.mutex.unlock();

self.flusher_thread = std.Thread.spawn(.{}, flusherLoop, .{self}) catch |err| {
log.err("Failed to restart flusher thread: {}", .{err});
self.triggerReconnect(err);
continue; // Try next server
};
// Reader thread handles both reads and writes - no flusher needed

// Re-establish subscriptions (outside mutex like C library)
self.resendSubscriptions() catch |err| {