@@ -432,24 +432,83 @@ impl RotatingWriter {
432432 let WriterState :: Active { writer : raw, .. } = & mut self . state else {
433433 return Ok ( ( ) ) ;
434434 } ;
435- raw. flush ( ) ?;
436- // Seal the current segment: snapshot size and rename .active → .bin
435+
436+ // Advance timers up front. If anything below fails the flush loop must
437+ // NOT see should_drain() return true on the next 5ms tick — otherwise
438+ // it busy-spins re-attempting the same failing rotate.
439+ let now = time_source ( ) . system_time ( ) . as_std ( ) ;
440+ self . next_rotation_time = Self :: next_boundary ( now, self . rotation_period ) ;
441+ self . next_drain_time = Self :: next_boundary ( now, self . drain_interval ) ;
442+
443+ // Best-effort flush: the result is discarded, so any flush error is ignored.
444+ // If the file is gone the buffered bytes are already lost; rotate anyway.
445+ let _ = raw. flush ( ) ;
437446 let closed_size = raw. bytes_written ( ) ;
438- let sealed = Self :: file_path ( & self . base_path , self . next_index - 1 ) ;
439- fs:: rename ( & self . active_path , & sealed) ?;
440- self . closed_files . push_back ( ( sealed, closed_size) ) ;
447+ let sealed_path = Self :: file_path ( & self . base_path , self . next_index - 1 ) ;
448+
449+ // Seal the current segment. If `.active` was removed externally
450+ // (operator, log rotation, container teardown) abandon the segment
451+ // and start a fresh one. On any other error give up cleanly: mark
452+ // the writer Finished so writes and rotations stop, instead of
453+ // leaving inconsistent state.
454+ match fs:: rename ( & self . active_path , & sealed_path) {
455+ Ok ( ( ) ) => self . closed_files . push_back ( ( sealed_path, closed_size) ) ,
456+ Err ( e) if e. kind ( ) == std:: io:: ErrorKind :: NotFound => {
457+ rate_limited ! ( Duration :: from_secs( 60 ) , {
458+ tracing:: warn!(
459+ "active trace file {} disappeared before sealing; \
460+ abandoning segment and starting a fresh one",
461+ self . active_path. display( )
462+ ) ;
463+ } ) ;
464+ }
465+ Err ( e) => {
466+ self . state = WriterState :: Finished ;
467+ return Err ( e) ;
468+ }
469+ }
441470
442471 let new_path = Self :: active_path ( & self . base_path , self . next_index ) ;
443472 self . next_index += 1 ;
444- let file = File :: create ( & new_path) ?;
473+
474+ // Open the new active file. NotFound here typically means the parent
475+ // directory itself was removed (the most likely real-world cause of
476+ // the original `.active` file disappearing too). Try to recreate the
477+ // directory once and retry. If that still fails, mark Finished so the
478+ // writer stops cleanly rather than retrying every drain cycle.
479+ let file = match File :: create ( & new_path) {
480+ Ok ( f) => f,
481+ Err ( e) if e. kind ( ) == std:: io:: ErrorKind :: NotFound => {
482+ if let Some ( parent) = new_path. parent ( )
483+ && let Err ( mkdir_err) = fs:: create_dir_all ( parent)
484+ {
485+ self . state = WriterState :: Finished ;
486+ return Err ( mkdir_err) ;
487+ }
488+ match File :: create ( & new_path) {
489+ Ok ( f) => f,
490+ Err ( e) => {
491+ self . state = WriterState :: Finished ;
492+ return Err ( e) ;
493+ }
494+ }
495+ }
496+ Err ( e) => {
497+ self . state = WriterState :: Finished ;
498+ return Err ( e) ;
499+ }
500+ } ;
445501 let writer = BufWriter :: new ( file) ;
446- self . state = Self :: prepare_segment ( writer) ?;
502+ self . state = match Self :: prepare_segment ( writer) {
503+ Ok ( s) => s,
504+ Err ( e) => {
505+ self . state = WriterState :: Finished ;
506+ return Err ( e) ;
507+ }
508+ } ;
447509 self . active_path = new_path;
448510 self . did_rotate = true ;
449511 self . has_real_events = false ;
450- let now = time_source ( ) . system_time ( ) . as_std ( ) ;
451- self . next_rotation_time = Self :: next_boundary ( now, self . rotation_period ) ;
452- self . next_drain_time = Self :: next_boundary ( now, self . drain_interval ) ;
453512
454513 tracing:: debug!(
455514 segment_index = self . next_index - 1 ,
@@ -591,17 +650,31 @@ impl TraceWriter for RotatingWriter {
591650 tracing:: warn!( "writer is already closed." ) ;
592651 } ) ;
593652 }
594- self . flush ( ) ? ;
595- // Rename .active → .bin for the current segment (if it has .active suffix)
653+ // Best-effort flush: if the file is gone the bytes are already lost.
654+ let _ = self . flush ( ) ;
596655 if self
597656 . active_path
598657 . extension ( )
599658 . is_some_and ( |ext| ext == "active" )
600659 {
601660 if self . has_real_events {
602661 let sealed = Self :: file_path ( & self . base_path , self . next_index - 1 ) ;
603- fs:: rename ( & self . active_path , & sealed) ?;
604- self . active_path = sealed;
662+ match fs:: rename ( & self . active_path , & sealed) {
663+ Ok ( ( ) ) => self . active_path = sealed,
664+ Err ( e) if e. kind ( ) == std:: io:: ErrorKind :: NotFound => {
665+ rate_limited ! ( Duration :: from_secs( 60 ) , {
666+ tracing:: warn!(
667+ "active trace file {} disappeared before finalize; \
668+ dropping segment",
669+ self . active_path. display( )
670+ ) ;
671+ } ) ;
672+ }
673+ Err ( e) => {
674+ self . state = WriterState :: Finished ;
675+ return Err ( e) ;
676+ }
677+ }
605678 } else {
606679 // No real events — just header + metadata. Remove instead of
607680 // sealing so the background worker doesn't upload an empty segment.
@@ -612,6 +685,7 @@ impl TraceWriter for RotatingWriter {
612685 if let Err ( e) = fs:: remove_file ( & self . active_path )
613686 && e. kind ( ) != std:: io:: ErrorKind :: NotFound
614687 {
688+ self . state = WriterState :: Finished ;
615689 return Err ( e) ;
616690 }
617691 }
@@ -2024,4 +2098,118 @@ mod tests {
20242098 "identical update_segment_metadata should not trigger another write"
20252099 ) ;
20262100 }
2101+
2102+ /// Regression test for https://github.com/dial9-rs/dial9/issues/386
2103+ ///
2104+ /// If the `.active` file is removed externally (e.g. by an operator,
2105+ /// log-rotation tool, or container teardown) the flush loop calls
2106+ /// `drained()` → `rotate()` → `fs::rename(.active, .bin)` which fails
2107+ /// with `NotFound`. Without recovery, `next_drain_time` is never
2108+ /// advanced, so `should_drain()` returns true on every subsequent
2109+ /// 5ms tick and the flush thread busy-loops.
2110+ ///
2111+ /// `drained()` must recover by abandoning the missing segment, opening a
2112+ /// fresh one, and advancing the drain/rotation timers.
2113+ #[ tokio:: test( start_paused = true ) ]
2114+ async fn test_drained_recovers_when_active_file_deleted ( ) {
2115+ use metrique_timesource:: { TimeSource , tokio:: set_time_source_for_current_runtime} ;
2116+ let _guard = set_time_source_for_current_runtime ( TimeSource :: tokio ( std:: time:: UNIX_EPOCH ) ) ;
2117+
2118+ let dir = TempDir :: new ( ) . unwrap ( ) ;
2119+ let base = dir. path ( ) . join ( "trace" ) ;
2120+ let mut writer = RotatingWriter :: builder ( )
2121+ . base_path ( & base)
2122+ . max_file_size ( u64:: MAX )
2123+ . max_total_size ( 100_000 )
2124+ . rotation_period ( Duration :: from_secs ( 60 ) )
2125+ . build ( )
2126+ . unwrap ( ) ;
2127+
2128+ writer. write_encoded_batch ( & test_batch ( ) ) . unwrap ( ) ;
2129+ writer. flush ( ) . unwrap ( ) ;
2130+
2131+ // Simulate external deletion of the .active file.
2132+ let active_path = writer. current_active_path ( ) . to_owned ( ) ;
2133+ assert ! ( active_path. exists( ) ) ;
2134+ std:: fs:: remove_file ( & active_path) . unwrap ( ) ;
2135+
2136+ // Advance 61s, past the 60s rotation period, so drained() will try to rotate.
2137+ tokio:: time:: advance ( Duration :: from_secs ( 61 ) ) . await ;
2138+
2139+ assert ! ( writer. should_drain( ) , "should_drain should fire" ) ;
2140+
2141+ // drained() must succeed despite the missing .active file. Before the
2142+ // fix, the error returned here left the timers un-advanced, which is
2143+ // what made the flush thread busy-loop.
2144+ writer
2145+ . drained ( )
2146+ . expect ( "drained() must recover from missing .active file" ) ;
2147+
2148+ // After recovery, should_drain() must return false — otherwise the
2149+ // flush thread would spin calling drained() every 5ms.
2150+ assert ! (
2151+ !writer. should_drain( ) ,
2152+ "should_drain must return false after recovery (otherwise flush loop spins)"
2153+ ) ;
2154+
2155+ // The writer must still be usable: subsequent writes succeed and the
2156+ // path reported by current_active_path() exists on disk afterwards.
2157+ writer. write_encoded_batch ( & test_batch ( ) ) . unwrap ( ) ;
2158+ writer. flush ( ) . unwrap ( ) ;
2159+ assert ! (
2160+ writer. current_active_path( ) . exists( ) ,
2161+ "writer must have a fresh active file after recovery"
2162+ ) ;
2163+
2164+ writer. finalize ( ) . unwrap ( ) ;
2165+ }
2166+
2167+ /// Companion to `test_drained_recovers_when_active_file_deleted` covering
2168+ /// the more realistic case where the entire trace directory has been
2169+ /// removed (e.g. `rm -rf /var/log/dial9/`). Both the rename AND the first
2170+ /// `File::create` for the new segment fail with `NotFound`. Whether
2171+ /// `drained()` then recovers or the writer transitions to `Finished`, the
2172+ /// timers must still advance so `should_drain()` stops firing — the flush
2173+ /// loop must NOT busy-spin.
2174+ #[ tokio:: test( start_paused = true ) ]
2175+ async fn test_drained_recovers_when_parent_dir_deleted ( ) {
2176+ use metrique_timesource:: { TimeSource , tokio:: set_time_source_for_current_runtime} ;
2177+ let _guard = set_time_source_for_current_runtime ( TimeSource :: tokio ( std:: time:: UNIX_EPOCH ) ) ;
2178+
2179+ let dir = TempDir :: new ( ) . unwrap ( ) ;
2180+ let trace_dir = dir. path ( ) . join ( "traces" ) ;
2181+ std:: fs:: create_dir_all ( & trace_dir) . unwrap ( ) ;
2182+ let base = trace_dir. join ( "trace" ) ;
2183+ let mut writer = RotatingWriter :: builder ( )
2184+ . base_path ( & base)
2185+ . max_file_size ( u64:: MAX )
2186+ . max_total_size ( 100_000 )
2187+ . rotation_period ( Duration :: from_secs ( 60 ) )
2188+ . build ( )
2189+ . unwrap ( ) ;
2190+
2191+ writer. write_encoded_batch ( & test_batch ( ) ) . unwrap ( ) ;
2192+ writer. flush ( ) . unwrap ( ) ;
2193+
2194+ std:: fs:: remove_dir_all ( & trace_dir) . unwrap ( ) ; // simulate removal of the whole trace directory
2195+ assert ! ( !writer. current_active_path( ) . exists( ) ) ;
2196+
2197+ tokio:: time:: advance ( Duration :: from_secs ( 61 ) ) . await ; // past the 60s rotation period
2198+ assert ! ( writer. should_drain( ) ) ;
2199+
2200+ // `drained()` may surface the underlying error, but the critical
2201+ // invariant is that `should_drain()` must NOT fire on the next tick —
2202+ // otherwise the flush thread busy-loops.
2203+ let _ = writer. drained ( ) ;
2204+ assert ! (
2205+ !writer. should_drain( ) ,
2206+ "should_drain must return false after a failed rotation \
2207+ (otherwise the flush loop spins on every 5ms tick)"
2208+ ) ;
2209+
2210+ // A later 5ms tick plus another drained() call must not re-arm should_drain().
2211+ tokio:: time:: advance ( Duration :: from_millis ( 5 ) ) . await ;
2212+ let _ = writer. drained ( ) ;
2213+ assert ! ( !writer. should_drain( ) ) ;
2214+ }
20272215}
0 commit comments