Skip to content

Commit c6183d4

Browse files
authored
Fix: add explicit monotonic <-> realtime clock sync to trace format (#210)
* fix: emit monotonic/realtime anchors for absolute-time rendering * Use nearest clock-sync anchor for wall time * Clean up
1 parent dc4db19 commit c6183d4

9 files changed

Lines changed: 554 additions & 40 deletions

File tree

dial9-tokio-telemetry/examples/trace_to_fat_jsonl.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ fn to_fat_event(event: &TelemetryEvent, reader: &TraceReader) -> Option<FatEvent
165165
TelemetryEvent::TaskSpawn { .. }
166166
| TelemetryEvent::TaskTerminate { .. }
167167
| TelemetryEvent::ThreadNameDef { .. }
168-
| TelemetryEvent::SegmentMetadata { .. } => None,
168+
| TelemetryEvent::SegmentMetadata { .. }
169+
| TelemetryEvent::ClockSync { .. } => None,
169170
}
170171
}

dial9-tokio-telemetry/src/background_task/sealed.rs

Lines changed: 97 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ pub(crate) struct SealedSegment {
1212
pub(crate) index: u32,
1313
}
1414

15-
/// Segment creation time as epoch seconds, parsed from SegmentMetadata header.
16-
/// Returns `(epoch_secs, true)` if the header was valid, or falls back to
17-
/// file mtime / current time with `(epoch_secs, false)`.
15+
/// Segment creation time as epoch seconds, parsed from the first clock
16+
/// anchor in the trace. Returns `(secs, true)` on a successful parse, or
17+
/// falls back to file mtime / current time with `(secs, false)`.
1818
pub(crate) fn creation_epoch_secs(data: &[u8], path: &Path) -> (u64, bool) {
1919
match parse_segment_timestamp(data) {
2020
Ok(ts) => return (ts / 1_000_000_000, true),
@@ -42,17 +42,32 @@ pub(crate) fn creation_epoch_secs(data: &[u8], path: &Path) -> (u64, bool) {
4242
(secs, false)
4343
}
4444

45-
/// Parse the timestamp (nanos) from the first SegmentMetadata event in a trace segment.
45+
/// Legacy epoch-vs-monotonic discriminator for ambiguous
46+
/// `SegmentMetadataEvent.timestamp_ns` values.
47+
///
48+
/// Pre-clock-sync traces may store epoch nanoseconds there, while new traces
49+
/// use monotonic nanoseconds. Epoch timestamps are ~1e18, while monotonic uptime is much smaller.
50+
/// So we use `2020-01-01` in epoch ns as a floor; monotonic time would need decades
51+
/// of continuous runtime to reach that range.
52+
///
53+
/// Keep in sync with `LEGACY_EPOCH_NS_FLOOR` in `dial9-viewer/ui/trace_parser.js`.
54+
pub(crate) const LEGACY_EPOCH_NS_FLOOR: u64 = 1_577_836_800_000_000_000;
55+
56+
/// Parse wall-clock creation time from the first `ClockSyncEvent`,
57+
/// or from a legacy `SegmentMetadataEvent.timestamp_ns` that predates clock-sync support.
4658
fn parse_segment_timestamp(data: &[u8]) -> Result<u64, ParseTimestampError> {
4759
use dial9_trace_format::decoder::{DecodedFrameRef, Decoder};
60+
use dial9_trace_format::types::FieldValueRef;
4861

4962
let mut dec = Decoder::new(data).ok_or(ParseTimestampError::InvalidHeader)?;
5063
let mut events_seen = 0;
64+
let mut legacy_fallback: Option<u64> = None;
5165
loop {
5266
match dec.next_frame_ref() {
5367
Ok(Some(DecodedFrameRef::Event {
5468
type_id,
5569
timestamp_ns,
70+
values,
5671
..
5772
})) => {
5873
events_seen += 1;
@@ -61,16 +76,25 @@ fn parse_segment_timestamp(data: &[u8]) -> Result<u64, ParseTimestampError> {
6176
.get(type_id)
6277
.map(|s| s.name.as_str())
6378
.ok_or(ParseTimestampError::UnknownTypeId(type_id.0))?;
64-
if name == "SegmentMetadataEvent" {
65-
return timestamp_ns.ok_or(ParseTimestampError::MissingTimestamp);
79+
if name == "ClockSyncEvent" {
80+
return match values.first() {
81+
Some(FieldValueRef::Varint(v)) => Ok(*v),
82+
_ => Err(ParseTimestampError::MissingRealtimeField),
83+
};
84+
}
85+
if name == "SegmentMetadataEvent"
86+
&& let Some(ts) = timestamp_ns
87+
&& ts >= LEGACY_EPOCH_NS_FLOOR
88+
{
89+
legacy_fallback = Some(ts);
6690
}
6791
if events_seen >= 10 {
68-
return Err(ParseTimestampError::NotFoundInFirst10Events);
92+
return legacy_fallback.ok_or(ParseTimestampError::NoAnchorInFirst10Events);
6993
}
7094
}
7195
Ok(Some(_)) => {} // schema/pool frame, keep going
7296
Ok(None) => {
73-
return Err(ParseTimestampError::EndOfStream { events_seen });
97+
return legacy_fallback.ok_or(ParseTimestampError::EndOfStream { events_seen });
7498
}
7599
Err(e) => {
76100
return Err(ParseTimestampError::DecodeError(e.to_string()));
@@ -83,8 +107,8 @@ fn parse_segment_timestamp(data: &[u8]) -> Result<u64, ParseTimestampError> {
83107
enum ParseTimestampError {
84108
InvalidHeader,
85109
UnknownTypeId(u16),
86-
MissingTimestamp,
87-
NotFoundInFirst10Events,
110+
MissingRealtimeField,
111+
NoAnchorInFirst10Events,
88112
EndOfStream { events_seen: u32 },
89113
DecodeError(String),
90114
}
@@ -94,13 +118,14 @@ impl std::fmt::Display for ParseTimestampError {
94118
match self {
95119
Self::InvalidHeader => write!(f, "invalid trace header"),
96120
Self::UnknownTypeId(id) => write!(f, "unknown type_id {id} not in registry"),
97-
Self::MissingTimestamp => write!(f, "SegmentMetadataEvent had no timestamp"),
98-
Self::NotFoundInFirst10Events => {
99-
write!(f, "SegmentMetadataEvent not found in first 10 events")
100-
}
121+
Self::MissingRealtimeField => write!(f, "ClockSyncEvent had no realtime_ns field"),
122+
Self::NoAnchorInFirst10Events => write!(
123+
f,
124+
"no ClockSyncEvent or legacy wall-clock SegmentMetadataEvent in first 10 events"
125+
),
101126
Self::EndOfStream { events_seen } => write!(
102127
f,
103-
"end of stream after {events_seen} events without SegmentMetadataEvent"
128+
"end of stream after {events_seen} events without a clock anchor"
104129
),
105130
Self::DecodeError(e) => write!(f, "decode error: {e}"),
106131
}
@@ -330,6 +355,63 @@ mod tests {
330355
check!(now_nanos.abs_diff(ts) < 60_000_000_000);
331356
}
332357

358+
/// Test fixture: encodes a trace holding one `SegmentMetadataEvent` with the
/// given `timestamp_ns`, followed by one `WorkerParkEvent`. No
/// `ClockSyncEvent` is written, so the bytes look like a pre-clock-sync trace.
fn legacy_segment_metadata_trace(timestamp_ns: u64) -> Vec<u8> {
359+
use crate::telemetry::format::{SegmentMetadataEvent, WorkerParkEvent};
360+
use dial9_trace_format::encoder::Encoder;
361+
362+
let mut enc = Encoder::new_to(Vec::new()).unwrap();
363+
// The metadata event carries the caller-chosen timestamp under test.
enc.write(&SegmentMetadataEvent {
364+
timestamp_ns,
365+
entries: vec![("k".into(), "v".into())],
366+
})
367+
.unwrap();
368+
// A second, ordinary event with a small fixed timestamp (1s in ns).
enc.write_infallible(&WorkerParkEvent {
369+
timestamp_ns: 1_000_000_000,
370+
worker_id: crate::telemetry::format::WorkerId::from(0usize),
371+
local_queue: 0,
372+
cpu_time_ns: 0,
373+
});
374+
enc.into_inner()
375+
}
376+
377+
/// Legacy pre-clock-sync files stored wall clock in `SegmentMetadataEvent.timestamp_ns`,
378+
/// parser fallback should recover it.
379+
#[test]
380+
fn test_parse_segment_timestamp_legacy_segment_metadata_fallback() {
381+
// 2024-06-01T00:00:00Z: comfortably epoch-scale.
382+
const LEGACY_WALL_NS: u64 = 1_717_200_000_000_000_000;
383+
let data = legacy_segment_metadata_trace(LEGACY_WALL_NS);
384+
385+
let parsed = parse_segment_timestamp(&data).unwrap();
386+
check!(parsed == LEGACY_WALL_NS);
387+
}
388+
389+
#[test]
390+
fn test_creation_epoch_secs_uses_legacy_segment_metadata_fallback() {
391+
const LEGACY_WALL_NS: u64 = 1_717_200_000_000_000_000;
392+
let data = legacy_segment_metadata_trace(LEGACY_WALL_NS);
393+
let dir = TempDir::new().unwrap();
394+
let path = dir.path().join("legacy.0.bin");
395+
std::fs::write(&path, &data).unwrap();
396+
let (epoch_secs, header_valid) = creation_epoch_secs(&data, &path);
397+
check!(header_valid);
398+
check!(epoch_secs == LEGACY_WALL_NS / 1_000_000_000);
399+
}
400+
401+
/// A small (monotonic-looking) SegmentMeta.timestamp_ns must NOT be
402+
/// mistaken for a legacy wall-clock value.
403+
#[test]
404+
fn test_parse_segment_timestamp_monotonic_segment_metadata_is_not_legacy() {
405+
let data = legacy_segment_metadata_trace(12_345);
406+
407+
let result = parse_segment_timestamp(&data);
408+
check!(matches!(
409+
result,
410+
Err(ParseTimestampError::EndOfStream { .. })
411+
| Err(ParseTimestampError::NoAnchorInFirst10Events)
412+
));
413+
}
414+
333415
#[test]
334416
fn parse_segment_index_valid() {
335417
check!(parse_segment_index("trace.0.bin", "trace") == Some(0));

dial9-tokio-telemetry/src/telemetry/analysis.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ impl TraceReader {
8888
TelemetryEvent::TaskSpawn { .. }
8989
| TelemetryEvent::ThreadNameDef { .. }
9090
| TelemetryEvent::SegmentMetadata { .. }
91+
| TelemetryEvent::ClockSync { .. }
9192
)
9293
})
9394
.cloned()
@@ -297,7 +298,8 @@ pub fn analyze_trace(events: &[TelemetryEvent]) -> TraceAnalysis {
297298
| TelemetryEvent::CpuSample { .. }
298299
| TelemetryEvent::ThreadNameDef { .. }
299300
| TelemetryEvent::WakeEvent { .. }
300-
| TelemetryEvent::SegmentMetadata { .. } => {}
301+
| TelemetryEvent::SegmentMetadata { .. }
302+
| TelemetryEvent::ClockSync { .. } => {}
301303
}
302304
}
303305

dial9-tokio-telemetry/src/telemetry/events.rs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,16 @@ pub enum TelemetryEvent {
188188
/// Key-value metadata pairs.
189189
entries: Vec<(String, String)>,
190190
},
191+
/// Clock-correlation anchor pairing a monotonic timestamp with the
192+
/// wall-clock value captured at the same instant.
193+
ClockSync {
194+
/// Monotonic nanoseconds.
195+
#[serde(rename = "timestamp_ns")]
196+
timestamp_nanos: u64,
197+
/// Nanoseconds since the Unix epoch.
198+
#[serde(rename = "realtime_ns")]
199+
realtime_nanos: u64,
200+
},
191201
}
192202

193203
impl TelemetryEvent {
@@ -224,6 +234,9 @@ impl TelemetryEvent {
224234
TelemetryEvent::ThreadNameDef { .. } => None,
225235
TelemetryEvent::SegmentMetadata {
226236
timestamp_nanos, ..
237+
}
238+
| TelemetryEvent::ClockSync {
239+
timestamp_nanos, ..
227240
} => Some(*timestamp_nanos),
228241
}
229242
}
@@ -241,7 +254,8 @@ impl TelemetryEvent {
241254
| TelemetryEvent::TaskTerminate { .. }
242255
| TelemetryEvent::ThreadNameDef { .. }
243256
| TelemetryEvent::WakeEvent { .. }
244-
| TelemetryEvent::SegmentMetadata { .. } => None,
257+
| TelemetryEvent::SegmentMetadata { .. }
258+
| TelemetryEvent::ClockSync { .. } => None,
245259
}
246260
}
247261

@@ -367,15 +381,49 @@ pub fn clock_monotonic_ns() -> u64 {
367381
ts.tv_sec as u64 * 1_000_000_000 + ts.tv_nsec as u64
368382
}
369383

384+
/// `CLOCK_MONOTONIC` in nanoseconds. Non-Linux fallback: elapsed time
385+
/// since the first call in this process, measured via `Instant`.
370386
#[cfg(not(target_os = "linux"))]
371387
pub fn clock_monotonic_ns() -> u64 {
372-
// Fallback: use Instant. This is fine for non-Linux where perf isn't available.
373388
use std::sync::OnceLock;
374389
use std::time::Instant;
375390
static EPOCH: OnceLock<Instant> = OnceLock::new();
376391
EPOCH.get_or_init(Instant::now).elapsed().as_nanos() as u64
377392
}
378393

394+
/// `CLOCK_REALTIME` in nanoseconds since the Unix epoch.
///
/// Linux implementation: reads the clock with `libc::clock_gettime` and
/// flattens the resulting `timespec` into a single `u64` nanosecond count.
395+
#[cfg(target_os = "linux")]
396+
pub(crate) fn clock_realtime_ns() -> u64 {
397+
// Zero-initialized so the struct is fully defined before the call.
let mut ts = libc::timespec {
398+
tv_sec: 0,
399+
tv_nsec: 0,
400+
};
401+
// SAFETY: `ts` is a valid, initialized `timespec` that outlives the call,
// and `&mut ts` is the only pointer handed to `clock_gettime`.
// The return code is ignored; NOTE(review): if the call ever failed without
// writing `ts`, the zero initialization above would make this return 0.
unsafe {
402+
libc::clock_gettime(libc::CLOCK_REALTIME, &mut ts);
403+
}
404+
// seconds * 1e9 + nanoseconds. The `as u64` casts assume both fields are
// non-negative — NOTE(review): a negative `tv_sec` would wrap; confirm acceptable.
ts.tv_sec as u64 * 1_000_000_000 + ts.tv_nsec as u64
405+
}
406+
407+
/// Wall-clock time in nanoseconds since the Unix epoch (non-Linux fallback).
///
/// Uses `SystemTime::now()` measured against `UNIX_EPOCH`. Returns `0` when
/// `duration_since` fails, i.e. when the system clock reads earlier than the
/// epoch.
#[cfg(not(target_os = "linux"))]
408+
pub(crate) fn clock_realtime_ns() -> u64 {
409+
use std::time::{SystemTime, UNIX_EPOCH};
410+
SystemTime::now()
411+
.duration_since(UNIX_EPOCH)
412+
// `as_nanos()` yields `u128`; the cast narrows it to `u64`.
.map(|d| d.as_nanos() as u64)
413+
// Clock set before the epoch: report 0 rather than panic.
.unwrap_or(0)
414+
}
415+
416+
/// Snapshot `(monotonic_ns, realtime_ns)` as close together as possible.
417+
/// Reads M₁ -> R -> M₂ and pairs `R` with the midpoint of M₁ and M₂, so
418+
/// the correlation error is at most half the M₁ -> M₂ interval.
///
/// Returns `(mono, r)`: the monotonic midpoint first, the realtime reading second.
419+
pub(crate) fn clock_pair() -> (u64, u64) {
420+
let m1 = clock_monotonic_ns();
421+
let r = clock_realtime_ns();
422+
let m2 = clock_monotonic_ns();
423+
// `saturating_sub` keeps the midpoint at `m1` (instead of underflowing)
// should `m2` ever read lower than `m1`.
let mono = m1 + m2.saturating_sub(m1) / 2;
424+
(mono, r)
425+
}
426+
379427
/// Per-thread scheduler stats from `/proc/<pid>/task/<tid>/schedstat`.
380428
/// Fields: run_time_ns wait_time_ns timeslices
381429
#[derive(Debug, Clone, Copy, Default)]

dial9-tokio-telemetry/src/telemetry/format.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,19 @@ pub(crate) struct SegmentMetadataEvent {
215215
pub entries: Vec<(String, String)>,
216216
}
217217

218+
/// Clock-correlation anchor. `timestamp_ns` (monotonic) and `realtime_ns`
219+
/// (nanoseconds since the Unix epoch) are captured as close together as possible via
220+
/// [`clock_pair`], so offline consumers can recover wall clock from the
221+
/// monotonic event stream.
222+
///
223+
/// [`clock_pair`]: crate::telemetry::events::clock_pair
224+
#[derive(TraceEvent)]
225+
pub(crate) struct ClockSyncEvent {
226+
#[traceevent(timestamp)]
227+
pub timestamp_ns: u64,
228+
pub realtime_ns: u64,
229+
}
230+
218231
// ── dial9-trace-format: decode ──────────────────────────────────────────────
219232

220233
/// Decode all events from a `dial9-trace-format` byte slice into `TelemetryEvent`s.
@@ -254,6 +267,7 @@ pub(crate) enum TelemetryEventRef<'a> {
254267
CpuSample(CpuSampleEventRef<'a>),
255268
WakeEvent(WakeEventEventRef<'a>),
256269
SegmentMetadata(SegmentMetadataEventRef<'a>),
270+
ClockSync(ClockSyncEventRef<'a>),
257271
}
258272

259273
#[cfg(any(feature = "analysis", test))]
@@ -272,6 +286,7 @@ impl<'a> TelemetryEventRef<'a> {
272286
Self::CpuSample(e) => Some(e.timestamp_ns),
273287
Self::WakeEvent(e) => Some(e.timestamp_ns),
274288
Self::SegmentMetadata(e) => Some(e.timestamp_ns),
289+
Self::ClockSync(e) => Some(e.timestamp_ns),
275290
}
276291
}
277292
}
@@ -314,6 +329,9 @@ pub(crate) fn decode_ref<'a>(
314329
"SegmentMetadataEvent" => {
315330
TelemetryEventRef::SegmentMetadata(SegmentMetadataEvent::decode(timestamp_ns, fields)?)
316331
}
332+
"ClockSyncEvent" => {
333+
TelemetryEventRef::ClockSync(ClockSyncEvent::decode(timestamp_ns, fields)?)
334+
}
317335
_ => return None,
318336
})
319337
}
@@ -392,5 +410,9 @@ pub(crate) fn to_owned_event(r: TelemetryEventRef<'_>, pool: &StringPool) -> Tel
392410
.map(|(k, v)| (k.to_owned(), v.to_owned()))
393411
.collect(),
394412
},
413+
TelemetryEventRef::ClockSync(e) => TelemetryEvent::ClockSync {
414+
timestamp_nanos: e.timestamp_ns,
415+
realtime_nanos: e.realtime_ns,
416+
},
395417
}
396418
}

0 commit comments

Comments
 (0)