Skip to content

Commit a68c354

Browse files
grapebabaCopilot
authored andcommitted
feat(log): add AsyncAppend, JsonLayout, and LogfmtLayout
Phase 2 components: - AsyncAppend: Vyukov bounded MPMC ring buffer with background consumer thread, non-blocking enqueue, drop-on-full policy, graceful shutdown+drain - JsonLayout: JSON lines format with proper string escaping - LogfmtLayout: key=value format with conditional quoting Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 867286a commit a68c354

File tree

3 files changed

+543
-0
lines changed

3 files changed

+543
-0
lines changed

src/log/AsyncAppend.zig

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
const std = @import("std");
2+
3+
/// Maximum bytes per log message slot.
4+
const max_message_size = 4096;
5+
6+
/// A single slot in the ring buffer.
7+
const Slot = struct {
8+
sequence: std.atomic.Value(usize),
9+
len: usize,
10+
data: [max_message_size]u8,
11+
};
12+
13+
/// Vyukov bounded MPMC ring buffer for async log message passing.
14+
/// `buffer_size_log2` is the log2 of the number of slots (e.g., 10 = 1024 slots).
15+
pub fn AsyncAppend(comptime buffer_size_log2: comptime_int) type {
16+
const buffer_size: usize = 1 << buffer_size_log2;
17+
const mask: usize = buffer_size - 1;
18+
19+
return struct {
20+
const Self = @This();
21+
22+
slots: [buffer_size]Slot,
23+
head: std.atomic.Value(usize),
24+
tail: std.atomic.Value(usize),
25+
shutdown: std.atomic.Value(bool),
26+
thread: ?std.Thread,
27+
inner_write_fn: *const fn (*anyopaque, []const u8) void,
28+
inner_ptr: *anyopaque,
29+
30+
/// Initialize AsyncAppend wrapping an inner Append.
31+
/// `inner` must be a pointer to a struct with `pub fn write(self: *T, bytes: []const u8) void`.
32+
pub fn init(inner: anytype) Self {
33+
const T = @TypeOf(inner.*);
34+
const wrapper = struct {
35+
fn writeFn(ctx: *anyopaque, bytes: []const u8) void {
36+
const typed: *T = @ptrCast(@alignCast(ctx));
37+
typed.write(bytes);
38+
}
39+
};
40+
41+
var self = Self{
42+
.slots = undefined,
43+
.head = std.atomic.Value(usize).init(0),
44+
.tail = std.atomic.Value(usize).init(0),
45+
.shutdown = std.atomic.Value(bool).init(false),
46+
.thread = null,
47+
.inner_write_fn = wrapper.writeFn,
48+
.inner_ptr = @ptrCast(inner),
49+
};
50+
51+
for (0..buffer_size) |i| {
52+
self.slots[i].sequence = std.atomic.Value(usize).init(i);
53+
self.slots[i].len = 0;
54+
}
55+
56+
return self;
57+
}
58+
59+
/// Start the background consumer thread.
60+
pub fn start(self: *Self) void {
61+
self.thread = std.Thread.spawn(.{}, consumerLoop, .{self}) catch return;
62+
}
63+
64+
/// Append interface: non-blocking enqueue. Drops message if ring is full.
65+
pub fn write(self: *Self, bytes: []const u8) void {
66+
var head = self.head.load(.acquire);
67+
while (true) {
68+
const slot = &self.slots[head & mask];
69+
const seq = slot.sequence.load(.acquire);
70+
71+
const diff = @as(isize, @intCast(seq)) - @as(isize, @intCast(head));
72+
if (diff == 0) {
73+
// Slot is available — try to claim it.
74+
if (self.head.cmpxchgWeak(head, head + 1, .acq_rel, .acquire)) |updated| {
75+
head = updated;
76+
continue;
77+
}
78+
// Claimed — write payload.
79+
const copy_len = @min(bytes.len, max_message_size);
80+
@memcpy(slot.data[0..copy_len], bytes[0..copy_len]);
81+
slot.len = copy_len;
82+
// Publish.
83+
slot.sequence.store(head + 1, .release);
84+
return;
85+
} else if (diff < 0) {
86+
// Ring is full — drop message.
87+
return;
88+
} else {
89+
// Another producer is ahead — reload head.
90+
head = self.head.load(.acquire);
91+
}
92+
}
93+
}
94+
95+
/// Shutdown: signal, join thread, drain remaining.
96+
pub fn deinit(self: *Self) void {
97+
self.shutdown.store(true, .release);
98+
if (self.thread) |t| {
99+
t.join();
100+
self.thread = null;
101+
}
102+
self.drain();
103+
}
104+
105+
fn consumerLoop(self: *Self) void {
106+
var idle_spins: u32 = 0;
107+
while (!self.shutdown.load(.acquire)) {
108+
if (self.tryConsume()) {
109+
idle_spins = 0;
110+
} else {
111+
idle_spins += 1;
112+
if (idle_spins > 64) {
113+
std.time.sleep(1_000_000); // 1ms
114+
} else {
115+
std.atomic.spinLoopHint();
116+
}
117+
}
118+
}
119+
}
120+
121+
fn tryConsume(self: *Self) bool {
122+
const tail = self.tail.load(.acquire);
123+
const slot = &self.slots[tail & mask];
124+
const seq = slot.sequence.load(.acquire);
125+
126+
const diff = @as(isize, @intCast(seq)) - @as(isize, @intCast(tail + 1));
127+
if (diff == 0) {
128+
// Slot has data — claim it.
129+
if (self.tail.cmpxchgWeak(tail, tail + 1, .acq_rel, .acquire)) |_| {
130+
return false;
131+
}
132+
self.inner_write_fn(self.inner_ptr, slot.data[0..slot.len]);
133+
// Recycle slot.
134+
slot.sequence.store(tail + buffer_size, .release);
135+
return true;
136+
}
137+
return false;
138+
}
139+
140+
fn drain(self: *Self) void {
141+
while (self.tryConsume()) {}
142+
}
143+
};
144+
}
145+
146+
// ==== Tests ====
147+
148+
const TestCollector = struct {
149+
count: std.atomic.Value(usize),
150+
last_msg: [max_message_size]u8,
151+
last_len: usize,
152+
153+
fn init() TestCollector {
154+
return .{
155+
.count = std.atomic.Value(usize).init(0),
156+
.last_msg = undefined,
157+
.last_len = 0,
158+
};
159+
}
160+
161+
pub fn write(self: *TestCollector, bytes: []const u8) void {
162+
const copy_len = @min(bytes.len, max_message_size);
163+
@memcpy(self.last_msg[0..copy_len], bytes[0..copy_len]);
164+
self.last_len = copy_len;
165+
_ = self.count.fetchAdd(1, .monotonic);
166+
}
167+
};
168+
169+
test "AsyncAppend basic enqueue and drain" {
170+
var collector = TestCollector.init();
171+
var appender = AsyncAppend(4).init(&collector); // 16 slots
172+
173+
appender.start();
174+
175+
appender.write("hello async");
176+
appender.write("second message");
177+
178+
// Give consumer time to process.
179+
std.time.sleep(50_000_000); // 50ms
180+
181+
appender.deinit();
182+
183+
try std.testing.expect(collector.count.load(.acquire) >= 2);
184+
}
185+
186+
test "AsyncAppend shutdown drains remaining" {
187+
var collector = TestCollector.init();
188+
var appender = AsyncAppend(4).init(&collector);
189+
190+
// Don't start consumer thread — messages queue up.
191+
appender.write("msg1");
192+
appender.write("msg2");
193+
appender.write("msg3");
194+
195+
// deinit should drain.
196+
appender.deinit();
197+
198+
try std.testing.expectEqual(@as(usize, 3), collector.count.load(.acquire));
199+
}
200+
201+
test "AsyncAppend drops when full" {
202+
var collector = TestCollector.init();
203+
var appender = AsyncAppend(2).init(&collector); // 4 slots only
204+
205+
// Fill beyond capacity without consumer.
206+
for (0..10) |i| {
207+
var buf: [32]u8 = undefined;
208+
const msg = std.fmt.bufPrint(&buf, "msg{d}", .{i}) catch "?";
209+
appender.write(msg);
210+
}
211+
212+
// Drain what's in the buffer.
213+
appender.deinit();
214+
215+
// Should have received at most 4 (buffer size).
216+
const count = collector.count.load(.acquire);
217+
try std.testing.expect(count <= 4);
218+
try std.testing.expect(count > 0);
219+
}
220+
221+
test "AsyncAppend truncates long messages" {
222+
var collector = TestCollector.init();
223+
var appender = AsyncAppend(4).init(&collector);
224+
225+
// Create a message longer than max_message_size.
226+
var long_msg: [max_message_size + 100]u8 = undefined;
227+
@memset(&long_msg, 'A');
228+
229+
appender.write(&long_msg);
230+
appender.deinit();
231+
232+
try std.testing.expectEqual(@as(usize, 1), collector.count.load(.acquire));
233+
try std.testing.expectEqual(@as(usize, max_message_size), collector.last_len);
234+
}

src/log/JsonLayout.zig

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
const std = @import("std");
2+
const Record = @import("Record.zig").Record;
3+
const Level = @import("Level.zig");
4+
const Attr = @import("Attr.zig").Attr;
5+
6+
/// JSON lines layout. Each record is a single JSON object followed by \n.
7+
/// Format: {"ts":1234,"level":"info","scope":"fork_choice","msg":"block applied","slot":42}
8+
pub const JsonLayout = struct {
9+
pub fn format(_: *const JsonLayout, record: *const Record, writer: anytype) void {
10+
writer.writeAll("{") catch return;
11+
12+
// Timestamp
13+
writer.print("\"ts\":{d}", .{record.timestamp_ms}) catch return;
14+
15+
// Level
16+
writer.writeAll(",\"level\":\"") catch return;
17+
writer.writeAll(std.mem.trimRight(u8, Level.asText(record.level), " ")) catch return;
18+
writer.writeAll("\"") catch return;
19+
20+
// Scope
21+
if (!std.mem.eql(u8, record.scope_name, "default")) {
22+
writer.writeAll(",\"scope\":\"") catch return;
23+
writer.writeAll(record.scope_name) catch return;
24+
writer.writeAll("\"") catch return;
25+
}
26+
27+
// Message
28+
writer.writeAll(",\"msg\":\"") catch return;
29+
writeJsonEscaped(writer, record.message);
30+
writer.writeAll("\"") catch return;
31+
32+
// Attributes
33+
var iter = record.attrIterator();
34+
while (iter.next()) |attr| {
35+
writer.writeAll(",\"") catch return;
36+
writer.writeAll(attr.key) catch return;
37+
writer.writeAll("\":") catch return;
38+
writeJsonValue(writer, attr.value);
39+
}
40+
41+
writer.writeAll("}\n") catch return;
42+
}
43+
44+
fn writeJsonValue(writer: anytype, value: Attr.Value) void {
45+
switch (value) {
46+
.int => |v| writer.print("{d}", .{v}) catch {},
47+
.uint => |v| writer.print("{d}", .{v}) catch {},
48+
.float => |v| writer.print("{d:.6}", .{v}) catch {},
49+
.bool_val => |v| writer.print("{}", .{v}) catch {},
50+
.string => |v| {
51+
writer.writeByte('"') catch {};
52+
writeJsonEscaped(writer, v);
53+
writer.writeByte('"') catch {};
54+
},
55+
.hex_bytes => |v| {
56+
writer.writeAll("\"0x") catch {};
57+
for (v) |byte| {
58+
writer.print("{x:0>2}", .{byte}) catch {};
59+
}
60+
writer.writeByte('"') catch {};
61+
},
62+
}
63+
}
64+
65+
fn writeJsonEscaped(writer: anytype, s: []const u8) void {
66+
for (s) |c| {
67+
switch (c) {
68+
'"' => writer.writeAll("\\\"") catch {},
69+
'\\' => writer.writeAll("\\\\") catch {},
70+
'\n' => writer.writeAll("\\n") catch {},
71+
'\r' => writer.writeAll("\\r") catch {},
72+
'\t' => writer.writeAll("\\t") catch {},
73+
else => writer.writeByte(c) catch {},
74+
}
75+
}
76+
}
77+
};
78+
79+
test "JsonLayout basic" {
80+
var buf: [512]u8 = undefined;
81+
var fbs = std.io.fixedBufferStream(&buf);
82+
83+
var record = Record{
84+
.timestamp_ms = 1234567890,
85+
.level = .info,
86+
.scope_name = "fork_choice",
87+
.message = "block applied",
88+
};
89+
record.pushEventAttr(Attr.uint("slot", 42));
90+
91+
const layout = JsonLayout{};
92+
layout.format(&record, fbs.writer());
93+
94+
const output = fbs.getWritten();
95+
// Should be valid-ish JSON with expected fields
96+
try std.testing.expect(std.mem.indexOf(u8, output, "\"ts\":1234567890") != null);
97+
try std.testing.expect(std.mem.indexOf(u8, output, "\"level\":\"info\"") != null);
98+
try std.testing.expect(std.mem.indexOf(u8, output, "\"scope\":\"fork_choice\"") != null);
99+
try std.testing.expect(std.mem.indexOf(u8, output, "\"msg\":\"block applied\"") != null);
100+
try std.testing.expect(std.mem.indexOf(u8, output, "\"slot\":42") != null);
101+
try std.testing.expect(output[output.len - 1] == '\n');
102+
}
103+
104+
test "JsonLayout default scope omitted" {
105+
var buf: [512]u8 = undefined;
106+
var fbs = std.io.fixedBufferStream(&buf);
107+
108+
const record = Record{
109+
.timestamp_ms = 0,
110+
.level = .err,
111+
.scope_name = "default",
112+
.message = "oops",
113+
};
114+
115+
const layout = JsonLayout{};
116+
layout.format(&record, fbs.writer());
117+
118+
const output = fbs.getWritten();
119+
try std.testing.expect(std.mem.indexOf(u8, output, "\"scope\"") == null);
120+
}
121+
122+
test "JsonLayout escapes special characters" {
123+
var buf: [512]u8 = undefined;
124+
var fbs = std.io.fixedBufferStream(&buf);
125+
126+
var record = Record{
127+
.timestamp_ms = 0,
128+
.level = .warn,
129+
.scope_name = "default",
130+
.message = "line1\nline2",
131+
};
132+
record.pushEventAttr(Attr.str("data", "has \"quotes\""));
133+
134+
const layout = JsonLayout{};
135+
layout.format(&record, fbs.writer());
136+
137+
const output = fbs.getWritten();
138+
try std.testing.expect(std.mem.indexOf(u8, output, "line1\\nline2") != null);
139+
try std.testing.expect(std.mem.indexOf(u8, output, "has \\\"quotes\\\"") != null);
140+
}
141+
142+
test "JsonLayout bool and hex values" {
143+
var buf: [512]u8 = undefined;
144+
var fbs = std.io.fixedBufferStream(&buf);
145+
146+
var record = Record{
147+
.timestamp_ms = 0,
148+
.level = .info,
149+
.scope_name = "default",
150+
.message = "test",
151+
};
152+
record.pushEventAttr(Attr.boolean("ok", true));
153+
record.pushEventAttr(Attr.hex("hash", &[_]u8{ 0xab, 0xcd }));
154+
155+
const layout = JsonLayout{};
156+
layout.format(&record, fbs.writer());
157+
158+
const output = fbs.getWritten();
159+
try std.testing.expect(std.mem.indexOf(u8, output, "\"ok\":true") != null);
160+
try std.testing.expect(std.mem.indexOf(u8, output, "\"hash\":\"0xabcd\"") != null);
161+
}

0 commit comments

Comments
 (0)