diff --git a/Cargo.lock b/Cargo.lock index b317929e..9149e958 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1430,6 +1430,7 @@ dependencies = [ "hostname", "libc", "metrique", + "metrique-timesource", "metrique-writer", "nix", "pin-project-lite", @@ -2693,6 +2694,9 @@ name = "metrique-timesource" version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d607939211e4eaaa8cd35394fa5e57faffb7390d0ac513b39992edcaf3cc526c" +dependencies = [ + "tokio", +] [[package]] name = "metrique-writer" diff --git a/dial9-tokio-telemetry/Cargo.toml b/dial9-tokio-telemetry/Cargo.toml index 8210e42d..7b88b9d7 100644 --- a/dial9-tokio-telemetry/Cargo.toml +++ b/dial9-tokio-telemetry/Cargo.toml @@ -38,6 +38,7 @@ flate2 = { version = "1" } time = { version = "0.3", features = ["formatting", "macros"], optional = true } metrique-writer = "0.1" metrique = { version = "0.1.23", features = ["local-format"] } +metrique-timesource = "0.1" [features] analysis = [] @@ -50,7 +51,9 @@ assert2 = { workspace = true } criterion = "0.5" clap = { version = "4", features = ["derive"] } hdrhistogram = "7" +metrique-timesource = { version = "0.1", features = ["custom-timesource", "tokio"] } metrique-writer = { version = "0.1", features = ["test-util"] } +tokio = { version = "1.51.0", features = ["test-util"] } proptest = "1" tempfile = "3" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/dial9-tokio-telemetry/README.md b/dial9-tokio-telemetry/README.md index 484e93af..30a11511 100644 --- a/dial9-tokio-telemetry/README.md +++ b/dial9-tokio-telemetry/README.md @@ -32,6 +32,7 @@ fn main() -> std::io::Result<()> { .base_path("/tmp/my_traces/trace.bin") .max_file_size(1024 * 1024) // rotate after 1 MiB per file .max_total_size(5 * 1024 * 1024) // keep at most 5 MiB on disk + // .rotation_period(std::time::Duration::from_secs(300)) // optional: rotate every 5 min (default: 60 s) .build()?; let mut builder = tokio::runtime::Builder::new_multi_thread(); @@ -49,6 +50,8 @@ fn main() -> std::io::Result<()> { Events are 6–16 bytes on the wire, and a typical request generates ~20–35 bytes of trace data (a few poll events plus park/unpark). At 10k requests/sec that's well under 1 MB/s — `RotatingWriter` caps total disk usage so you can leave it running indefinitely. Typical CPU overhead is under 5%. +Segments rotate on size *or* time, whichever comes first. Time boundaries are wall-clock-aligned (e.g. a 60 s period rotates at the top of each minute), which produces clean S3 key paths when using the `worker-s3` feature. + ## Can I use this in prod? dial9-tokio-telemetry is designed for always-on production use, but it's still early software. Measure overhead and validate behavior in your environment before deploying to production. @@ -202,7 +205,7 @@ handle.disable(); ### Writers -`RotatingWriter` rotates files and evicts old ones to stay within a total size budget. For quick experiments, `RotatingWriter::single_file(path)` writes a single file with no rotation. +`RotatingWriter` rotates files based on size and time, and evicts old ones to stay within a total size budget. By default, segments rotate every 60 seconds (wall-clock-aligned) or when they exceed `max_file_size`, whichever comes first. For quick experiments, `RotatingWriter::single_file(path)` writes a single file with no rotation. ### Analyzing traces diff --git a/dial9-tokio-telemetry/src/telemetry/writer.rs b/dial9-tokio-telemetry/src/telemetry/writer.rs index e5714854..8da9547e 100644 --- a/dial9-tokio-telemetry/src/telemetry/writer.rs +++ b/dial9-tokio-telemetry/src/telemetry/writer.rs @@ -6,6 +6,9 @@ use std::collections::VecDeque; use std::fs::{self, File}; use std::io::BufWriter; use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime}; + +use metrique_timesource::time_source; /// Trait for writing encoded telemetry batches to a destination. pub trait TraceWriter: Send { @@ -76,10 +79,18 @@ impl TraceWriter for NullWriter { } } -/// A writer that rotates trace files to bound disk usage. +/// Default rotation period: 1 minute. +const DEFAULT_ROTATION_PERIOD: Duration = Duration::from_secs(60); + +/// A writer that rotates trace files to bound disk usage and time. +/// +/// Rotation triggers when *either* condition is met: +/// - `max_file_size`: the current file exceeds this many bytes +/// - `rotation_period`: a wall-clock-aligned time boundary is crossed +/// (default: 1 minute, aligned to round minute boundaries) /// -/// - `max_file_size`: rotate to a new file when the current file exceeds this size -/// - `max_total_size`: delete oldest files when total size across all files exceeds this +/// `max_total_size` controls eviction: oldest files are deleted when total +/// size across all files exceeds this budget. /// /// Files are named `{base_path}.0.bin`, `{base_path}.1.bin`, etc. /// Each file is a self-contained trace with its own header. @@ -87,6 +98,11 @@ pub struct RotatingWriter { base_path: PathBuf, max_file_size: u64, max_total_size: u64, + /// How often to rotate based on wall-clock time. `Duration::MAX` disables + /// time-based rotation (used by `single_file()`). + rotation_period: Duration, + /// The next wall-clock instant at which time-based rotation should fire. + next_rotation_time: SystemTime, /// Tracks (path, size) of closed files oldest-first. The active file is /// not in this list — its size comes from `encoder.bytes_written()`. closed_files: VecDeque<(PathBuf, u64)>, @@ -134,7 +150,13 @@ impl RotatingWriter { max_file_size: u64, max_total_size: u64, ) -> std::io::Result { - Self::create(base_path, max_file_size, max_total_size, Vec::new()) + Self::create( + base_path, + max_file_size, + max_total_size, + DEFAULT_ROTATION_PERIOD, + Vec::new(), + ) } /// Create a `RotatingWriterBuilder` for advanced configuration. @@ -143,12 +165,17 @@ impl RotatingWriter { base_path: impl Into, max_file_size: u64, max_total_size: u64, + /// How often to rotate based on wall-clock time, aligned to round + /// boundaries (e.g. a 60 s period rotates at the top of each minute). + /// Defaults to 60 seconds. + rotation_period: Option, segment_metadata: Option>, ) -> std::io::Result { Self::create( base_path, max_file_size, max_total_size, + rotation_period.unwrap_or(DEFAULT_ROTATION_PERIOD), segment_metadata.unwrap_or_default(), ) } @@ -157,8 +184,12 @@ impl RotatingWriter { base_path: impl Into, max_file_size: u64, max_total_size: u64, + rotation_period: Duration, segment_metadata: Vec<(String, String)>, ) -> std::io::Result { + if rotation_period == Duration::from_secs(0) { + return Err(std::io::Error::other("Rotation period must not be zero")); + } let base_path = base_path.into(); if let Some(parent) = base_path.parent() { fs::create_dir_all(parent)?; @@ -172,6 +203,11 @@ impl RotatingWriter { base_path, max_file_size, max_total_size, + rotation_period, + next_rotation_time: Self::next_boundary( + time_source().system_time().as_std(), + rotation_period, + ), closed_files: VecDeque::new(), active_path: first_path, state: WriterState::Active(raw), @@ -189,6 +225,7 @@ impl RotatingWriter { /// and gzip it to `{stem}.0.bin.gz`. /// /// Note: This API does not allow the ability to provide custom segment metadata. + /// Time-based rotation is disabled. pub fn single_file(path: impl Into) -> std::io::Result { let path = path.into(); if let Some(parent) = path.parent() { @@ -203,6 +240,11 @@ impl RotatingWriter { base_path: path, max_file_size: u64::MAX, max_total_size: u64::MAX, + rotation_period: Duration::MAX, + next_rotation_time: Self::next_boundary( + time_source().system_time().as_std(), + Duration::MAX, + ), closed_files: VecDeque::new(), active_path, state: WriterState::Active(raw), @@ -232,7 +274,8 @@ impl RotatingWriter { ) -> std::io::Result>> { let mut encoder = Encoder::new_to(writer)?; let entries = segment_metadata.to_vec(); - let timestamp_ns = std::time::SystemTime::now() + let timestamp_ns = time_source() + .system_time() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_nanos() as u64; @@ -251,7 +294,8 @@ impl RotatingWriter { return Ok(()); }; let entries = self.segment_metadata.clone(); - let timestamp_ns = std::time::SystemTime::now() + let timestamp_ns = time_source() + .system_time() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_nanos() as u64; @@ -277,6 +321,28 @@ impl RotatingWriter { parent.join(format!("{}.{}.bin.active", stem, index)) } + /// Compute the next wall-clock-aligned rotation boundary after `now`. + /// + /// For a 60 s period, if `now` is 14:03:22 the result is 14:04:00. + /// Returns a far-future time when `period` is `Duration::MAX` (time + /// rotation disabled). + fn next_boundary(now: SystemTime, period: Duration) -> SystemTime { + if period == Duration::MAX { + // ~year 2554 — far enough to never trigger, small enough to not overflow. + return SystemTime::UNIX_EPOCH + Duration::from_secs(u32::MAX as u64 * 4); + } + let epoch_dur = now + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default(); + let period_nanos = period.as_nanos(); + if period_nanos == 0 { + return now; + } + let epoch_nanos = epoch_dur.as_nanos(); + let next_nanos = ((epoch_nanos / period_nanos) + 1) * period_nanos; + SystemTime::UNIX_EPOCH + Duration::from_nanos(next_nanos as u64) + } + fn rotate(&mut self) -> std::io::Result<()> { let WriterState::Active(raw) = &mut self.state else { return Ok(()); @@ -299,6 +365,8 @@ impl RotatingWriter { self.active_path = new_path; self.did_rotate = true; self.has_real_events = false; + self.next_rotation_time = + Self::next_boundary(time_source().system_time().as_std(), self.rotation_period); tracing::info!( segment_index = self.next_index - 1, @@ -367,12 +435,17 @@ impl RotatingWriter { Ok(()) } - /// Rotate if the current file exceeds max_file_size. + /// Rotate if the current file exceeds max_file_size or the wall-clock + /// rotation boundary has been crossed. /// Called after writing a complete logical unit (def + event). fn maybe_rotate(&mut self) -> std::io::Result<()> { - if let WriterState::Active(raw) = &self.state - && raw.bytes_written() > self.max_file_size - { + let WriterState::Active(raw) = &self.state else { + return Ok(()); + }; + let size_trigger = raw.bytes_written() > self.max_file_size; + let time_trigger = + self.has_real_events && time_source().system_time() >= self.next_rotation_time; + if size_trigger || time_trigger { self.rotate()?; } Ok(()) @@ -442,6 +515,13 @@ impl TraceWriter for RotatingWriter { return Ok(()); }; if batch.event_count > 0 { + let now = time_source().system_time(); + // If the time boundary expired while the segment was empty, + // advance it so the incoming event starts a fresh window rather + // than being immediately rotated out as a single-event segment. + if !self.has_real_events && now >= self.next_rotation_time { + self.next_rotation_time = Self::next_boundary(now.as_std(), self.rotation_period); + } // Raw-copy the thread-local batch. Each batch is self-contained // (starts with its own header), so the next batch's header acts as // the reset frame for decoders. @@ -1197,4 +1277,278 @@ mod tests { // The .bin.gz file should have been cleaned up by eviction. assert!(!seg0_gz.exists(), "trace.0.bin.gz should have been evicted"); } + + // ---- Time-based rotation tests ---- + + #[test] + fn test_next_boundary_aligns_to_minute() { + use std::time::{Duration, SystemTime}; + // 2026-01-01 14:03:22 UTC → epoch 1767272602 + let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_767_272_602); + let period = Duration::from_secs(60); + let boundary = RotatingWriter::next_boundary(now, period); + // Should align to 14:04:00 → epoch 1767272640 + let expected = SystemTime::UNIX_EPOCH + Duration::from_secs(1_767_272_640); + assert_eq!(boundary, expected); + } + + #[test] + fn test_next_boundary_at_exact_boundary() { + use std::time::{Duration, SystemTime}; + // Exactly on a minute boundary: 14:03:00 → epoch 1767272580 + let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_767_272_580); + let period = Duration::from_secs(60); + let boundary = RotatingWriter::next_boundary(now, period); + // Should advance to 14:04:00 + let expected = SystemTime::UNIX_EPOCH + Duration::from_secs(1_767_272_640); + assert_eq!(boundary, expected); + } + + #[test] + fn test_next_boundary_5_minute_alignment() { + use std::time::{Duration, SystemTime}; + // 14:03:22 with 5-minute period + let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_767_272_602); + let period = Duration::from_secs(300); + let boundary = RotatingWriter::next_boundary(now, period); + // Should align to 14:05:00 → epoch 1767272700 + let expected = SystemTime::UNIX_EPOCH + Duration::from_secs(1_767_272_700); + assert_eq!(boundary, expected); + } + + #[test] + fn test_next_boundary_duration_max_returns_far_future() { + use std::time::{Duration, SystemTime}; + let now = SystemTime::now(); + let boundary = RotatingWriter::next_boundary(now, Duration::MAX); + // Should be far in the future — never triggers + assert!(boundary > now + Duration::from_secs(86400 * 365 * 100)); + } + + #[tokio::test(start_paused = true)] + async fn test_time_rotation_triggers_on_expired_boundary() { + use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime}; + let _guard = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH)); + + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let mut writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(u64::MAX) + .max_total_size(100_000) + .rotation_period(Duration::from_secs(60)) + .build() + .unwrap(); + + writer.write_encoded_batch(&test_batch()).unwrap(); + writer.flush().unwrap(); + let initial_index = writer.next_index; + + // Advance past the 60s boundary + tokio::time::advance(Duration::from_secs(61)).await; + + writer.write_encoded_batch(&test_batch()).unwrap(); + writer.flush().unwrap(); + + assert!( + writer.next_index > initial_index, + "expected time-based rotation to trigger" + ); + writer.finalize().unwrap(); + + let total: usize = (0..10) + .map(|i| { + let f = rotating_file(&base, i); + if std::path::Path::new(&f).exists() { + read_trace_events(&f).len() + } else { + 0 + } + }) + .sum(); + assert_eq!(total, 2); + } + + #[tokio::test(start_paused = true)] + async fn test_time_rotation_skips_when_no_real_events() { + use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime}; + let _guard = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH)); + + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let mut writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(u64::MAX) + .max_total_size(100_000) + .rotation_period(Duration::from_secs(60)) + .build() + .unwrap(); + + // Advance past the boundary without writing any events + tokio::time::advance(Duration::from_secs(120)).await; + + let empty_batch = Batch { + encoded_bytes: vec![], + event_count: 0, + }; + writer.write_encoded_batch(&empty_batch).unwrap(); + + assert_eq!( + writer.next_index, 1, + "should not rotate when no real events exist" + ); + writer.finalize().unwrap(); + } + + #[test] + fn test_size_rotation_still_works_with_time_disabled() { + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let one_event = single_event_file_size(); + let mut writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(one_event) + .max_total_size(100_000) + .rotation_period(std::time::Duration::MAX) + .build() + .unwrap(); + + for _ in 0..3 { + writer.write_encoded_batch(&test_batch()).unwrap(); + } + writer.finalize().unwrap(); + + let total: usize = (0..10) + .map(|i| { + let f = rotating_file(&base, i); + if std::path::Path::new(&f).exists() { + read_trace_events(&f).len() + } else { + 0 + } + }) + .sum(); + assert_eq!(total, 3); + } + + #[tokio::test(start_paused = true)] + async fn test_time_rotation_respects_eviction_budget() { + use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime}; + let _guard = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH)); + + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let one_event = single_event_file_size(); + let mut writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(u64::MAX) + .max_total_size(one_event * 3) + .rotation_period(Duration::from_secs(60)) + .build() + .unwrap(); + + writer.write_encoded_batch(&test_batch()).unwrap(); + for _ in 0..5 { + tokio::time::advance(Duration::from_secs(61)).await; + writer.write_encoded_batch(&test_batch()).unwrap(); + } + writer.finalize().unwrap(); + + assert!( + total_disk_usage(dir.path()) <= one_event * 3, + "disk usage should stay within budget" + ); + } + + #[test] + fn test_builder_rotation_period_default() { + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(1024) + .max_total_size(100_000) + .build() + .unwrap(); + assert_eq!(writer.rotation_period, DEFAULT_ROTATION_PERIOD); + } + + #[test] + fn test_new_uses_default_rotation_period() { + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let writer = RotatingWriter::new(&base, 1024, 100_000).unwrap(); + assert_eq!(writer.rotation_period, DEFAULT_ROTATION_PERIOD); + } + + #[tokio::test(start_paused = true)] + async fn test_finalize_after_time_rotation() { + use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime}; + let _guard = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH)); + + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let mut writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(u64::MAX) + .max_total_size(100_000) + .rotation_period(Duration::from_secs(60)) + .build() + .unwrap(); + + writer.write_encoded_batch(&test_batch()).unwrap(); + tokio::time::advance(Duration::from_secs(61)).await; + writer.write_encoded_batch(&test_batch()).unwrap(); + writer.finalize().unwrap(); + + let total: usize = (0..10) + .map(|i| { + let f = rotating_file(&base, i); + if std::path::Path::new(&f).exists() { + read_trace_events(&f).len() + } else { + 0 + } + }) + .sum(); + assert_eq!(total, 2); + } + + #[tokio::test(start_paused = true)] + async fn test_stale_boundary_does_not_rotate_first_event() { + use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime}; + let _guard = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH)); + + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let mut writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(u64::MAX) + .max_total_size(100_000) + .rotation_period(Duration::from_secs(60)) + .build() + .unwrap(); + + // Advance well past the boundary with no events + tokio::time::advance(Duration::from_secs(300)).await; + + // First event after the gap — should NOT trigger rotation + writer.write_encoded_batch(&test_batch()).unwrap(); + assert_eq!( + writer.next_index, 1, + "first event after idle gap should not trigger immediate rotation" + ); + + // Second event shortly after — still within the new boundary + writer.write_encoded_batch(&test_batch()).unwrap(); + assert_eq!( + writer.next_index, 1, + "second event should still be in the same segment" + ); + + writer.finalize().unwrap(); + + let events = read_trace_events(&rotating_file(&base, 0)); + assert_eq!(events.len(), 2, "both events should be in segment 0"); + } }