Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/connection.zig
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,22 @@ pub const Connection = struct {
handler.call(message);
} else {
// Sync subscription - queue message
try s.messages.push(message);
s.messages.push(message) catch |err| {
switch (err) {
error.QueueClosed => {
// Subscription is closing/closed; drop gracefully.
log.debug("Queue closed for sid {d}; dropping message", .{ msg_arg.sid });
message.deinit();
return;
},
else => {
// Allocation or unexpected push failure; drop but do not tear down the connection.
log.err("Failed to enqueue message for sid {d}: {}", .{ msg_arg.sid, err });
message.deinit();
return;
},
}
};
}
} else {
// No subscription found, clean up message
Expand Down
49 changes: 38 additions & 11 deletions src/queue.zig
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,20 @@ pub fn Queue(comptime T: type) type {
return error.QueueClosed;
}

// Ensure we have space
if (self.items.items.len >= self.items.capacity) {
const new_capacity = if (self.items.capacity == 0) 8 else self.items.capacity * 2;
try self.items.ensureTotalCapacity(self.allocator, new_capacity);

// If we've wrapped around, reorganize to be linear
if (self.head > 0 and self.head < self.items.items.len) {
const count = self.items.items.len;

// Rotate the array so head becomes 0
std.mem.rotate(T, self.items.items[0..count], self.head);
// Ensure we have space; prefer compaction before growth
if (self.items.items.len == self.items.capacity) {
if (self.head > 0) {
const remaining = self.items.items.len - self.head;
if (remaining > 0) {
std.mem.copyForwards(T, self.items.items[0..remaining], self.items.items[self.head..]);
}
self.items.shrinkRetainingCapacity(remaining);
self.head = 0;
}
if (self.items.items.len == self.items.capacity) {
const new_capacity = if (self.items.capacity == 0) 8 else self.items.capacity * 2;
try self.items.ensureTotalCapacity(self.allocator, new_capacity);
}
}

// Add item at the end
Expand Down Expand Up @@ -260,6 +261,32 @@ test "Queue compaction" {
}
}

test "Queue push compacts when head > 0 at capacity (no duplication)" {
const testing = std.testing;
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();

var q = Queue(i32).init(allocator);
defer q.deinit();

// Fill to initial capacity (8)
for (0..8) |i| try q.push(@intCast(i));

// Pop a few to advance head
try testing.expectEqual(@as(?i32, 0), q.tryPop());
try testing.expectEqual(@as(?i32, 1), q.tryPop());
try testing.expectEqual(@as(?i32, 2), q.tryPop());
try testing.expectEqual(@as(usize, 5), q.len());

// Push enough to require space; should compact, not rotate duplicates
for (8..12) |i| try q.push(@intCast(i));

// We should see the remaining original items [3..7], then [8..11]
for (3..12) |i| try testing.expectEqual(@as(?i32, @intCast(i)), q.tryPop());
try testing.expect(q.isEmpty());
}

test "Queue multithreaded producer/consumer" {
const testing = std.testing;
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
Expand Down