Skip to content

Commit 14ecb7d

Browse files
committed
Switch to encode-inline + raw-copy flush path
Thread-local buffers now encode all events directly via Encoder<Vec<u8>> and flush raw encoded bytes to the central collector. The flush thread raw-copies these bytes into the output file (each batch's header acts as a reset frame for decoders). Key changes: - buffer.rs: encode all events directly via write_infallible/ intern_string_infallible, size-based flush (64KB default), remove Vec<RawEvent> path entirely. - collector.rs: Batch now only contains encoded_bytes. - recorder/mod.rs: flush_once only processes encoded batches, skips header-only batches. - writer.rs: RotatingWriter::write_encoded_batch does raw-copy + reset_state + trailing header. Default trait impl returns error. Add Box/NullWriter delegation. - event_writer.rs: write_raw_event gated behind #[cfg(test)], write_transcoded_batch increments event counter. - tests/common.rs: CapturingWriter decodes encoded batches back to RawEvent for test inspection. - threadlocal_encoding.rs: E2E tests for encoder reset, raw-copy concatenation, timestamp preservation, different schemas, empty batches, and fallible iteration. - benches/threadlocal_encode.rs: benchmark comparing direct encode vs thread-local encode + raw-copy. All 294 tests pass. Clippy clean.
1 parent f5b18c7 commit 14ecb7d

9 files changed

Lines changed: 698 additions & 64 deletions

File tree

dial9-tokio-telemetry/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,7 @@ required-features = ["cpu-profiling"]
109109
[[bench]]
110110
name = "writer_encode"
111111
harness = false
112+
113+
[[bench]]
114+
name = "threadlocal_encode"
115+
harness = false
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//! Benchmark comparing direct encode vs thread-local encode + raw-copy.
2+
//!
3+
//! Measures the end-to-end cost of:
4+
//! - Direct: encode events directly through a single Encoder<Vec<u8>>
5+
//! - Thread-local: encode through a thread-local Encoder, reset(), then
6+
//! raw-copy into a central Vec<u8> (reset-frame pattern)
7+
//!
8+
//! Usage:
9+
//! cargo bench --bench threadlocal_encode
10+
11+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
12+
use dial9_tokio_telemetry::telemetry::format::{
13+
PollEndEvent, PollStartEvent, WorkerParkEvent, WorkerUnparkEvent,
14+
};
15+
use dial9_tokio_telemetry::telemetry::{TaskId, WorkerId};
16+
use dial9_trace_format::encoder::Encoder;
17+
18+
fn make_batch(worker: usize) -> Vec<(u64, WorkerId, TaskId)> {
19+
let wid = WorkerId::from(worker);
20+
let task = TaskId::from_u32(1);
21+
let mut events = Vec::with_capacity(350);
22+
23+
for cycle in 0..3u64 {
24+
let base = cycle * 10_000;
25+
events.push((base + 100, wid, task));
26+
for i in 0..170u64 {
27+
events.push((base + 200 + i * 10, wid, task));
28+
}
29+
events.push((base + 3000, wid, task));
30+
}
31+
events
32+
}
33+
34+
fn encode_batch(encoder: &mut Encoder<Vec<u8>>, batch: &[(u64, WorkerId, TaskId)]) {
35+
let spawn_loc = encoder.intern_string_infallible("test");
36+
for &(ts, wid, task) in batch {
37+
encoder.write_infallible(&PollStartEvent {
38+
timestamp_ns: ts,
39+
worker_id: wid,
40+
local_queue: 3,
41+
task_id: task,
42+
spawn_loc,
43+
});
44+
encoder.write_infallible(&PollEndEvent {
45+
timestamp_ns: ts + 5,
46+
worker_id: wid,
47+
});
48+
}
49+
encoder.write_infallible(&WorkerParkEvent {
50+
timestamp_ns: batch.last().unwrap().0 + 100,
51+
worker_id: batch[0].1,
52+
local_queue: 0,
53+
cpu_time_ns: 600_000,
54+
});
55+
encoder.write_infallible(&WorkerUnparkEvent {
56+
timestamp_ns: batch[0].0,
57+
worker_id: batch[0].1,
58+
local_queue: 5,
59+
cpu_time_ns: 500_000,
60+
sched_wait_ns: 1_000,
61+
});
62+
}
63+
64+
fn bench_direct_encode(c: &mut Criterion) {
65+
let mut group = c.benchmark_group("direct_encode");
66+
67+
for num_batches in [1, 10, 100] {
68+
let batches: Vec<_> = (0..num_batches).map(|i| make_batch(i % 8)).collect();
69+
let total_events: usize = batches.iter().map(|b| b.len() * 2).sum();
70+
group.throughput(criterion::Throughput::Elements(total_events as u64));
71+
72+
group.bench_with_input(
73+
BenchmarkId::new("batches", num_batches),
74+
&batches,
75+
|b, batches| {
76+
b.iter(|| {
77+
let mut encoder = Encoder::new();
78+
for batch in batches {
79+
encode_batch(&mut encoder, batch);
80+
}
81+
encoder.finish()
82+
});
83+
},
84+
);
85+
}
86+
group.finish();
87+
}
88+
89+
fn bench_threadlocal_rawcopy(c: &mut Criterion) {
90+
let mut group = c.benchmark_group("threadlocal_rawcopy");
91+
92+
for num_batches in [1, 10, 100] {
93+
let batches: Vec<_> = (0..num_batches).map(|i| make_batch(i % 8)).collect();
94+
let total_events: usize = batches.iter().map(|b| b.len() * 2).sum();
95+
group.throughput(criterion::Throughput::Elements(total_events as u64));
96+
97+
group.bench_with_input(
98+
BenchmarkId::new("batches", num_batches),
99+
&batches,
100+
|b, batches| {
101+
b.iter(|| {
102+
let mut output = Vec::new();
103+
dial9_trace_format::codec::encode_header(&mut output).unwrap();
104+
for batch in batches {
105+
let mut local = Encoder::new();
106+
encode_batch(&mut local, batch);
107+
let bytes = local.reset_to(Vec::new());
108+
output.extend_from_slice(&bytes);
109+
}
110+
output
111+
});
112+
},
113+
);
114+
}
115+
group.finish();
116+
}
117+
118+
criterion_group!(benches, bench_direct_encode, bench_threadlocal_rawcopy);
119+
criterion_main!(benches);

dial9-tokio-telemetry/src/telemetry/buffer.rs

Lines changed: 169 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
11
//! `ThreadLocalBuffer` is the entrypoint for almost all dial9 events
22
//!
3-
//! The TL buffer is created lazily the first time an event is sent. Events are buffered into a fixed-size Vec (currently 1024 items)
4-
//! before being flushed to the central collector.
3+
//! The TL buffer is created lazily the first time an event is sent. Events are encoded directly
4+
//! into a thread-local `Encoder<Vec<u8>>` and flushed to the central collector when the encoded
5+
//! batch reaches the configured batch size (default 1 MB).
56
use crate::telemetry::collector::CentralCollector;
67
use crate::telemetry::events::RawEvent;
8+
use crate::telemetry::format::*;
9+
use dial9_trace_format::InternedString;
10+
use dial9_trace_format::encoder::Encoder;
711
use std::cell::RefCell;
12+
use std::collections::HashMap;
13+
use std::panic::Location;
814
use std::sync::Arc;
915

10-
const BUFFER_CAPACITY: usize = 1024;
16+
/// Default maximum encoded batch size before flushing (64KB).
17+
const DEFAULT_BATCH_SIZE: usize = 63 * 1024;
1118

1219
pub(crate) struct ThreadLocalBuffer {
13-
pub(crate) events: Vec<RawEvent>,
20+
encoder: Encoder<Vec<u8>>,
21+
event_count: usize,
22+
batch_size: usize,
1423
collector: Option<Arc<CentralCollector>>,
24+
location_cache: HashMap<&'static Location<'static>, String>,
1525
}
1626

1727
impl Default for ThreadLocalBuffer {
@@ -22,9 +32,19 @@ impl Default for ThreadLocalBuffer {
2232

2333
impl ThreadLocalBuffer {
2434
fn new() -> Self {
35+
Self::with_batch_size(DEFAULT_BATCH_SIZE)
36+
}
37+
38+
fn with_batch_size(batch_size: usize) -> Self {
2539
Self {
26-
events: Vec::with_capacity(BUFFER_CAPACITY),
40+
// make the Vec 1KB bigger to reduce the risk of reallocating
41+
// TODO: harden this code, we should ensure we never have to re-allocate this buffer.
42+
encoder: Encoder::new_to(Vec::with_capacity(batch_size + 1024))
43+
.expect("Vec::write_all cannot fail"),
44+
event_count: 0,
45+
batch_size,
2746
collector: None,
47+
location_cache: HashMap::new(),
2848
}
2949
}
3050

@@ -36,28 +56,143 @@ impl ThreadLocalBuffer {
3656
}
3757
}
3858

59+
// todo: this is now really "encode tokio event"
60+
fn encode_event(&mut self, event: &RawEvent) {
61+
match event {
62+
RawEvent::PollStart {
63+
timestamp_nanos,
64+
worker_id,
65+
worker_local_queue_depth,
66+
task_id,
67+
location,
68+
} => {
69+
let spawn_loc = self.intern_location(location);
70+
self.encoder.write_infallible(&PollStartEvent {
71+
timestamp_ns: *timestamp_nanos,
72+
worker_id: *worker_id,
73+
local_queue: *worker_local_queue_depth as u8,
74+
task_id: *task_id,
75+
spawn_loc,
76+
});
77+
}
78+
RawEvent::PollEnd {
79+
timestamp_nanos,
80+
worker_id,
81+
} => self.encoder.write_infallible(&PollEndEvent {
82+
timestamp_ns: *timestamp_nanos,
83+
worker_id: *worker_id,
84+
}),
85+
RawEvent::WorkerPark {
86+
timestamp_nanos,
87+
worker_id,
88+
worker_local_queue_depth,
89+
cpu_time_nanos,
90+
} => self.encoder.write_infallible(&WorkerParkEvent {
91+
timestamp_ns: *timestamp_nanos,
92+
worker_id: *worker_id,
93+
local_queue: *worker_local_queue_depth as u8,
94+
cpu_time_ns: *cpu_time_nanos,
95+
}),
96+
RawEvent::WorkerUnpark {
97+
timestamp_nanos,
98+
worker_id,
99+
worker_local_queue_depth,
100+
cpu_time_nanos,
101+
sched_wait_delta_nanos,
102+
} => self.encoder.write_infallible(&WorkerUnparkEvent {
103+
timestamp_ns: *timestamp_nanos,
104+
worker_id: *worker_id,
105+
local_queue: *worker_local_queue_depth as u8,
106+
cpu_time_ns: *cpu_time_nanos,
107+
sched_wait_ns: *sched_wait_delta_nanos,
108+
}),
109+
RawEvent::QueueSample {
110+
timestamp_nanos,
111+
global_queue_depth,
112+
} => self.encoder.write_infallible(&QueueSampleEvent {
113+
timestamp_ns: *timestamp_nanos,
114+
global_queue: *global_queue_depth as u8,
115+
}),
116+
RawEvent::TaskSpawn {
117+
timestamp_nanos,
118+
task_id,
119+
location,
120+
} => {
121+
let spawn_loc = self.intern_location(location);
122+
self.encoder.write_infallible(&TaskSpawnEvent {
123+
timestamp_ns: *timestamp_nanos,
124+
task_id: *task_id,
125+
spawn_loc,
126+
});
127+
}
128+
RawEvent::TaskTerminate {
129+
timestamp_nanos,
130+
task_id,
131+
} => self.encoder.write_infallible(&TaskTerminateEvent {
132+
timestamp_ns: *timestamp_nanos,
133+
task_id: *task_id,
134+
}),
135+
RawEvent::WakeEvent {
136+
timestamp_nanos,
137+
waker_task_id,
138+
woken_task_id,
139+
target_worker,
140+
} => self.encoder.write_infallible(&WakeEventEvent {
141+
timestamp_ns: *timestamp_nanos,
142+
waker_task_id: *waker_task_id,
143+
woken_task_id: *woken_task_id,
144+
target_worker: *target_worker,
145+
}),
146+
RawEvent::CpuSample(data) => {
147+
let thread_name = match &data.thread_name {
148+
Some(name) => self.encoder.intern_string_infallible(name.as_str()),
149+
None => self.encoder.intern_string_infallible("<no thread name>"),
150+
};
151+
self.encoder.write_infallible(&CpuSampleEvent {
152+
timestamp_ns: data.timestamp_nanos,
153+
worker_id: data.worker_id,
154+
tid: data.tid,
155+
source: data.source,
156+
thread_name,
157+
callchain: dial9_trace_format::StackFrames(data.callchain.clone()),
158+
});
159+
}
160+
}
161+
}
162+
163+
fn intern_location(&mut self, location: &'static Location<'static>) -> InternedString {
164+
let s = self
165+
.location_cache
166+
.entry(location)
167+
.or_insert_with(|| location.to_string());
168+
self.encoder.intern_string_infallible(s)
169+
}
170+
39171
fn record_event(&mut self, event: RawEvent) {
40-
self.events.push(event);
172+
self.encode_event(&event);
173+
self.event_count += 1;
41174
}
42175

43176
fn should_flush(&self) -> bool {
44-
self.events.len() >= BUFFER_CAPACITY
177+
self.encoder.bytes_written() as usize >= self.batch_size
45178
}
46179

47-
fn flush(&mut self) -> Vec<RawEvent> {
48-
std::mem::replace(&mut self.events, Vec::with_capacity(BUFFER_CAPACITY))
180+
fn flush(&mut self) -> crate::telemetry::collector::Batch {
181+
let encoded_bytes = self.encoder.reset_to(Vec::with_capacity(self.batch_size));
182+
self.event_count = 0;
183+
crate::telemetry::collector::Batch { encoded_bytes }
49184
}
50185
}
51186

52187
impl Drop for ThreadLocalBuffer {
53188
fn drop(&mut self) {
54-
if !self.events.is_empty() {
189+
if self.event_count > 0 {
55190
if let Some(collector) = self.collector.take() {
56-
collector.accept_flush(std::mem::take(&mut self.events));
191+
collector.accept_flush(self.flush());
57192
} else {
58193
tracing::warn!(
59194
"dial9-tokio-telemetry: dropping {} unflushed events (no collector registered on this thread)",
60-
self.events.len()
195+
self.event_count
61196
);
62197
}
63198
}
@@ -88,9 +223,8 @@ pub(crate) fn record_event(event: RawEvent, collector: &Arc<CentralCollector>) {
88223
pub(crate) fn drain_to_collector(collector: &CentralCollector) {
89224
BUFFER.with(|buf| {
90225
let mut buf = buf.borrow_mut();
91-
let events = buf.flush();
92-
if !events.is_empty() {
93-
collector.accept_flush(events);
226+
if buf.event_count > 0 {
227+
collector.accept_flush(buf.flush());
94228
}
95229
});
96230
}
@@ -108,34 +242,42 @@ mod tests {
108242
#[test]
109243
fn test_buffer_creation() {
110244
let buffer = ThreadLocalBuffer::new();
111-
assert_eq!(buffer.events.len(), 0);
112-
assert_eq!(buffer.events.capacity(), BUFFER_CAPACITY);
245+
assert_eq!(buffer.event_count, 0);
246+
assert_eq!(buffer.batch_size, DEFAULT_BATCH_SIZE);
113247
}
114248

115249
#[test]
116250
fn test_record_event() {
117251
let mut buffer = ThreadLocalBuffer::new();
118252
buffer.record_event(poll_end_event());
119-
assert_eq!(buffer.events.len(), 1);
253+
assert_eq!(buffer.event_count, 1);
254+
assert!(buffer.encoder.bytes_written() > 0);
120255
}
121256

122257
#[test]
123-
fn test_should_flush() {
124-
let mut buffer = ThreadLocalBuffer::new();
258+
fn test_should_flush_respects_batch_size() {
259+
// Use a tiny batch size so a single event triggers flush.
260+
let mut buffer = ThreadLocalBuffer::with_batch_size(1);
125261
assert!(!buffer.should_flush());
126-
for _ in 0..BUFFER_CAPACITY {
127-
buffer.record_event(poll_end_event());
128-
}
262+
buffer.record_event(poll_end_event());
129263
assert!(buffer.should_flush());
130264
}
131265

266+
#[test]
267+
fn test_should_flush_default_batch_size() {
268+
let mut buffer = ThreadLocalBuffer::new();
269+
assert!(!buffer.should_flush());
270+
buffer.record_event(poll_end_event());
271+
// A single small event should not exceed 1 MB.
272+
assert!(!buffer.should_flush());
273+
}
274+
132275
#[test]
133276
fn test_flush() {
134277
let mut buffer = ThreadLocalBuffer::new();
135278
buffer.record_event(poll_end_event());
136-
let flushed = buffer.flush();
137-
assert_eq!(flushed.len(), 1);
138-
assert_eq!(buffer.events.len(), 0);
139-
assert_eq!(buffer.events.capacity(), BUFFER_CAPACITY);
279+
let batch = buffer.flush();
280+
assert!(!batch.encoded_bytes.is_empty());
281+
assert_eq!(buffer.event_count, 0);
140282
}
141283
}

0 commit comments

Comments
 (0)