Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 203 additions & 14 deletions dial9-tokio-telemetry/src/telemetry/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,24 +432,84 @@ impl RotatingWriter {
let WriterState::Active { writer: raw, .. } = &mut self.state else {
return Ok(());
};
raw.flush()?;
// Seal the current segment: snapshot size and rename .active → .bin

// Advance timers up front. If anything below fails the flush loop must
// NOT see should_drain() return true on the next 5ms tick — otherwise
// it busy-spins re-attempting the same failing rotate.
let now = time_source().system_time().as_std();
self.next_rotation_time = Self::next_boundary(now, self.rotation_period);
self.next_drain_time = Self::next_boundary(now, self.drain_interval);

// Best-effort flush. If the underlying file is gone the buffered bytes
// are already lost; proceed to rotate rather than erroring.
let _ = raw.flush();
let closed_size = raw.bytes_written();
let sealed = Self::file_path(&self.base_path, self.next_index - 1);
fs::rename(&self.active_path, &sealed)?;
self.closed_files.push_back((sealed, closed_size));
let sealed_path = Self::file_path(&self.base_path, self.next_index - 1);

// Seal the current segment. If `.active` was removed externally
// (operator, log rotation, container teardown) abandon the segment
// and start a fresh one. On any other error give up cleanly: mark
// the writer Finished so writes and rotations stop, instead of
// leaving inconsistent state.
match fs::rename(&self.active_path, &sealed_path) {
Ok(()) => self.closed_files.push_back((sealed_path, closed_size)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
rate_limited!(Duration::from_secs(60), {
tracing::warn!(
"active trace file {} disappeared before sealing; \
abandoning segment and starting a fresh one",
self.active_path.display()
);
});
}
Err(e) => {
self.state = WriterState::Finished;
return Err(e);
}
}

let new_path = Self::active_path(&self.base_path, self.next_index);
self.next_index += 1;
let file = File::create(&new_path)?;

// Open the new active file. NotFound here typically means the parent
// directory itself was removed (the most likely real-world cause of
// the original `.active` file disappearing too). Try to recreate the
// directory once and retry. If that still fails, mark Finished so the
// writer stops cleanly rather than retrying every drain cycle.
let file = match File::create(&new_path) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
if let Some(parent) = new_path.parent()
&& let Err(mkdir_err) = fs::create_dir_all(parent)
{
self.state = WriterState::Finished;
return Err(mkdir_err);
}
match File::create(&new_path) {
Ok(f) => f,
Err(e) => {
self.state = WriterState::Finished;
return Err(e);
}
}
}
Err(e) => {
self.state = WriterState::Finished;
return Err(e);
}
};
let writer = BufWriter::new(file);
self.state = Self::prepare_segment(writer)?;
self.state = match Self::prepare_segment(writer) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this fails, we would leave behind the orphan file created @479 and an inconsistent state. No big deal, but probably worth taking into account.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it would be good to remove the file inside our error handling block.

Ok(s) => s,
Err(e) => {
let _ = std::fs::remove_file(&new_path);
self.state = WriterState::Finished;
return Err(e);
}
};
self.active_path = new_path;
self.did_rotate = true;
self.has_real_events = false;
let now = time_source().system_time().as_std();
self.next_rotation_time = Self::next_boundary(now, self.rotation_period);
self.next_drain_time = Self::next_boundary(now, self.drain_interval);

tracing::debug!(
segment_index = self.next_index - 1,
Expand Down Expand Up @@ -591,17 +651,31 @@ impl TraceWriter for RotatingWriter {
tracing::warn!("writer is already closed.");
});
}
self.flush()?;
// Rename .active → .bin for the current segment (if it has .active suffix)
// Best-effort flush: if the file is gone the bytes are already lost.
let _ = self.flush();
if self
.active_path
.extension()
.is_some_and(|ext| ext == "active")
{
if self.has_real_events {
let sealed = Self::file_path(&self.base_path, self.next_index - 1);
fs::rename(&self.active_path, &sealed)?;
self.active_path = sealed;
match fs::rename(&self.active_path, &sealed) {
Ok(()) => self.active_path = sealed,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
rate_limited!(Duration::from_secs(60), {
tracing::warn!(
"active trace file {} disappeared before finalize; \
dropping segment",
self.active_path.display()
);
});
}
Err(e) => {
self.state = WriterState::Finished;
return Err(e);
}
}
} else {
// No real events — just header + metadata. Remove instead of
// sealing so the background worker doesn't upload an empty segment.
Expand All @@ -612,6 +686,7 @@ impl TraceWriter for RotatingWriter {
if let Err(e) = fs::remove_file(&self.active_path)
&& e.kind() != std::io::ErrorKind::NotFound
{
self.state = WriterState::Finished;
return Err(e);
}
}
Expand Down Expand Up @@ -2024,4 +2099,118 @@ mod tests {
"identical update_segment_metadata should not trigger another write"
);
}

/// Regression test for https://github.com/dial9-rs/dial9/issues/386
///
/// Scenario: the `.active` file vanishes underneath the writer (an operator,
/// a log-rotation tool, or container teardown removed it). The flush loop
/// then goes `drained()` → `rotate()` → `fs::rename(.active, .bin)`, and the
/// rename fails with `NotFound`. If nothing recovers from that,
/// `next_drain_time` stays in the past, `should_drain()` keeps answering
/// true on every 5ms tick, and the flush thread busy-loops.
///
/// Expected behaviour: `drained()` abandons the missing segment, opens a
/// fresh one, and moves the drain/rotation timers forward.
#[tokio::test(start_paused = true)]
async fn test_drained_recovers_when_active_file_deleted() {
    use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime};

    let rotation_period = Duration::from_secs(60);
    let _clock = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH));

    let scratch = TempDir::new().unwrap();
    let trace_base = scratch.path().join("trace");
    let mut writer = RotatingWriter::builder()
        .base_path(&trace_base)
        .max_file_size(u64::MAX)
        .max_total_size(100_000)
        .rotation_period(rotation_period)
        .build()
        .unwrap();

    // Put one batch on disk so the active segment really exists.
    writer.write_encoded_batch(&test_batch()).unwrap();
    writer.flush().unwrap();

    // Pull the `.active` file out from under the writer, as an external
    // actor would.
    let doomed = writer.current_active_path().to_owned();
    assert!(doomed.exists());
    std::fs::remove_file(&doomed).unwrap();

    // Step just past the rotation boundary; the next drained() call has to
    // attempt a rotation.
    tokio::time::advance(rotation_period + Duration::from_secs(1)).await;
    let due_before = writer.should_drain();
    assert!(due_before, "should_drain should fire");

    // An Err here is exactly the failure mode behind the busy-loop: the
    // timers would never have been advanced.
    let outcome = writer.drained();
    outcome.expect("drained() must recover from missing .active file");

    // Once recovered the writer must report "nothing to drain"; anything
    // else means drained() would be re-entered every 5ms.
    let due_after = writer.should_drain();
    assert!(
        !due_after,
        "should_drain must return false after recovery (otherwise flush loop spins)"
    );

    // Recovery also means the writer keeps working: new writes land in a
    // freshly created active file.
    writer.write_encoded_batch(&test_batch()).unwrap();
    writer.flush().unwrap();
    let fresh_active_present = writer.current_active_path().exists();
    assert!(
        fresh_active_present,
        "writer must have a fresh active file after recovery"
    );

    writer.finalize().unwrap();
}

/// Companion to `test_drained_recovers_when_active_file_deleted` for the
/// more realistic situation in which the whole trace directory is gone
/// (e.g. `rm -rf /var/log/dial9/`). In that case both the sealing rename
/// AND the `File::create` for the next segment hit `NotFound`.
///
/// The invariant checked here is deliberately narrow: whatever `drained()`
/// returns, the timers must have moved forward so `should_drain()` stops
/// firing. The writer is allowed to end up `Finished`; the flush loop is
/// NOT allowed to busy-spin.
#[tokio::test(start_paused = true)]
async fn test_drained_recovers_when_parent_dir_deleted() {
    use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime};

    let rotation_period = Duration::from_secs(60);
    let _clock = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH));

    // Keep the traces in a dedicated sub-directory so it can be deleted
    // wholesale without touching the TempDir root.
    let scratch = TempDir::new().unwrap();
    let trace_dir = scratch.path().join("traces");
    std::fs::create_dir_all(&trace_dir).unwrap();
    let trace_base = trace_dir.join("trace");
    let mut writer = RotatingWriter::builder()
        .base_path(&trace_base)
        .max_file_size(u64::MAX)
        .max_total_size(100_000)
        .rotation_period(rotation_period)
        .build()
        .unwrap();

    writer.write_encoded_batch(&test_batch()).unwrap();
    writer.flush().unwrap();

    // Remove the directory and, with it, the active segment.
    std::fs::remove_dir_all(&trace_dir).unwrap();
    assert!(!writer.current_active_path().exists());

    // Cross the rotation boundary so a drain is due.
    tokio::time::advance(rotation_period + Duration::from_secs(1)).await;
    assert!(writer.should_drain());

    // The result is ignored on purpose: success and failure are both
    // acceptable, as long as the very next tick does not see a pending
    // drain again.
    let _ = writer.drained();
    let due_after_first = writer.should_drain();
    assert!(
        !due_after_first,
        "should_drain must return false after a failed rotation \
         (otherwise the flush loop spins on every 5ms tick)"
    );

    // One flush-loop tick later the answer must still be "no drain due".
    tokio::time::advance(Duration::from_millis(5)).await;
    let _ = writer.drained();
    assert!(!writer.should_drain());
}
}
Loading