Skip to content

Commit f52a5cc

Browse files
authored
fix: enforce RotatingWriter retention across restarts (#414)
1 parent 6342350 commit f52a5cc

1 file changed

Lines changed: 238 additions & 56 deletions

File tree

dial9-tokio-telemetry/src/telemetry/writer.rs

Lines changed: 238 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::rate_limit::rate_limited;
44
use crate::telemetry::collector::Batch;
55
use crate::telemetry::events::clock_pair;
66
use crate::telemetry::format::{ClockSyncEvent, SegmentMetadataEvent};
7-
use std::collections::VecDeque;
7+
use std::collections::{BTreeMap, VecDeque};
88
use std::fs::{self, File};
99
use std::io::BufWriter;
1010
use std::path::{Path, PathBuf};
@@ -134,6 +134,17 @@ const DEFAULT_ROTATION_PERIOD: Duration = Duration::from_secs(60);
134134
/// Default maximum interval between thread-local buffer drains.
135135
const DEFAULT_DRAIN_INTERVAL: Duration = Duration::from_secs(30);
136136

137+
/// Classification of a directory entry that belongs to this writer's trace
/// family, as produced by `parse_segment_artifact`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SegmentArtifact {
    /// A `.bin` segment, or a dotted derivative of one such as `.bin.gz`,
    /// together with the segment index parsed from its file name.
    Retained { index: u32 },
    /// A `.bin.active` segment.
    Active,
}
142+
143+
/// What a starting writer found on disk for its trace family.
struct ExistingSegments {
    /// One `(path, total_size)` entry per retained segment index.
    closed_files: VecDeque<(PathBuf, u64)>,
    /// Index the new active segment is created with.
    next_active_index: u32,
}
147+
137148
/// A writer that rotates trace files to bound disk usage and time.
138149
///
139150
/// Rotation triggers when *either* condition is met:
@@ -150,8 +161,9 @@ const DEFAULT_DRAIN_INTERVAL: Duration = Duration::from_secs(30);
150161
/// first under normal conditions (e.g. 100 MB or more). Size-based rotation
151162
/// then acts as a safety valve for unexpected data bursts.
152163
///
153-
/// `max_total_size` controls eviction: oldest files are deleted when total
154-
/// size across all files exceeds this budget.
164+
/// `max_total_size` controls eviction: oldest retained files for this trace
165+
/// family are deleted when total size exceeds this budget, including artifacts
166+
/// left by previous writer lifetimes.
155167
///
156168
/// Files are named `{base_path}.0.bin`, `{base_path}.1.bin`, etc.
157169
/// Each file is a self-contained trace with its own header.
@@ -267,30 +279,37 @@ impl RotatingWriter {
267279
if let Some(parent) = base_path.parent() {
268280
fs::create_dir_all(parent)?;
269281
}
270-
let first_path = Self::active_path(&base_path, 0);
282+
let existing_segments = Self::discover_existing_segments(&base_path)?;
283+
let first_path = Self::active_path(&base_path, existing_segments.next_active_index);
271284
let file = File::create(&first_path)?;
272285
let writer = BufWriter::new(file);
273286
let state = Self::prepare_segment(writer)?;
274287
let now = time_source().system_time().as_std();
275288
let drain_interval = rotation_period.min(DEFAULT_DRAIN_INTERVAL);
289+
let next_index = existing_segments
290+
.next_active_index
291+
.checked_add(1)
292+
.ok_or_else(|| std::io::Error::other("trace segment index overflow"))?;
276293

277-
Ok(Self {
294+
let mut writer = Self {
278295
base_path,
279296
max_file_size,
280297
max_total_size,
281298
rotation_period,
282299
next_rotation_time: Self::next_boundary(now, rotation_period),
283-
closed_files: VecDeque::new(),
300+
closed_files: existing_segments.closed_files,
284301
active_path: first_path,
285302
state,
286-
next_index: 1,
303+
next_index,
287304
did_rotate: false,
288305
segment_metadata,
289306
dropped_events: 0,
290307
has_real_events: false,
291308
drain_interval,
292309
next_drain_time: Self::next_boundary(now, drain_interval),
293-
})
310+
};
311+
writer.evict_oldest()?;
312+
Ok(writer)
294313
}
295314

296315
/// Create a writer that writes to a single file with no rotation or eviction.
@@ -406,6 +425,93 @@ impl RotatingWriter {
406425
parent.join(format!("{}.{}.bin.active", stem, index))
407426
}
408427

428+
/// Discover retained segment artifacts from previous writer lifetimes.
429+
///
430+
/// `closed_files` keeps one canonical path per segment index, but its size
431+
/// accounts for every retained on-disk artifact for that index (`.bin`,
432+
/// `.bin.gz`, or future write-back variants). Stale `.active` files belong
433+
/// to dead writers and are not consumable by the worker, so discard them
434+
/// before creating the new active segment.
435+
fn discover_existing_segments(base: &Path) -> std::io::Result<ExistingSegments> {
436+
let stem = base.file_stem().unwrap_or_default().to_string_lossy();
437+
let parent = base.parent().unwrap_or(Path::new("."));
438+
let mut retained_sizes = BTreeMap::<u32, u64>::new();
439+
440+
if parent.exists() {
441+
for entry in fs::read_dir(parent)? {
442+
let entry = entry?;
443+
let path = entry.path();
444+
let metadata = match entry.metadata() {
445+
Ok(metadata) => metadata,
446+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
447+
Err(e) => return Err(e),
448+
};
449+
if !metadata.is_file() {
450+
continue;
451+
}
452+
let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
453+
continue;
454+
};
455+
456+
match Self::parse_segment_artifact(file_name, &stem) {
457+
Some(SegmentArtifact::Retained { index }) => {
458+
*retained_sizes.entry(index).or_default() += metadata.len();
459+
}
460+
Some(SegmentArtifact::Active) => {
461+
tracing::warn!(
462+
path = %path.display(),
463+
"discarding stale active trace segment from a previous writer"
464+
);
465+
match fs::remove_file(path) {
466+
Ok(()) => {}
467+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
468+
Err(e) => return Err(e),
469+
}
470+
}
471+
None => {}
472+
}
473+
}
474+
}
475+
476+
let next_active_index = retained_sizes
477+
.last_key_value()
478+
.map(|(&index, _)| {
479+
index
480+
.checked_add(1)
481+
.ok_or_else(|| std::io::Error::other("trace segment index overflow"))
482+
})
483+
.transpose()?
484+
.unwrap_or(0);
485+
let closed_files = retained_sizes
486+
.into_iter()
487+
.map(|(index, size)| (Self::file_path(base, index), size))
488+
.collect();
489+
490+
Ok(ExistingSegments {
491+
closed_files,
492+
next_active_index,
493+
})
494+
}
495+
496+
fn parse_segment_artifact(file_name: &str, stem: &str) -> Option<SegmentArtifact> {
497+
let rest = file_name.strip_prefix(stem)?.strip_prefix('.')?;
498+
if rest
499+
.strip_suffix(".bin.active")
500+
.and_then(|index| index.parse::<u32>().ok())
501+
.is_some()
502+
{
503+
return Some(SegmentArtifact::Active);
504+
}
505+
506+
let (index, suffix) = rest.split_once(".bin")?;
507+
if !suffix.is_empty() && !suffix.starts_with('.') {
508+
return None;
509+
}
510+
Some(SegmentArtifact::Retained {
511+
index: index.parse().ok()?,
512+
})
513+
}
514+
409515
/// Compute the next wall-clock-aligned rotation boundary after `now`.
410516
///
411517
/// For a 60 s period, if `now` is 14:03:22 the result is 14:04:00.
@@ -532,50 +638,12 @@ impl RotatingWriter {
532638
fn evict_oldest(&mut self) -> std::io::Result<()> {
533639
// Always keep at least the current file.
534640
while self.total_size() > self.max_total_size && !self.closed_files.is_empty() {
535-
if let Some((path, _size)) = self.closed_files.pop_front() {
536-
// Try to remove the sealed `.bin` file directly. If a
537-
// background worker has already renamed it (e.g. appended an
538-
// extension like `.gz`), scan the parent directory for any
539-
// file whose name starts with the original filename so we
540-
// stay agnostic to future write-back extensions.
541-
match fs::remove_file(&path) {
542-
Ok(()) => {}
543-
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
544-
// NOTE: This directory scan is more expensive than a
545-
// direct remove, but it keeps us agnostic to whatever
546-
// extension the background worker appends. In practice
547-
// eviction is infrequent and the directory is small, so
548-
// the cost is hopefully negligible.
549-
if let Some(file_name) = path.file_name().and_then(|n| n.to_str())
550-
&& let Some(parent) = path.parent()
551-
&& let Ok(entries) = fs::read_dir(parent)
552-
{
553-
for entry in entries.flatten() {
554-
let name = entry.file_name();
555-
if let Some(name_str) = name.to_str()
556-
&& name_str.starts_with(file_name)
557-
&& name_str != file_name
558-
&& let Err(e2) = fs::remove_file(entry.path())
559-
{
560-
rate_limited!(Duration::from_secs(60), {
561-
tracing::warn!(
562-
"failed to evict old trace segment {}: {e2}",
563-
entry.path().display()
564-
);
565-
});
566-
}
567-
}
568-
}
569-
}
570-
Err(e) => {
571-
rate_limited!(Duration::from_secs(60), {
572-
tracing::warn!(
573-
"failed to evict old trace segment {}: {e}",
574-
path.display()
575-
);
576-
});
577-
}
578-
}
641+
if let Some((path, _size)) = self.closed_files.pop_front()
642+
&& let Err(e) = Self::remove_segment_artifacts(&path)
643+
{
644+
rate_limited!(Duration::from_secs(60), {
645+
tracing::warn!("failed to evict old trace segment {}: {e}", path.display());
646+
});
579647
}
580648
}
581649
// If even the current file alone exceeds total budget, stop writing.
@@ -585,6 +653,41 @@ impl RotatingWriter {
585653
Ok(())
586654
}
587655

656+
/// Delete every on-disk artifact of the segment whose canonical file is `path`.
///
/// Lists the directory containing `path` and removes each entry whose name is
/// exactly the file name of `path`, or that name followed by a dotted suffix
/// (for example `trace.0.bin.gz`). This stays agnostic to whatever extension
/// a background worker appended.
///
/// A missing directory, or an entry that disappears before it can be
/// removed, is not an error. Paths without a UTF-8 file name, or without a
/// parent, are ignored.
///
/// # Errors
///
/// Returns the first I/O error other than `NotFound` from listing the
/// directory or removing a matching entry.
fn remove_segment_artifacts(path: &Path) -> std::io::Result<()> {
    let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
        return Ok(());
    };
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    // A bare relative path such as `trace.0.bin` has the empty path as its
    // parent. Listing "" fails with `NotFound`, which would be swallowed
    // below and leave the segment on disk, so list the current directory.
    let parent = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };

    match fs::read_dir(parent) {
        Ok(entries) => {
            for entry in entries {
                let entry = entry?;
                let artifact_name = entry.file_name();
                let Some(artifact_name) = artifact_name.to_str() else {
                    continue;
                };
                // The '.' requirement keeps `trace.1.bin` from matching
                // unrelated names such as `trace.1.binx`.
                let belongs_to_segment = artifact_name == file_name
                    || artifact_name
                        .strip_prefix(file_name)
                        .is_some_and(|suffix| suffix.starts_with('.'));
                if belongs_to_segment {
                    match fs::remove_file(entry.path()) {
                        Ok(()) => {}
                        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                        Err(e) => return Err(e),
                    }
                }
            }
            Ok(())
        }
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e),
    }
}
690+
588691
/// Rotate if the current file exceeds max_file_size.
589692
/// Called after writing a complete logical unit (def + event).
590693
fn maybe_rotate(&mut self) -> std::io::Result<()> {
@@ -691,6 +794,9 @@ impl TraceWriter for RotatingWriter {
691794
}
692795
}
693796
}
797+
if self.has_real_events {
798+
self.evict_oldest()?;
799+
}
694800
self.state = WriterState::Finished;
695801
Ok(())
696802
}
@@ -778,15 +884,18 @@ mod tests {
778884
.collect()
779885
}
780886

781-
/// Total size of all trace files (.bin and .active) in a directory.
887+
/// Total size of all trace files in a directory, including write-back
888+
/// artifacts such as `.bin.gz`.
782889
fn total_disk_usage(dir: &std::path::Path) -> u64 {
783890
std::fs::read_dir(dir)
784891
.unwrap()
785892
.filter_map(|e| e.ok())
786893
.filter(|e| {
787-
let p = e.path();
788-
p.extension()
789-
.is_some_and(|ext| ext == "bin" || ext == "active")
894+
e.file_name().to_str().is_some_and(|name| {
895+
name.ends_with(".bin")
896+
|| name.ends_with(".bin.active")
897+
|| name.contains(".bin.")
898+
})
790899
})
791900
.map(|e| e.metadata().unwrap().len())
792901
.sum()
@@ -926,6 +1035,79 @@ mod tests {
9261035
assert!(!std::path::Path::new(&rotating_file(&base, 0)).exists());
9271036
}
9281037

1038+
/// A second writer started on the same base path must continue numbering
/// after the highest retained segment and keep the whole family, including
/// write-back artifacts from the first writer, within `max_total_size`.
#[test]
fn test_rotating_writer_enforces_budget_across_restarts() {
    let dir = TempDir::new().unwrap();
    let base = dir.path().join("trace");
    let max_file_size = single_event_file_size();
    let max_total_size = max_file_size * 3;

    // First writer lifetime: produce a few sealed segments.
    let mut first = RotatingWriter::new(&base, max_file_size, max_total_size).unwrap();
    for _ in 0..3 {
        first.write_encoded_batch(&test_batch()).unwrap();
    }
    first.finalize().unwrap();

    // Simulate the background worker replacing one sealed segment with a
    // write-back artifact before the process restarts.
    let first_segment_gz = dir.path().join("trace.0.bin.gz");
    std::fs::rename(dir.path().join("trace.0.bin"), &first_segment_gz).unwrap();

    // Find the highest retained index currently on disk.
    let mut highest: Option<u32> = None;
    for entry in std::fs::read_dir(dir.path()).unwrap().flatten() {
        let file_name = entry.file_name();
        let Some(file_name) = file_name.to_str() else {
            continue;
        };
        if let Some(SegmentArtifact::Retained { index }) =
            RotatingWriter::parse_segment_artifact(file_name, "trace")
        {
            highest = Some(highest.map_or(index, |seen| seen.max(index)));
        }
    }
    let highest_retained_index = highest.unwrap();

    // Second writer lifetime.
    let mut second = RotatingWriter::new(&base, max_file_size, max_total_size).unwrap();
    let expected_active = dir
        .path()
        .join(format!("trace.{}.bin.active", highest_retained_index + 1));
    assert_eq!(
        second.current_active_path(),
        expected_active,
        "restart should continue after the highest retained segment"
    );

    second.write_encoded_batch(&test_batch()).unwrap();
    second.finalize().unwrap();

    assert!(
        total_disk_usage(dir.path()) <= max_total_size,
        "restart should keep the total retained trace set within budget"
    );
    assert!(
        !first_segment_gz.exists(),
        "oldest retained artifact should be evicted after restart"
    );
}
1091+
1092+
/// A leftover `.bin.active` file from a previous writer is deleted on start
/// and does not influence the index chosen for the new active segment.
#[test]
fn test_rotating_writer_discards_stale_active_segments_on_restart() {
    let dir = TempDir::new().unwrap();
    let base = dir.path().join("trace");
    let sealed = dir.path().join("trace.4.bin");
    let stale_active = dir.path().join("trace.7.bin.active");
    std::fs::write(&sealed, b"sealed").unwrap();
    std::fs::write(&stale_active, b"stale").unwrap();

    let writer = RotatingWriter::new(&base, 1024, 4096).unwrap();

    // Numbering follows the sealed segment (4), not the stale active one (7).
    let expected_active = dir.path().join("trace.5.bin.active");
    assert_eq!(writer.current_active_path(), expected_active);
    assert!(
        !stale_active.exists(),
        "stale active segments are not worker-readable and should be discarded"
    );
}
1110+
9291111
#[test]
9301112
fn test_rotating_writer_stops_when_over_budget() {
9311113
let dir = TempDir::new().unwrap();

0 commit comments

Comments
 (0)