diff --git a/dial9-tokio-telemetry/src/telemetry/writer.rs b/dial9-tokio-telemetry/src/telemetry/writer.rs index 59422108..814146a6 100644 --- a/dial9-tokio-telemetry/src/telemetry/writer.rs +++ b/dial9-tokio-telemetry/src/telemetry/writer.rs @@ -4,7 +4,7 @@ use crate::rate_limit::rate_limited; use crate::telemetry::collector::Batch; use crate::telemetry::events::clock_pair; use crate::telemetry::format::{ClockSyncEvent, SegmentMetadataEvent}; -use std::collections::VecDeque; +use std::collections::{BTreeMap, VecDeque}; use std::fs::{self, File}; use std::io::BufWriter; use std::path::{Path, PathBuf}; @@ -134,6 +134,17 @@ const DEFAULT_ROTATION_PERIOD: Duration = Duration::from_secs(60); /// Default maximum interval between thread-local buffer drains. const DEFAULT_DRAIN_INTERVAL: Duration = Duration::from_secs(30); +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum SegmentArtifact { + Retained { index: u32 }, + Active, +} + +struct ExistingSegments { + closed_files: VecDeque<(PathBuf, u64)>, + next_active_index: u32, +} + /// A writer that rotates trace files to bound disk usage and time. /// /// Rotation triggers when *either* condition is met: @@ -150,8 +161,9 @@ const DEFAULT_DRAIN_INTERVAL: Duration = Duration::from_secs(30); /// first under normal conditions (e.g. 100 MB or more). Size-based rotation /// then acts as a safety valve for unexpected data bursts. /// -/// `max_total_size` controls eviction: oldest files are deleted when total -/// size across all files exceeds this budget. +/// `max_total_size` controls eviction: oldest retained files for this trace +/// family are deleted when total size exceeds this budget, including artifacts +/// left by previous writer lifetimes. /// /// Files are named `{base_path}.0.bin`, `{base_path}.1.bin`, etc. /// Each file is a self-contained trace with its own header. 
@@ -267,30 +279,37 @@ impl RotatingWriter {
         if let Some(parent) = base_path.parent() {
             fs::create_dir_all(parent)?;
         }
-        let first_path = Self::active_path(&base_path, 0);
+        let existing_segments = Self::discover_existing_segments(&base_path)?;
+        let first_path = Self::active_path(&base_path, existing_segments.next_active_index);
         let file = File::create(&first_path)?;
         let writer = BufWriter::new(file);
         let state = Self::prepare_segment(writer)?;
         let now = time_source().system_time().as_std();
         let drain_interval = rotation_period.min(DEFAULT_DRAIN_INTERVAL);
+        let next_index = existing_segments
+            .next_active_index
+            .checked_add(1)
+            .ok_or_else(|| std::io::Error::other("trace segment index overflow"))?;
 
-        Ok(Self {
+        let mut writer = Self {
             base_path,
             max_file_size,
             max_total_size,
             rotation_period,
             next_rotation_time: Self::next_boundary(now, rotation_period),
-            closed_files: VecDeque::new(),
+            closed_files: existing_segments.closed_files,
             active_path: first_path,
             state,
-            next_index: 1,
+            next_index,
             did_rotate: false,
             segment_metadata,
             dropped_events: 0,
             has_real_events: false,
             drain_interval,
             next_drain_time: Self::next_boundary(now, drain_interval),
-        })
+        };
+        writer.evict_oldest()?;
+        Ok(writer)
     }
 
     /// Create a writer that writes to a single file with no rotation or eviction.
@@ -406,6 +425,103 @@ impl RotatingWriter {
         parent.join(format!("{}.{}.bin.active", stem, index))
     }
 
+    /// Discover retained segment artifacts from previous writer lifetimes.
+    ///
+    /// `closed_files` keeps one canonical path per segment index, but its size
+    /// accounts for every retained on-disk artifact for that index (`.bin`,
+    /// `.bin.gz`, or future write-back variants). Stale `.active` files belong
+    /// to dead writers and are not consumable by the worker, so discard them
+    /// before creating the new active segment.
+    ///
+    /// # Errors
+    ///
+    /// Propagates I/O errors from scanning the directory and from removing
+    /// stale `.active` files (a file that has already vanished is ignored),
+    /// and fails when the highest retained index is `u32::MAX`.
+    fn discover_existing_segments(base: &Path) -> std::io::Result<ExistingSegments> {
+        let stem = base.file_stem().unwrap_or_default().to_string_lossy();
+        // `Path::parent` is `Some("")` for a bare relative base like `trace`;
+        // treat that as the current directory so the scan is not skipped.
+        let parent = base
+            .parent()
+            .filter(|dir| !dir.as_os_str().is_empty())
+            .unwrap_or(Path::new("."));
+        let mut retained_sizes = BTreeMap::<u32, u64>::new();
+
+        if parent.exists() {
+            for entry in fs::read_dir(parent)? {
+                let entry = entry?;
+                let path = entry.path();
+                let metadata = match entry.metadata() {
+                    Ok(metadata) => metadata,
+                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
+                    Err(e) => return Err(e),
+                };
+                if !metadata.is_file() {
+                    continue;
+                }
+                let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
+                    continue;
+                };
+
+                match Self::parse_segment_artifact(file_name, &stem) {
+                    Some(SegmentArtifact::Retained { index }) => {
+                        *retained_sizes.entry(index).or_default() += metadata.len();
+                    }
+                    Some(SegmentArtifact::Active) => {
+                        tracing::warn!(
+                            path = %path.display(),
+                            "discarding stale active trace segment from a previous writer"
+                        );
+                        match fs::remove_file(path) {
+                            Ok(()) => {}
+                            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+                            Err(e) => return Err(e),
+                        }
+                    }
+                    None => {}
+                }
+            }
+        }
+
+        let next_active_index = retained_sizes
+            .last_key_value()
+            .map(|(&index, _)| {
+                index
+                    .checked_add(1)
+                    .ok_or_else(|| std::io::Error::other("trace segment index overflow"))
+            })
+            .transpose()?
+            .unwrap_or(0);
+        let closed_files = retained_sizes
+            .into_iter()
+            .map(|(index, size)| (Self::file_path(base, index), size))
+            .collect();
+
+        Ok(ExistingSegments {
+            closed_files,
+            next_active_index,
+        })
+    }
+
+    fn parse_segment_artifact(file_name: &str, stem: &str) -> Option<SegmentArtifact> {
+        let rest = file_name.strip_prefix(stem)?.strip_prefix('.')?; // e.g. "3.bin.gz"
+        if rest
+            .strip_suffix(".bin.active")
+            .and_then(|index| index.parse::<u32>().ok())
+            .is_some()
+        {
+            return Some(SegmentArtifact::Active);
+        }
+
+        let (index, suffix) = rest.split_once(".bin")?;
+        if !suffix.is_empty() && !suffix.starts_with('.') {
+            return None; // e.g. "0.binary" is not a segment artifact
+        }
+        Some(SegmentArtifact::Retained {
+            index: index.parse().ok()?,
+        })
+    }
+
     /// Compute the next wall-clock-aligned rotation boundary after `now`.
     ///
     /// For a 60 s period, if `now` is 14:03:22 the result is 14:04:00.
@@ -532,50 +638,12 @@ impl RotatingWriter {
     fn evict_oldest(&mut self) -> std::io::Result<()> {
         // Always keep at least the current file.
         while self.total_size() > self.max_total_size && !self.closed_files.is_empty() {
-            if let Some((path, _size)) = self.closed_files.pop_front() {
-                // Try to remove the sealed `.bin` file directly. If a
-                // background worker has already renamed it (e.g. appended an
-                // extension like `.gz`), scan the parent directory for any
-                // file whose name starts with the original filename so we
-                // stay agnostic to future write-back extensions.
-                match fs::remove_file(&path) {
-                    Ok(()) => {}
-                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
-                        // NOTE: This directory scan is more expensive than a
-                        // direct remove, but it keeps us agnostic to whatever
-                        // extension the background worker appends. In practice
-                        // eviction is infrequent and the directory is small, so
-                        // the cost is hopefully negligible.
-                        if let Some(file_name) = path.file_name().and_then(|n| n.to_str())
-                            && let Some(parent) = path.parent()
-                            && let Ok(entries) = fs::read_dir(parent)
-                        {
-                            for entry in entries.flatten() {
-                                let name = entry.file_name();
-                                if let Some(name_str) = name.to_str()
-                                    && name_str.starts_with(file_name)
-                                    && name_str != file_name
-                                    && let Err(e2) = fs::remove_file(entry.path())
-                                {
-                                    rate_limited!(Duration::from_secs(60), {
-                                        tracing::warn!(
-                                            "failed to evict old trace segment {}: {e2}",
-                                            entry.path().display()
-                                        );
-                                    });
-                                }
-                            }
-                        }
-                    }
-                    Err(e) => {
-                        rate_limited!(Duration::from_secs(60), {
-                            tracing::warn!(
-                                "failed to evict old trace segment {}: {e}",
-                                path.display()
-                            );
-                        });
-                    }
-                }
+            if let Some((path, _size)) = self.closed_files.pop_front()
+                && let Err(e) = Self::remove_segment_artifacts(&path)
+            {
+                rate_limited!(Duration::from_secs(60), {
+                    tracing::warn!("failed to evict old trace segment {}: {e}", path.display());
+                });
             }
         }
         // If even the current file alone exceeds total budget, stop writing.
@@ -585,6 +653,59 @@ impl RotatingWriter {
         Ok(())
     }
 
+    /// Delete every on-disk artifact belonging to the segment at `path`.
+    ///
+    /// Matches the sealed file itself plus any sibling whose name is the
+    /// sealed file name followed by `.` and more text (for example a
+    /// `.bin.gz` written by a background worker). Files that vanish
+    /// concurrently and a missing parent directory are not errors.
+    ///
+    /// # Errors
+    ///
+    /// Returns the first I/O error other than `NotFound`.
+    fn remove_segment_artifacts(path: &Path) -> std::io::Result<()> {
+        let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
+            return Ok(());
+        };
+        let Some(parent) = path.parent() else {
+            return Ok(());
+        };
+        // `Path::parent` yields `Some("")` for a bare relative name such as
+        // `trace.0.bin`; `read_dir("")` fails with `NotFound`, which would make
+        // eviction a silent no-op, so scan the current directory instead.
+        let parent = if parent.as_os_str().is_empty() {
+            Path::new(".")
+        } else {
+            parent
+        };
+
+        match fs::read_dir(parent) {
+            Ok(entries) => {
+                for entry in entries {
+                    let entry = entry?;
+                    let artifact_name = entry.file_name();
+                    let Some(artifact_name) = artifact_name.to_str() else {
+                        continue;
+                    };
+                    if artifact_name == file_name
+                        || artifact_name
+                            .strip_prefix(file_name)
+                            .is_some_and(|suffix| suffix.starts_with('.'))
+                    {
+                        match fs::remove_file(entry.path()) {
+                            Ok(()) => {}
+                            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+                            Err(e) => return Err(e),
+                        }
+                    }
+                }
+                Ok(())
+            }
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
+
     /// Rotate if the current file exceeds max_file_size.
/// Called after writing a complete logical unit (def + event). fn maybe_rotate(&mut self) -> std::io::Result<()> { @@ -691,6 +794,9 @@ impl TraceWriter for RotatingWriter { } } } + if self.has_real_events { + self.evict_oldest()?; + } self.state = WriterState::Finished; Ok(()) } @@ -778,15 +884,18 @@ mod tests { .collect() } - /// Total size of all trace files (.bin and .active) in a directory. + /// Total size of all trace files in a directory, including write-back + /// artifacts such as `.bin.gz`. fn total_disk_usage(dir: &std::path::Path) -> u64 { std::fs::read_dir(dir) .unwrap() .filter_map(|e| e.ok()) .filter(|e| { - let p = e.path(); - p.extension() - .is_some_and(|ext| ext == "bin" || ext == "active") + e.file_name().to_str().is_some_and(|name| { + name.ends_with(".bin") + || name.ends_with(".bin.active") + || name.contains(".bin.") + }) }) .map(|e| e.metadata().unwrap().len()) .sum() @@ -926,6 +1035,79 @@ mod tests { assert!(!std::path::Path::new(&rotating_file(&base, 0)).exists()); } + #[test] + fn test_rotating_writer_enforces_budget_across_restarts() { + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let one_event = single_event_file_size(); + let max_file_size = one_event; + let max_total_size = max_file_size * 3; + + let mut first = RotatingWriter::new(&base, max_file_size, max_total_size).unwrap(); + for _ in 0..3 { + first.write_encoded_batch(&test_batch()).unwrap(); + } + first.finalize().unwrap(); + + // Simulate the background worker replacing one sealed segment with a + // write-back artifact before the process restarts. 
+ let first_segment = dir.path().join("trace.0.bin"); + let first_segment_gz = dir.path().join("trace.0.bin.gz"); + std::fs::rename(&first_segment, &first_segment_gz).unwrap(); + let highest_retained_index = std::fs::read_dir(dir.path()) + .unwrap() + .filter_map(|entry| entry.ok()) + .filter_map(|entry| { + let file_name = entry.file_name(); + let file_name = file_name.to_str()?; + match RotatingWriter::parse_segment_artifact(file_name, "trace") { + Some(SegmentArtifact::Retained { index }) => Some(index), + _ => None, + } + }) + .max() + .unwrap(); + + let mut second = RotatingWriter::new(&base, max_file_size, max_total_size).unwrap(); + assert_eq!( + second.current_active_path(), + dir.path() + .join(format!("trace.{}.bin.active", highest_retained_index + 1)), + "restart should continue after the highest retained segment" + ); + + second.write_encoded_batch(&test_batch()).unwrap(); + second.finalize().unwrap(); + + assert!( + total_disk_usage(dir.path()) <= max_total_size, + "restart should keep the total retained trace set within budget" + ); + assert!( + !first_segment_gz.exists(), + "oldest retained artifact should be evicted after restart" + ); + } + + #[test] + fn test_rotating_writer_discards_stale_active_segments_on_restart() { + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + std::fs::write(dir.path().join("trace.4.bin"), b"sealed").unwrap(); + std::fs::write(dir.path().join("trace.7.bin.active"), b"stale").unwrap(); + + let writer = RotatingWriter::new(&base, 1024, 4096).unwrap(); + + assert_eq!( + writer.current_active_path(), + dir.path().join("trace.5.bin.active") + ); + assert!( + !dir.path().join("trace.7.bin.active").exists(), + "stale active segments are not worker-readable and should be discarded" + ); + } + #[test] fn test_rotating_writer_stops_when_over_budget() { let dir = TempDir::new().unwrap();