Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 42 additions & 33 deletions src/connection.zig
Original file line number Diff line number Diff line change
Expand Up @@ -690,60 +690,69 @@ pub const Connection = struct {
log.debug("Handshake completed successfully", .{});
}

fn readAndProcess(self: *Self, buffer: []u8) !usize {
const stream = self.stream orelse return ConnectionError.ConnectionClosed;

// Simple blocking read - shutdown() will wake us up
const bytes_read = try stream.read(buffer);
if (bytes_read == 0) {
return ConnectionError.ConnectionClosed;
}

log.debug("Read {} bytes: {s}", .{ bytes_read, buffer[0..bytes_read] });

// Parse the received data
self.parser.parse(self, buffer[0..bytes_read]) catch |err| {
log.err("Parser error: {}", .{err});
// Reset parser state on error to prevent corruption
self.parser.reset();
return err;
};

return bytes_read;
}

fn readerLoop(self: *Self) void {
var buffer: [4096]u8 = undefined;

log.debug("Reader loop started", .{});

// Lock at start like C _readLoop
self.mutex.lock();
defer self.mutex.unlock(); // Final cleanup

// Simple while condition - check status inside loop under lock

while (!self.should_stop.load(.acquire)) {

// Check status and stream under lock
if (self.status == .closed or self.status == .reconnecting) {
break;
}

const stream = self.stream orelse break;


// Unlock before I/O like C _readLoop
self.mutex.unlock();
defer self.mutex.lock(); // Re-lock at end of iteration

// Simple blocking read - shutdown() will wake us up
const bytes_read = stream.read(&buffer) catch |err| {
log.err("Read error: {}", .{err});
self.triggerReconnect(err); // Handles its own locking
break;
};

if (bytes_read == 0) {
log.debug("Connection closed by server", .{});
self.triggerReconnect(ConnectionError.ConnectionClosed); // Handles its own locking
break;

// Perform I/O without holding mutex
if (self.readAndProcess(&buffer)) |bytes_read| {
log.debug("Processed {} bytes", .{bytes_read});
} else |err| switch (err) {
ConnectionError.ConnectionClosed => {
log.debug("Connection closed by server", .{});
self.triggerReconnect(err); // Handles its own locking
break;
},
else => {
log.err("Read/parse error: {}", .{err});
self.triggerReconnect(err); // Handles its own locking
break;
},
}

log.debug("Read {} bytes: {s}", .{ bytes_read, buffer[0..bytes_read] });

// Parse the received data
self.parser.parse(self, buffer[0..bytes_read]) catch |err| {
log.err("Parser error: {}", .{err});
// Reset parser state on error to prevent corruption
self.parser.reset();
self.triggerReconnect(err); // Handles its own locking
break;
};
}

// Cleanup stream when reader exits (like C _readLoop)
if (self.stream) |stream| {
stream.close();
self.stream = null;
}

log.debug("Reader loop exited", .{});
}

Expand Down