Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 238 additions & 56 deletions dial9-tokio-telemetry/src/telemetry/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::rate_limit::rate_limited;
use crate::telemetry::collector::Batch;
use crate::telemetry::events::clock_pair;
use crate::telemetry::format::{ClockSyncEvent, SegmentMetadataEvent};
use std::collections::VecDeque;
use std::collections::{BTreeMap, VecDeque};
use std::fs::{self, File};
use std::io::BufWriter;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -134,6 +134,17 @@ const DEFAULT_ROTATION_PERIOD: Duration = Duration::from_secs(60);
/// Default maximum interval between thread-local buffer drains.
const DEFAULT_DRAIN_INTERVAL: Duration = Duration::from_secs(30);

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SegmentArtifact {
Retained { index: u32 },
Active,
}

struct ExistingSegments {
closed_files: VecDeque<(PathBuf, u64)>,
next_active_index: u32,
}

/// A writer that rotates trace files to bound disk usage and time.
///
/// Rotation triggers when *either* condition is met:
Expand All @@ -150,8 +161,9 @@ const DEFAULT_DRAIN_INTERVAL: Duration = Duration::from_secs(30);
/// first under normal conditions (e.g. 100 MB or more). Size-based rotation
/// then acts as a safety valve for unexpected data bursts.
///
/// `max_total_size` controls eviction: oldest files are deleted when total
/// size across all files exceeds this budget.
/// `max_total_size` controls eviction: oldest retained files for this trace
/// family are deleted when total size exceeds this budget, including artifacts
/// left by previous writer lifetimes.
///
/// Files are named `{base_path}.0.bin`, `{base_path}.1.bin`, etc.
/// Each file is a self-contained trace with its own header.
Expand Down Expand Up @@ -267,30 +279,37 @@ impl RotatingWriter {
if let Some(parent) = base_path.parent() {
fs::create_dir_all(parent)?;
}
let first_path = Self::active_path(&base_path, 0);
let existing_segments = Self::discover_existing_segments(&base_path)?;
let first_path = Self::active_path(&base_path, existing_segments.next_active_index);
let file = File::create(&first_path)?;
let writer = BufWriter::new(file);
let state = Self::prepare_segment(writer)?;
let now = time_source().system_time().as_std();
let drain_interval = rotation_period.min(DEFAULT_DRAIN_INTERVAL);
let next_index = existing_segments
.next_active_index
.checked_add(1)
.ok_or_else(|| std::io::Error::other("trace segment index overflow"))?;

Ok(Self {
let mut writer = Self {
base_path,
max_file_size,
max_total_size,
rotation_period,
next_rotation_time: Self::next_boundary(now, rotation_period),
closed_files: VecDeque::new(),
closed_files: existing_segments.closed_files,
active_path: first_path,
state,
next_index: 1,
next_index,
did_rotate: false,
segment_metadata,
dropped_events: 0,
has_real_events: false,
drain_interval,
next_drain_time: Self::next_boundary(now, drain_interval),
})
};
writer.evict_oldest()?;
Ok(writer)
}

/// Create a writer that writes to a single file with no rotation or eviction.
Expand Down Expand Up @@ -406,6 +425,93 @@ impl RotatingWriter {
parent.join(format!("{}.{}.bin.active", stem, index))
}

/// Discover retained segment artifacts from previous writer lifetimes.
///
/// `closed_files` keeps one canonical path per segment index, but its size
/// accounts for every retained on-disk artifact for that index (`.bin`,
/// `.bin.gz`, or future write-back variants). Stale `.active` files belong
/// to dead writers and are not consumable by the worker, so discard them
/// before creating the new active segment.
fn discover_existing_segments(base: &Path) -> std::io::Result<ExistingSegments> {
let stem = base.file_stem().unwrap_or_default().to_string_lossy();
let parent = base.parent().unwrap_or(Path::new("."));
let mut retained_sizes = BTreeMap::<u32, u64>::new();

if parent.exists() {
for entry in fs::read_dir(parent)? {
let entry = entry?;
let path = entry.path();
let metadata = match entry.metadata() {
Ok(metadata) => metadata,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
Err(e) => return Err(e),
};
if !metadata.is_file() {
continue;
}
let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};

match Self::parse_segment_artifact(file_name, &stem) {
Some(SegmentArtifact::Retained { index }) => {
*retained_sizes.entry(index).or_default() += metadata.len();
}
Some(SegmentArtifact::Active) => {
tracing::warn!(
path = %path.display(),
"discarding stale active trace segment from a previous writer"
);
match fs::remove_file(path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(e),
}
}
None => {}
}
}
}

let next_active_index = retained_sizes
.last_key_value()
.map(|(&index, _)| {
index
.checked_add(1)
.ok_or_else(|| std::io::Error::other("trace segment index overflow"))
})
.transpose()?
.unwrap_or(0);
let closed_files = retained_sizes
.into_iter()
.map(|(index, size)| (Self::file_path(base, index), size))
.collect();

Ok(ExistingSegments {
closed_files,
next_active_index,
})
}

fn parse_segment_artifact(file_name: &str, stem: &str) -> Option<SegmentArtifact> {
let rest = file_name.strip_prefix(stem)?.strip_prefix('.')?;
if rest
.strip_suffix(".bin.active")
.and_then(|index| index.parse::<u32>().ok())
.is_some()
{
return Some(SegmentArtifact::Active);
}

let (index, suffix) = rest.split_once(".bin")?;
if !suffix.is_empty() && !suffix.starts_with('.') {
return None;
}
Some(SegmentArtifact::Retained {
index: index.parse().ok()?,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this won't happen currently but because of how the code works if we ever started producing something like 01 as an index, that could never be deleted.

})
}

/// Compute the next wall-clock-aligned rotation boundary after `now`.
///
/// For a 60 s period, if `now` is 14:03:22 the result is 14:04:00.
Expand Down Expand Up @@ -532,50 +638,12 @@ impl RotatingWriter {
fn evict_oldest(&mut self) -> std::io::Result<()> {
// Always keep at least the current file.
while self.total_size() > self.max_total_size && !self.closed_files.is_empty() {
if let Some((path, _size)) = self.closed_files.pop_front() {
// Try to remove the sealed `.bin` file directly. If a
// background worker has already renamed it (e.g. appended an
// extension like `.gz`), scan the parent directory for any
// file whose name starts with the original filename so we
// stay agnostic to future write-back extensions.
match fs::remove_file(&path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// NOTE: This directory scan is more expensive than a
// direct remove, but it keeps us agnostic to whatever
// extension the background worker appends. In practice
// eviction is infrequent and the directory is small, so
// the cost is hopefully negligible.
if let Some(file_name) = path.file_name().and_then(|n| n.to_str())
&& let Some(parent) = path.parent()
&& let Ok(entries) = fs::read_dir(parent)
{
for entry in entries.flatten() {
let name = entry.file_name();
if let Some(name_str) = name.to_str()
&& name_str.starts_with(file_name)
&& name_str != file_name
&& let Err(e2) = fs::remove_file(entry.path())
{
rate_limited!(Duration::from_secs(60), {
tracing::warn!(
"failed to evict old trace segment {}: {e2}",
entry.path().display()
);
});
}
}
}
}
Err(e) => {
rate_limited!(Duration::from_secs(60), {
tracing::warn!(
"failed to evict old trace segment {}: {e}",
path.display()
);
});
}
}
if let Some((path, _size)) = self.closed_files.pop_front()
&& let Err(e) = Self::remove_segment_artifacts(&path)
{
rate_limited!(Duration::from_secs(60), {
tracing::warn!("failed to evict old trace segment {}: {e}", path.display());
});
}
}
// If even the current file alone exceeds total budget, stop writing.
Expand All @@ -585,6 +653,41 @@ impl RotatingWriter {
Ok(())
}

fn remove_segment_artifacts(path: &Path) -> std::io::Result<()> {
let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
return Ok(());
};
let Some(parent) = path.parent() else {
return Ok(());
};

match fs::read_dir(parent) {
Ok(entries) => {
for entry in entries {
let entry = entry?;
let artifact_name = entry.file_name();
let Some(artifact_name) = artifact_name.to_str() else {
continue;
};
if artifact_name == file_name
|| artifact_name
.strip_prefix(file_name)
.is_some_and(|suffix| suffix.starts_with('.'))
{
match fs::remove_file(entry.path()) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(e),
}
}
}
Ok(())
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(e),
}
}

/// Rotate if the current file exceeds max_file_size.
/// Called after writing a complete logical unit (def + event).
fn maybe_rotate(&mut self) -> std::io::Result<()> {
Expand Down Expand Up @@ -691,6 +794,9 @@ impl TraceWriter for RotatingWriter {
}
}
}
if self.has_real_events {
self.evict_oldest()?;
}
self.state = WriterState::Finished;
Ok(())
}
Expand Down Expand Up @@ -778,15 +884,18 @@ mod tests {
.collect()
}

/// Total size of all trace files (.bin and .active) in a directory.
/// Total size of all trace files in a directory, including write-back
/// artifacts such as `.bin.gz`.
fn total_disk_usage(dir: &std::path::Path) -> u64 {
std::fs::read_dir(dir)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| {
let p = e.path();
p.extension()
.is_some_and(|ext| ext == "bin" || ext == "active")
e.file_name().to_str().is_some_and(|name| {
name.ends_with(".bin")
|| name.ends_with(".bin.active")
|| name.contains(".bin.")
})
})
.map(|e| e.metadata().unwrap().len())
.sum()
Expand Down Expand Up @@ -926,6 +1035,79 @@ mod tests {
assert!(!std::path::Path::new(&rotating_file(&base, 0)).exists());
}

#[test]
fn test_rotating_writer_enforces_budget_across_restarts() {
let dir = TempDir::new().unwrap();
let base = dir.path().join("trace");
let one_event = single_event_file_size();
let max_file_size = one_event;
let max_total_size = max_file_size * 3;

let mut first = RotatingWriter::new(&base, max_file_size, max_total_size).unwrap();
for _ in 0..3 {
first.write_encoded_batch(&test_batch()).unwrap();
}
first.finalize().unwrap();

// Simulate the background worker replacing one sealed segment with a
// write-back artifact before the process restarts.
let first_segment = dir.path().join("trace.0.bin");
let first_segment_gz = dir.path().join("trace.0.bin.gz");
std::fs::rename(&first_segment, &first_segment_gz).unwrap();
let highest_retained_index = std::fs::read_dir(dir.path())
.unwrap()
.filter_map(|entry| entry.ok())
.filter_map(|entry| {
let file_name = entry.file_name();
let file_name = file_name.to_str()?;
match RotatingWriter::parse_segment_artifact(file_name, "trace") {
Some(SegmentArtifact::Retained { index }) => Some(index),
_ => None,
}
})
.max()
.unwrap();

let mut second = RotatingWriter::new(&base, max_file_size, max_total_size).unwrap();
assert_eq!(
second.current_active_path(),
dir.path()
.join(format!("trace.{}.bin.active", highest_retained_index + 1)),
"restart should continue after the highest retained segment"
);

second.write_encoded_batch(&test_batch()).unwrap();
second.finalize().unwrap();

assert!(
total_disk_usage(dir.path()) <= max_total_size,
"restart should keep the total retained trace set within budget"
);
assert!(
!first_segment_gz.exists(),
"oldest retained artifact should be evicted after restart"
);
}

#[test]
fn test_rotating_writer_discards_stale_active_segments_on_restart() {
let dir = TempDir::new().unwrap();
let base = dir.path().join("trace");
std::fs::write(dir.path().join("trace.4.bin"), b"sealed").unwrap();
std::fs::write(dir.path().join("trace.7.bin.active"), b"stale").unwrap();

let writer = RotatingWriter::new(&base, 1024, 4096).unwrap();

assert_eq!(
writer.current_active_path(),
dir.path().join("trace.5.bin.active")
);
assert!(
!dir.path().join("trace.7.bin.active").exists(),
"stale active segments are not worker-readable and should be discarded"
);
}

#[test]
fn test_rotating_writer_stops_when_over_budget() {
let dir = TempDir::new().unwrap();
Expand Down
Loading