diff --git a/dial9-tokio-telemetry/src/telemetry/writer.rs b/dial9-tokio-telemetry/src/telemetry/writer.rs index 51a690e3..590a21db 100644 --- a/dial9-tokio-telemetry/src/telemetry/writer.rs +++ b/dial9-tokio-telemetry/src/telemetry/writer.rs @@ -432,24 +432,84 @@ impl RotatingWriter { let WriterState::Active { writer: raw, .. } = &mut self.state else { return Ok(()); }; - raw.flush()?; - // Seal the current segment: snapshot size and rename .active → .bin + + // Advance timers up front. If anything below fails the flush loop must + // NOT see should_drain() return true on the next 5ms tick — otherwise + // it busy-spins re-attempting the same failing rotate. + let now = time_source().system_time().as_std(); + self.next_rotation_time = Self::next_boundary(now, self.rotation_period); + self.next_drain_time = Self::next_boundary(now, self.drain_interval); + + // Best-effort flush. If the underlying file is gone the buffered bytes + // are already lost; proceed to rotate rather than erroring. + let _ = raw.flush(); let closed_size = raw.bytes_written(); - let sealed = Self::file_path(&self.base_path, self.next_index - 1); - fs::rename(&self.active_path, &sealed)?; - self.closed_files.push_back((sealed, closed_size)); + let sealed_path = Self::file_path(&self.base_path, self.next_index - 1); + + // Seal the current segment. If `.active` was removed externally + // (operator, log rotation, container teardown) abandon the segment + // and start a fresh one. On any other error give up cleanly: mark + // the writer Finished so writes and rotations stop, instead of + // leaving inconsistent state. 
+ match fs::rename(&self.active_path, &sealed_path) { + Ok(()) => self.closed_files.push_back((sealed_path, closed_size)), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + rate_limited!(Duration::from_secs(60), { + tracing::warn!( + "active trace file {} disappeared before sealing; \ + abandoning segment and starting a fresh one", + self.active_path.display() + ); + }); + } + Err(e) => { + self.state = WriterState::Finished; + return Err(e); + } + } let new_path = Self::active_path(&self.base_path, self.next_index); self.next_index += 1; - let file = File::create(&new_path)?; + + // Open the new active file. NotFound here typically means the parent + // directory itself was removed (the most likely real-world cause of + // the original `.active` file disappearing too). Try to recreate the + // directory once and retry. If that still fails, mark Finished so the + // writer stops cleanly rather than retrying every drain cycle. + let file = match File::create(&new_path) { + Ok(f) => f, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + if let Some(parent) = new_path.parent() + && let Err(mkdir_err) = fs::create_dir_all(parent) + { + self.state = WriterState::Finished; + return Err(mkdir_err); + } + match File::create(&new_path) { + Ok(f) => f, + Err(e) => { + self.state = WriterState::Finished; + return Err(e); + } + } + } + Err(e) => { + self.state = WriterState::Finished; + return Err(e); + } + }; let writer = BufWriter::new(file); - self.state = Self::prepare_segment(writer)?; + self.state = match Self::prepare_segment(writer) { + Ok(s) => s, + Err(e) => { + let _ = std::fs::remove_file(&new_path); + self.state = WriterState::Finished; + return Err(e); + } + }; self.active_path = new_path; self.did_rotate = true; self.has_real_events = false; - let now = time_source().system_time().as_std(); - self.next_rotation_time = Self::next_boundary(now, self.rotation_period); - self.next_drain_time = Self::next_boundary(now, self.drain_interval); 
tracing::debug!( segment_index = self.next_index - 1, @@ -591,8 +651,8 @@ impl TraceWriter for RotatingWriter { tracing::warn!("writer is already closed."); }); } - self.flush()?; - // Rename .active → .bin for the current segment (if it has .active suffix) + // Best-effort flush: if the file is gone the bytes are already lost. + let _ = self.flush(); if self .active_path .extension() @@ -600,8 +660,22 @@ impl TraceWriter for RotatingWriter { { if self.has_real_events { let sealed = Self::file_path(&self.base_path, self.next_index - 1); - fs::rename(&self.active_path, &sealed)?; - self.active_path = sealed; + match fs::rename(&self.active_path, &sealed) { + Ok(()) => self.active_path = sealed, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + rate_limited!(Duration::from_secs(60), { + tracing::warn!( + "active trace file {} disappeared before finalize; \ + dropping segment", + self.active_path.display() + ); + }); + } + Err(e) => { + self.state = WriterState::Finished; + return Err(e); + } + } } else { // No real events — just header + metadata. Remove instead of // sealing so the background worker doesn't upload an empty segment. @@ -612,6 +686,7 @@ impl TraceWriter for RotatingWriter { if let Err(e) = fs::remove_file(&self.active_path) && e.kind() != std::io::ErrorKind::NotFound { + self.state = WriterState::Finished; return Err(e); } } @@ -2024,4 +2099,118 @@ mod tests { "identical update_segment_metadata should not trigger another write" ); } + + /// Regression test for https://github.com/dial9-rs/dial9/issues/386 + /// + /// If the `.active` file is removed externally (e.g. by an operator, + /// log-rotation tool, or container teardown) the flush loop calls + /// `drained()` → `rotate()` → `fs::rename(.active, .bin)` which fails + /// with `NotFound`. Without recovery, `next_drain_time` is never + /// advanced, so `should_drain()` returns true on every subsequent + /// 5ms tick and the flush thread busy-loops. 
+ /// + /// `drained()` must recover by abandoning the missing segment, opening a + /// fresh one, and advancing the drain/rotation timers. + #[tokio::test(start_paused = true)] + async fn test_drained_recovers_when_active_file_deleted() { + use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime}; + let _guard = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH)); + + let dir = TempDir::new().unwrap(); + let base = dir.path().join("trace"); + let mut writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(u64::MAX) + .max_total_size(100_000) + .rotation_period(Duration::from_secs(60)) + .build() + .unwrap(); + + writer.write_encoded_batch(&test_batch()).unwrap(); + writer.flush().unwrap(); + + // Simulate external deletion of the .active file. + let active_path = writer.current_active_path().to_owned(); + assert!(active_path.exists()); + std::fs::remove_file(&active_path).unwrap(); + + // Cross the rotation boundary so drained() will try to rotate. + tokio::time::advance(Duration::from_secs(61)).await; + + assert!(writer.should_drain(), "should_drain should fire"); + + // drained() must succeed despite the missing .active file. Returning + // an error here is what causes the flush thread to busy-loop because + // the timers are never advanced. + writer + .drained() + .expect("drained() must recover from missing .active file"); + + // After recovery, should_drain() must return false — otherwise the + // flush thread would spin calling drained() every 5ms. + assert!( + !writer.should_drain(), + "should_drain must return false after recovery (otherwise flush loop spins)" + ); + + // The writer must still be usable: a fresh active file exists and + // subsequent writes succeed. 
+ writer.write_encoded_batch(&test_batch()).unwrap(); + writer.flush().unwrap(); + assert!( + writer.current_active_path().exists(), + "writer must have a fresh active file after recovery" + ); + + writer.finalize().unwrap(); + } + + /// Companion to `test_drained_recovers_when_active_file_deleted` covering + /// the more realistic case where the entire trace directory has been + /// removed (e.g. `rm -rf /var/log/dial9/`). Both the rename AND the + /// `File::create` for the new segment fail with `NotFound`. `drained()` + /// must still advance timers so `should_drain()` stops firing — the + /// writer can transition to `Finished`, but the flush loop must NOT + /// busy-spin. + #[tokio::test(start_paused = true)] + async fn test_drained_recovers_when_parent_dir_deleted() { + use metrique_timesource::{TimeSource, tokio::set_time_source_for_current_runtime}; + let _guard = set_time_source_for_current_runtime(TimeSource::tokio(std::time::UNIX_EPOCH)); + + let dir = TempDir::new().unwrap(); + let trace_dir = dir.path().join("traces"); + std::fs::create_dir_all(&trace_dir).unwrap(); + let base = trace_dir.join("trace"); + let mut writer = RotatingWriter::builder() + .base_path(&base) + .max_file_size(u64::MAX) + .max_total_size(100_000) + .rotation_period(Duration::from_secs(60)) + .build() + .unwrap(); + + writer.write_encoded_batch(&test_batch()).unwrap(); + writer.flush().unwrap(); + + std::fs::remove_dir_all(&trace_dir).unwrap(); + assert!(!writer.current_active_path().exists()); + + tokio::time::advance(Duration::from_secs(61)).await; + assert!(writer.should_drain()); + + // `drained()` may surface the underlying error, but the critical + // invariant is that `should_drain()` must NOT fire on the next tick — + // otherwise the flush thread busy-loops. 
+ let _ = writer.drained(); + assert!( + !writer.should_drain(), + "should_drain must return false after a failed rotation \ + (otherwise the flush loop spins on every 5ms tick)" + ); + + // One more 5ms tick followed by another drained() call must not make should_drain() fire again. + tokio::time::advance(Duration::from_millis(5)).await; + let _ = writer.drained(); + assert!(!writer.should_drain()); + } }