-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathrotation_time_alignment.rs
More file actions
258 lines (233 loc) · 9.68 KB
/
rotation_time_alignment.rs
File metadata and controls
258 lines (233 loc) · 9.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
//! Verify that rotated trace segments contain events from non-overlapping
//! (or minimally overlapping) time ranges.
//!
//! When rotation and flushing are properly coordinated, each segment should
//! contain events from a contiguous time window. Adjacent segments may overlap
//! by at most a small tolerance (e.g. 2 seconds) due to in-flight batches.
//!
//! This test uses a short rotation period (2s) and generates continuous events
//! across multiple workers to exercise the rotation/flush coordination path.
//!
//! The test is built on `TelemetryCore` so we can attach a metrics sink to the
//! flush thread and inspect its metrics when the test fails.
use dial9_tokio_telemetry::telemetry::{RotatingWriter, TelemetryCore, TelemetryEvent};
use metrique::local::{LocalFormat, OutputStyle};
use std::time::Duration;
/// Maximum allowed overlap between adjacent segments in seconds.
const MAX_OVERLAP_SECS: f64 = 2.0;
#[test]
fn rotated_segments_have_bounded_time_overlap() {
let dir = tempfile::tempdir().unwrap();
let trace_path = dir.path().join("trace.bin");
let rotation_period = Duration::from_secs(2);
let num_workers = 4;
let writer = RotatingWriter::builder()
.base_path(&trace_path)
.max_file_size(u64::MAX) // only time-based rotation
.max_total_size(u64::MAX)
.rotation_period(rotation_period)
.build()
.unwrap();
// Set up a metrics sink so we can capture flush-thread metrics for debugging.
let (render_queue, metrics_sink) =
metrique_writer::test_util::render_entry_sink(LocalFormat::new(OutputStyle::Pretty));
// Build the telemetry session via TelemetryCore so we get flush-thread
// metrics through the worker_metrics_sink.
let guard = TelemetryCore::builder()
.writer(writer)
.worker_metrics_sink(metrics_sink)
.build()
.unwrap();
guard.enable();
let mut builder = tokio::runtime::Builder::new_multi_thread();
builder.worker_threads(num_workers).enable_all();
let (runtime, _handle) = guard.trace_runtime("main").build(builder).unwrap();
// Generate continuous events across multiple rotation boundaries.
// With a 2s rotation period and 15s runtime, we should get 5+ segments
// even if the runtime takes a few seconds to start producing events.
runtime.block_on(async {
let start = tokio::time::Instant::now();
let target_duration = Duration::from_secs(15);
let mut handles = Vec::new();
for _ in 0..num_workers {
handles.push(tokio::spawn(async move {
let start = tokio::time::Instant::now();
while start.elapsed() < target_duration {
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(20)).await;
}
}));
}
for h in handles {
h.await.unwrap();
}
// Ensure we've actually waited the full duration
let elapsed = start.elapsed();
if elapsed < target_duration {
tokio::time::sleep(target_duration - elapsed).await;
}
});
drop(runtime);
guard
.graceful_shutdown(Duration::from_secs(5))
.expect("graceful shutdown");
// Dump flush-thread metrics so they appear in test output on failure.
let flush_metrics = render_queue.entries();
eprintln!("flush-thread metrics ({} entries):", flush_metrics.len());
for entry in &flush_metrics {
eprintln!("{entry}");
}
// Collect all sealed segment files, sorted by index.
let mut segment_files: Vec<_> = std::fs::read_dir(dir.path())
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| {
p.extension().is_some_and(|ext| ext == "bin")
&& !p
.file_name()
.unwrap()
.to_str()
.unwrap()
.ends_with(".active")
})
.collect();
segment_files.sort();
assert!(
segment_files.len() >= 3,
"expected at least 3 rotated segments, got {}. Files: {:?}",
segment_files.len(),
segment_files
);
// For each segment, decode events and compute (min_timestamp, max_timestamp)
// from non-metadata events. Keep the events around for diagnostics on failure.
let segment_events: Vec<Vec<TelemetryEvent>> = segment_files
.iter()
.map(|path| {
let data = std::fs::read(path).unwrap();
dial9_tokio_telemetry::analysis_unstable::decode_events(&data).unwrap()
})
.collect();
let segment_ranges: Vec<(u64, u64)> = segment_events
.iter()
.enumerate()
.map(|(i, events)| {
let timestamps: Vec<u64> = events
.iter()
.filter(|e| !matches!(e, TelemetryEvent::SegmentMetadata { .. }))
.filter_map(|e| e.timestamp_nanos())
.collect();
assert!(
!timestamps.is_empty(),
"segment {} has no timestamped events",
segment_files[i].display()
);
let min = *timestamps.iter().min().unwrap();
let max = *timestamps.iter().max().unwrap();
(min, max)
})
.collect();
// Validate: adjacent segments should have bounded overlap.
// Skip the last boundary — the final segment is the shutdown dump where
// all TL buffers are drained at once, so it inherently contains events
// spanning the entire last drain interval.
let mut max_observed_overlap = Duration::ZERO;
let non_final_boundaries = if segment_ranges.len() >= 3 {
segment_ranges.len() - 2
} else {
// With only 2 segments we can't skip the final boundary,
// but we still have at least 1 boundary to check.
assert!(
segment_ranges.len() >= 2,
"need at least 2 segments to validate overlap, got {}",
segment_ranges.len()
);
1
};
for i in 0..non_final_boundaries {
let (_min_a, max_a) = segment_ranges[i];
let (min_b, _max_b) = segment_ranges[i + 1];
// Overlap = how much of segment A's tail extends past segment B's start.
// If max_a > min_b, there's overlap.
let overlap = if max_a > min_b {
Duration::from_nanos(max_a - min_b)
} else {
Duration::ZERO
};
if overlap > max_observed_overlap {
max_observed_overlap = overlap;
}
let overlap_secs = overlap.as_secs_f64();
eprintln!(
"segments {i} → {}: overlap = {:.3}s (segment {i}: [{:.3}s, {:.3}s], segment {}: [{:.3}s, {:.3}s])",
i + 1,
overlap_secs,
segment_ranges[i].0 as f64 / 1e9,
segment_ranges[i].1 as f64 / 1e9,
i + 1,
segment_ranges[i + 1].0 as f64 / 1e9,
segment_ranges[i + 1].1 as f64 / 1e9,
);
if overlap_secs > MAX_OVERLAP_SECS {
// Collect event types from segment A that bleed past segment B's start
let late_in_a: Vec<_> = segment_events[i]
.iter()
.filter(|e| !matches!(e, TelemetryEvent::SegmentMetadata { .. }))
.filter(|e| e.timestamp_nanos().is_some_and(|t| t > min_b))
.map(|e| event_type_name(e))
.collect();
// Collect event types from segment B that precede segment A's end
let early_in_b: Vec<_> = segment_events[i + 1]
.iter()
.filter(|e| !matches!(e, TelemetryEvent::SegmentMetadata { .. }))
.filter(|e| e.timestamp_nanos().is_some_and(|t| t < max_a))
.map(|e| event_type_name(e))
.collect();
panic!(
"segment {i} → {} overlap is {:.3}s, exceeds {MAX_OVERLAP_SECS}s tolerance. \
Segment {i} max={}, segment {} min={}\n\
Events in segment {i} past segment {} start ({} events): {:?}\n\
Events in segment {} before segment {i} end ({} events): {:?}\n\
Flush-thread metrics ({} entries):\n{}",
i + 1,
overlap_secs,
max_a,
i + 1,
min_b,
i + 1,
late_in_a.len(),
late_in_a,
i + 1,
early_in_b.len(),
early_in_b,
flush_metrics.len(),
flush_metrics.join("\n"),
);
}
}
eprintln!(
"max observed overlap: {:.3}s across {} non-final segment boundaries",
max_observed_overlap.as_secs_f64(),
non_final_boundaries
);
}
fn event_type_name(event: &TelemetryEvent) -> &'static str {
match event {
TelemetryEvent::PollStart { .. } => "PollStart",
TelemetryEvent::PollEnd { .. } => "PollEnd",
TelemetryEvent::WorkerPark { .. } => "WorkerPark",
TelemetryEvent::WorkerUnpark { .. } => "WorkerUnpark",
TelemetryEvent::QueueSample { .. } => "QueueSample",
TelemetryEvent::TaskSpawn { .. } => "TaskSpawn",
TelemetryEvent::TaskTerminate { .. } => "TaskTerminate",
TelemetryEvent::CpuSample { .. } => "CpuSample",
TelemetryEvent::TaskDump { .. } => "TaskDump",
TelemetryEvent::Alloc { .. } => "Alloc",
TelemetryEvent::Free { .. } => "Free",
TelemetryEvent::ThreadNameDef { .. } => "ThreadNameDef",
TelemetryEvent::WakeEvent { .. } => "WakeEvent",
TelemetryEvent::SegmentMetadata { .. } => "SegmentMetadata",
TelemetryEvent::ClockSync { .. } => "ClockSync",
TelemetryEvent::Custom { .. } => "Custom",
}
}