Skip to content

Commit 6bb0149

Browse files
grapebaba and Copilot
committed
log: extract RingBuffer from AsyncAppend
Separate the MPMC ring buffer into its own module (RingBuffer.zig):
- Bounded sequence-per-slot queue with push/tryPop/Message.release API
- Cache-line aligned cursors, epoch-based condvar parking
- Independent tests (push/pop, drops-when-full, truncation, FIFO, close, peek)

AsyncAppend is now a thin wrapper (~80 lines):
- Owns a RingBuffer + consumer thread + inner appender forwarding
- No ring buffer logic of its own

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent bcb271a commit 6bb0149

File tree

3 files changed

+322
-144
lines changed

3 files changed

+322
-144
lines changed

src/log/AsyncAppend.zig

Lines changed: 27 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,22 @@
11
const std = @import("std");
2+
const ring_buffer = @import("RingBuffer.zig");
3+
const RingBuffer = ring_buffer.RingBuffer;
24

3-
/// L1 cache line size. 128 covers x86-64 (64B) and Apple Silicon (128B).
4-
const cache_line = 128;
5+
pub const max_message_size = ring_buffer.max_message_size;
56

6-
/// Maximum bytes per log message slot.
7-
pub const max_message_size = 4096;
8-
9-
/// A single slot in the ring buffer.
10-
/// Sequence field is cache-line aligned to prevent false sharing between
11-
/// adjacent slots (the previous slot's data tail and the next slot's sequence
12-
/// would otherwise share a cache line).
13-
const Slot = struct {
14-
sequence: std.atomic.Value(usize) align(cache_line),
15-
len: usize,
16-
data: [max_message_size]u8,
17-
};
18-
19-
/// Condition-variable waiter with fast-path check.
20-
/// Only signals when threads are actually parked, avoiding unnecessary syscalls.
21-
const Waiter = struct {
22-
mutex: std.Thread.Mutex = .{},
23-
cond: std.Thread.Condition = .{},
24-
parked: std.atomic.Value(u32) align(cache_line) = std.atomic.Value(u32).init(0),
25-
26-
fn notifyOne(self: *Waiter) void {
27-
if (self.parked.load(.monotonic) != 0) {
28-
self.cond.signal();
29-
}
30-
}
31-
32-
fn broadcast(self: *Waiter) void {
33-
if (self.parked.load(.monotonic) != 0) {
34-
self.cond.broadcast();
35-
}
36-
}
37-
};
38-
39-
/// Bounded MPMC ring buffer for async log message passing.
40-
///
41-
/// Sequence-per-slot design (LMAX Disruptor / Vyukov style):
42-
/// - Pre-allocated slots via allocator (zero allocation on hot path)
43-
/// - Cache-line aligned cursors prevent false sharing
44-
/// - Epoch-based condvar parking for low-latency wakeup
45-
/// - Non-blocking producer with drop-on-full policy
46-
/// - Wrapping arithmetic for correctness over long uptimes
7+
/// Async appender: non-blocking write via a bounded MPMC ring buffer,
8+
/// drained by a background consumer thread that forwards to an inner appender.
479
pub const AsyncAppend = struct {
48-
slots: []Slot,
49-
mask: usize,
50-
head: std.atomic.Value(usize) align(cache_line) = std.atomic.Value(usize).init(0),
51-
tail: std.atomic.Value(usize) align(cache_line) = std.atomic.Value(usize).init(0),
52-
epoch: std.atomic.Value(usize) align(cache_line) = std.atomic.Value(usize).init(0),
53-
shutdown: std.atomic.Value(bool) align(cache_line) = std.atomic.Value(bool).init(false),
54-
waiter: Waiter align(cache_line) = .{},
10+
ring: RingBuffer,
5511
thread: ?std.Thread = null,
5612
inner_write_fn: *const fn (*anyopaque, []const u8) void,
5713
inner_ptr: *anyopaque,
58-
allocator: std.mem.Allocator,
5914

60-
/// Pre-allocate ring buffer with `size` slots. Size must be a power of two.
15+
/// Create an async appender wrapping `inner` with `size` ring buffer slots.
16+
/// Size must be a power of two.
6117
/// `inner` must be a pointer to a struct with
6218
/// `pub fn write(self: *T, bytes: []const u8) void`.
6319
pub fn init(allocator: std.mem.Allocator, size: u32, inner: anytype) !AsyncAppend {
64-
std.debug.assert(size > 0 and (size & (size - 1)) == 0);
65-
6620
const T = @TypeOf(inner.*);
6721
const wrapper = struct {
6822
fn writeFn(ctx: *anyopaque, bytes: []const u8) void {
@@ -71,18 +25,10 @@ pub const AsyncAppend = struct {
7125
}
7226
};
7327

74-
const slots = try allocator.alloc(Slot, size);
75-
for (slots, 0..) |*slot, i| {
76-
slot.sequence = std.atomic.Value(usize).init(i);
77-
slot.len = 0;
78-
}
79-
8028
return .{
81-
.slots = slots,
82-
.mask = size - 1,
29+
.ring = try RingBuffer.init(allocator, size),
8330
.inner_write_fn = wrapper.writeFn,
8431
.inner_ptr = @ptrCast(inner),
85-
.allocator = allocator,
8632
};
8733
}
8834

@@ -93,130 +39,68 @@ pub const AsyncAppend = struct {
9339

9440
/// Non-blocking enqueue. Drops message if ring is full.
9541
pub fn write(self: *AsyncAppend, bytes: []const u8) void {
96-
var head = self.head.load(.monotonic);
97-
while (true) {
98-
const slot = &self.slots[head & self.mask];
99-
const seq = slot.sequence.load(.acquire);
100-
101-
if (seq == head) {
102-
if (self.head.cmpxchgWeak(head, head +% 1, .monotonic, .monotonic)) |updated| {
103-
head = updated;
104-
continue;
105-
}
106-
// Claimed — write payload then publish.
107-
const copy_len = @min(bytes.len, max_message_size);
108-
@memcpy(slot.data[0..copy_len], bytes[0..copy_len]);
109-
slot.len = copy_len;
110-
slot.sequence.store(head +% 1, .release);
111-
_ = self.epoch.fetchAdd(1, .release);
112-
self.waiter.notifyOne();
113-
return;
114-
}
115-
116-
// Wrapping diff avoids @intCast trap on long-running systems.
117-
const diff = @as(isize, @bitCast(seq -% head));
118-
if (diff < 0) {
119-
return; // Full — drop message.
120-
}
121-
head = self.head.load(.monotonic);
122-
}
42+
_ = self.ring.push(bytes);
12343
}
12444

125-
/// Signal shutdown, join consumer thread, drain remaining, free slots.
45+
/// Signal shutdown, join consumer thread, drain remaining, free ring.
12646
pub fn deinit(self: *AsyncAppend) void {
127-
self.shutdown.store(true, .release);
128-
_ = self.epoch.fetchAdd(1, .release);
129-
self.waiter.broadcast();
47+
self.ring.close();
13048
if (self.thread) |t| {
13149
t.join();
13250
self.thread = null;
13351
}
13452
self.drain();
135-
self.allocator.free(self.slots);
53+
self.ring.deinit();
13654
}
13755

13856
fn consumerLoop(self: *AsyncAppend) void {
139-
while (!self.shutdown.load(.acquire)) {
140-
if (self.tryConsume()) continue;
57+
while (!self.ring.isClosed()) {
58+
if (self.consumeOne()) continue;
14159

14260
// Spin briefly before parking on condvar.
14361
var spin: u8 = 0;
14462
while (spin < 16) : (spin += 1) {
145-
if (self.tryConsume()) break;
146-
if (self.shutdown.load(.acquire)) return;
63+
if (self.consumeOne()) break;
64+
if (self.ring.isClosed()) return;
14765
std.atomic.spinLoopHint();
14866
} else {
149-
self.park();
67+
self.ring.park();
15068
}
15169
}
15270
}
15371

154-
/// Epoch-based parking: prevents lost wakeups by checking that the
155-
/// epoch hasn't changed between deciding to park and actually sleeping.
156-
fn park(self: *AsyncAppend) void {
157-
const e0 = self.epoch.load(.acquire);
158-
159-
self.waiter.mutex.lock();
160-
defer self.waiter.mutex.unlock();
161-
162-
_ = self.waiter.parked.fetchAdd(1, .monotonic);
163-
defer _ = self.waiter.parked.fetchSub(1, .monotonic);
164-
165-
while (true) {
166-
if (self.shutdown.load(.acquire)) return;
167-
if (self.peekReadable()) return;
168-
if (self.epoch.load(.acquire) != e0) return;
169-
self.waiter.cond.wait(&self.waiter.mutex);
170-
}
171-
}
172-
173-
fn peekReadable(self: *AsyncAppend) bool {
174-
const tail = self.tail.load(.monotonic);
175-
const slot = &self.slots[tail & self.mask];
176-
const seq = slot.sequence.load(.acquire);
177-
return seq == tail +% 1;
178-
}
179-
180-
fn tryConsume(self: *AsyncAppend) bool {
181-
const tail = self.tail.load(.monotonic);
182-
const slot = &self.slots[tail & self.mask];
183-
const seq = slot.sequence.load(.acquire);
184-
185-
if (seq == tail +% 1) {
186-
if (self.tail.cmpxchgWeak(tail, tail +% 1, .monotonic, .monotonic)) |_| {
187-
return false;
188-
}
189-
self.inner_write_fn(self.inner_ptr, slot.data[0..slot.len]);
190-
slot.sequence.store(tail +% self.mask +% 1, .release);
72+
fn consumeOne(self: *AsyncAppend) bool {
73+
if (self.ring.tryPop()) |msg| {
74+
self.inner_write_fn(self.inner_ptr, msg.bytes());
75+
msg.release();
19176
return true;
19277
}
19378
return false;
19479
}
19580

19681
fn drain(self: *AsyncAppend) void {
197-
while (self.tryConsume()) {}
82+
while (self.ring.tryPop()) |msg| {
83+
self.inner_write_fn(self.inner_ptr, msg.bytes());
84+
msg.release();
85+
}
19886
}
19987
};
20088

20189
// ==== Tests ====
20290

20391
const TestCollector = struct {
20492
count: std.atomic.Value(usize),
205-
last_msg: [max_message_size]u8,
20693
last_len: usize,
20794

20895
fn init() TestCollector {
20996
return .{
21097
.count = std.atomic.Value(usize).init(0),
211-
.last_msg = undefined,
21298
.last_len = 0,
21399
};
214100
}
215101

216102
pub fn write(self: *TestCollector, bytes: []const u8) void {
217-
const copy_len = @min(bytes.len, max_message_size);
218-
@memcpy(self.last_msg[0..copy_len], bytes[0..copy_len]);
219-
self.last_len = copy_len;
103+
self.last_len = bytes.len;
220104
_ = self.count.fetchAdd(1, .monotonic);
221105
}
222106
};
@@ -277,7 +161,6 @@ test "AsyncAppend truncates long messages" {
277161
var collector = TestCollector.init();
278162
var appender = try AsyncAppend.init(std.testing.allocator, 16, &collector);
279163

280-
// Create a message longer than max_message_size.
281164
var long_msg: [max_message_size + 100]u8 = undefined;
282165
@memset(&long_msg, 'A');
283166

0 commit comments

Comments
 (0)