@@ -202,10 +202,10 @@ impl<E: Context, V: CodecShared> Inner<E, V> {
202202/// The data journal is always the source of truth. The offsets journal is an index
203203/// that may temporarily diverge during crashes. Divergences are automatically
204204/// aligned during init():
205- /// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data.
206- /// (This can happen if we crash after writing data journal but before writing offsets journal)
207- /// * If offsets.size() > data.size(): Rewind offsets to match data size.
208- /// (This can happen if we crash after rewinding data journal but before rewinding offsets journal)
205+ /// * If offsets are behind data after the recovery watermark: rebuild missing offsets by replaying
206+ /// data from the recovery anchor.
207+ /// * If offsets are ahead of the retained data prefix: rewind offsets to match the data-backed
208+ /// size.
209209/// * If offsets.bounds().start < data.bounds().start: Prune offsets to match
210210/// (This can happen if we crash after pruning data journal but before pruning offsets journal)
211211///
@@ -217,8 +217,8 @@ impl<E: Context, V: CodecShared> Inner<E, V> {
217217///
218218/// The offsets journal's recovery watermark records a preferred point for replaying data to rebuild
219219/// offset entries after a crash. If that point falls outside the recovered offsets bounds, init
220- /// falls back to the offsets start. Items appended after the replay point may still survive, but
221- /// matching offsets may need to be rebuilt .
220+ /// falls back to the offsets start. Replay after the anchor stops at the first short data section
221+ /// and truncates newer sections so the recovered journal remains a contiguous prefix .
222222pub struct Journal < E : Context , V : Codec > {
223223 /// Inner state for data journal metadata.
224224 ///
@@ -921,8 +921,8 @@ impl<E: Context, V: CodecShared> Journal<E, V> {
921921 /// Align the offsets journal and data journal to be consistent in case a crash occurred
922922 /// on a previous run and left the journals in an inconsistent state.
923923 ///
924- /// The data journal is the source of truth. This function scans it to determine
925- /// what SHOULD be in the offsets journal , then fixes any mismatches.
924+ /// The data journal is the source of truth. This function replays the data journal as needed to
925+ /// verify or rebuild the offsets suffix , then fixes any mismatches.
926926 ///
927927 /// # Returns
928928 ///
@@ -1051,16 +1051,30 @@ impl<E: Context, V: CodecShared> Journal<E, V> {
10511051 let offsets_reader = offsets. reader ( ) . await ;
10521052 offsets_reader. bounds ( )
10531053 } ;
1054- let recovery_watermark = offsets. recovery_watermark ( ) . await ;
1054+ // The newest data section bounds how far recovery can possibly go. If it is also the
1055+ // oldest retained section, its logical start may be a mid-section pruning boundary.
1056+ let data_newest_section = data
1057+ . newest_section ( )
1058+ . expect ( "non-empty data journal should have newest section" ) ;
1059+ let data_newest_start = data_newest_section
1060+ . checked_mul ( items_per_section)
1061+ . ok_or ( Error :: OffsetOverflow ) ?;
1062+ let retained_data_end_bound = data_newest_start
1063+ . max ( offsets_bounds. start )
1064+ . checked_add ( items_in_last_section)
1065+ . ok_or ( Error :: OffsetOverflow ) ?;
10551066
1067+ let recovery_watermark = offsets. recovery_watermark ( ) . await ;
10561068 let recovery_start = if recovery_watermark < offsets_bounds. start
10571069 || recovery_watermark > offsets_bounds. end
1070+ || recovery_watermark > retained_data_end_bound
10581071 {
10591072 warn ! (
10601073 recovery_watermark,
10611074 start = offsets_bounds. start,
10621075 end = offsets_bounds. end,
1063- "crash repair: offsets recovery watermark outside recovered offsets bounds, rebuilding from offsets start"
1076+ retained_data_end_bound,
1077+ "crash repair: offsets recovery watermark is unusable, rebuilding from offsets start"
10641078 ) ;
10651079 offsets_bounds. start
10661080 } else {
@@ -1127,10 +1141,11 @@ impl<E: Context, V: CodecShared> Journal<E, V> {
11271141
11281142 /// Rebuild the offsets suffix by replaying the data journal from a recovery anchor.
11291143 ///
1130- /// Returns `Ok(None)` if the anchor is ahead of the data journal and callers should retry
1131- /// from an earlier point.
1144+ /// Returns `Ok(None)` if the anchor is ahead of the data journal and callers should retry from
1145+ /// an earlier point. If replay finds a short section after the anchor, recovery truncates newer
1146+ /// data sections and returns the contiguous data-backed size.
11321147 async fn rebuild_offsets_from_anchor (
1133- data : & variable:: Journal < E , V > ,
1148+ data : & mut variable:: Journal < E , V > ,
11341149 offsets : & mut fixed:: Journal < E , u64 > ,
11351150 items_per_section : u64 ,
11361151 pruning_boundary : u64 ,
@@ -1156,37 +1171,62 @@ impl<E: Context, V: CodecShared> Journal<E, V> {
11561171 let start_section = position_to_section ( anchor, items_per_section) ;
11571172 let first_position = pruning_boundary. max ( start_section * items_per_section) ;
11581173
1159- let skip = anchor - first_position;
1160- let stream = data. replay ( start_section, 0 , REPLAY_BUFFER_SIZE ) . await ?;
1161- futures:: pin_mut!( stream) ;
1174+ let ( size, repair) = {
1175+ let skip = anchor - first_position;
1176+ let stream = data. replay ( start_section, 0 , REPLAY_BUFFER_SIZE ) . await ?;
1177+ futures:: pin_mut!( stream) ;
11621178
1163- let mut skipped = 0 ;
1164- while skipped < skip {
1165- let Some ( result) = stream. next ( ) . await else {
1166- return Ok ( None ) ;
1167- } ;
1168- let ( section, _offset, _size, _item) = result?;
1169- let position = first_position + skipped;
1170- let expected_section = position_to_section ( position, items_per_section) ;
1171- if section != expected_section {
1172- return Err ( Error :: Corruption ( format ! (
1173- "data section {section} contains logical position {position}, expected section {expected_section}"
1174- ) ) ) ;
1179+ let mut skipped = 0 ;
1180+ while skipped < skip {
1181+ let Some ( result) = stream. next ( ) . await else {
1182+ return Ok ( None ) ;
1183+ } ;
1184+ let ( section, _offset, _size, _item) = result?;
1185+ let position = first_position + skipped;
1186+ let expected_section = position_to_section ( position, items_per_section) ;
1187+ if section != expected_section {
1188+ if section > expected_section {
1189+ return Ok ( None ) ;
1190+ }
1191+ return Err ( Error :: Corruption ( format ! (
1192+ "data section {section} contains logical position {position}, expected section {expected_section}"
1193+ ) ) ) ;
1194+ }
1195+ skipped += 1 ;
11751196 }
1176- skipped += 1 ;
1177- }
11781197
1179- let mut size = anchor;
1180- while let Some ( result) = stream. next ( ) . await {
1181- let ( section, offset, _size, _item) = result?;
1182- let expected_section = position_to_section ( size, items_per_section) ;
1183- if section != expected_section {
1184- return Err ( Error :: Corruption ( format ! (
1185- "data section {section} contains logical position {size}, expected section {expected_section}"
1186- ) ) ) ;
1198+ let mut size = anchor;
1199+ let mut repair = None ;
1200+ while let Some ( result) = stream. next ( ) . await {
1201+ let ( section, offset, _size, _item) = result?;
1202+ let expected_section = position_to_section ( size, items_per_section) ;
1203+ if section != expected_section {
1204+ if section > expected_section {
1205+ let byte_offset = data. size ( expected_section) . await ?;
1206+ repair = Some ( ( expected_section, section, size, byte_offset) ) ;
1207+ break ;
1208+ }
1209+ return Err ( Error :: Corruption ( format ! (
1210+ "data section {section} contains logical position {size}, expected section {expected_section}"
1211+ ) ) ) ;
1212+ }
1213+ offsets. append ( & offset) . await ?;
1214+ size += 1 ;
11871215 }
1188- offsets. append ( & offset) . await ?;
1189- size += 1 ;
1216+ ( size, repair)
1217+ } ;
1218+
1219+ if let Some ( ( section, next_section, size, byte_offset) ) = repair {
1220+ warn ! (
1221+ section,
1222+ next_section,
1223+ size,
1224+ byte_offset,
1225+ "crash repair: truncating data after short section"
1226+ ) ;
1227+ data. rewind ( section, byte_offset) . await ?;
1228+ data. sync ( section) . await ?;
1229+ return Ok ( Some ( size) ) ;
11901230 }
11911231
11921232 Ok ( Some ( size) )
@@ -2282,10 +2322,15 @@ mod tests {
22822322 let ( offset, _) = data. append ( 0 , & 100 ) . await . unwrap ( ) ;
22832323 offsets. append ( & offset) . await . unwrap ( ) ;
22842324
2285- let result =
2286- Journal :: < _ , u64 > :: rebuild_offsets_from_anchor ( & data, & mut offsets, 10 , 0 , 2 )
2287- . await
2288- . unwrap ( ) ;
2325+ let result = Journal :: < _ , u64 > :: rebuild_offsets_from_anchor (
2326+ & mut data,
2327+ & mut offsets,
2328+ 10 ,
2329+ 0 ,
2330+ 2 ,
2331+ )
2332+ . await
2333+ . unwrap ( ) ;
22892334 assert ! ( result. is_none( ) ) ;
22902335
22912336 data. destroy ( ) . await . unwrap ( ) ;
@@ -2385,6 +2430,107 @@ mod tests {
23852430 } ) ;
23862431 }
23872432
2433+ #[ test_traced]
2434+ fn test_variable_recovery_boundary_data_rewind_rebuilds_offsets ( ) {
2435+ let executor = deterministic:: Runner :: default ( ) ;
2436+ executor. start ( |context| async move {
2437+ let cfg = Config {
2438+ partition : "recovery-boundary-data-rewind" . into ( ) ,
2439+ items_per_section : NZU64 ! ( 10 ) ,
2440+ compression : None ,
2441+ codec_config : ( ) ,
2442+ page_cache : CacheRef :: from_pooler ( & context, LARGE_PAGE_SIZE , NZUsize ! ( 10 ) ) ,
2443+ write_buffer : NZUsize ! ( 1024 ) ,
2444+ } ;
2445+
2446+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "first" ) , cfg. clone ( ) )
2447+ . await
2448+ . unwrap ( ) ;
2449+
2450+ for i in 0 ..20u64 {
2451+ journal. append ( & ( i * 100 ) ) . await . unwrap ( ) ;
2452+ }
2453+ journal. sync ( ) . await . unwrap ( ) ;
2454+
2455+ journal. test_rewind_data_to_position ( 10 ) . await . unwrap ( ) ;
2456+ journal. test_sync_data ( ) . await . unwrap ( ) ;
2457+ drop ( journal) ;
2458+
2459+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "second" ) , cfg. clone ( ) )
2460+ . await
2461+ . unwrap ( ) ;
2462+ assert_eq ! ( journal. bounds( ) . await , 0 ..10 ) ;
2463+ assert_eq ! ( journal. test_offsets_size( ) . await , 10 ) ;
2464+ for i in 0 ..10u64 {
2465+ assert_eq ! ( journal. read( i) . await . unwrap( ) , i * 100 ) ;
2466+ }
2467+ assert ! ( matches!(
2468+ journal. read( 10 ) . await ,
2469+ Err ( Error :: ItemOutOfRange ( 10 ) )
2470+ ) ) ;
2471+
2472+ journal. destroy ( ) . await . unwrap ( ) ;
2473+ } ) ;
2474+ }
2475+
2476+ #[ test_traced]
2477+ fn test_variable_recovery_truncates_short_data_section_after_anchor ( ) {
2478+ let executor = deterministic:: Runner :: default ( ) ;
2479+ executor. start ( |context| async move {
2480+ let cfg = Config {
2481+ partition : "recovery-short-section-after-anchor" . into ( ) ,
2482+ items_per_section : NZU64 ! ( 10 ) ,
2483+ compression : None ,
2484+ codec_config : ( ) ,
2485+ page_cache : CacheRef :: from_pooler ( & context, LARGE_PAGE_SIZE , NZUsize ! ( 10 ) ) ,
2486+ write_buffer : NZUsize ! ( 1024 ) ,
2487+ } ;
2488+
2489+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "first" ) , cfg. clone ( ) )
2490+ . await
2491+ . unwrap ( ) ;
2492+
2493+ for i in 0 ..25u64 {
2494+ journal. append ( & ( i * 100 ) ) . await . unwrap ( ) ;
2495+ }
2496+ journal. sync ( ) . await . unwrap ( ) ;
2497+
2498+ // Simulate a crash after the previous recovery checkpoint where section 1 was only
2499+ // partly durable but section 2 was present. Recovery should keep the contiguous prefix
2500+ // and discard section 2 rather than treating the section jump as hard corruption.
2501+ journal
2502+ . test_set_offsets_recovery_watermark ( 10 )
2503+ . await
2504+ . unwrap ( ) ;
2505+ let offset = {
2506+ let offsets = journal. offsets . reader ( ) . await ;
2507+ offsets. read ( 12 ) . await . unwrap ( )
2508+ } ;
2509+ {
2510+ let mut inner = journal. inner . write ( ) . await ;
2511+ inner. data . rewind_section ( 1 , offset) . await . unwrap ( ) ;
2512+ inner. data . sync ( 1 ) . await . unwrap ( ) ;
2513+ inner. data . sync ( 2 ) . await . unwrap ( ) ;
2514+ }
2515+ drop ( journal) ;
2516+
2517+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "second" ) , cfg. clone ( ) )
2518+ . await
2519+ . unwrap ( ) ;
2520+ assert_eq ! ( journal. bounds( ) . await , 0 ..12 ) ;
2521+ assert_eq ! ( journal. test_offsets_size( ) . await , 12 ) ;
2522+ for i in 0 ..12u64 {
2523+ assert_eq ! ( journal. read( i) . await . unwrap( ) , i * 100 ) ;
2524+ }
2525+ assert ! ( matches!(
2526+ journal. read( 12 ) . await ,
2527+ Err ( Error :: ItemOutOfRange ( 12 ) )
2528+ ) ) ;
2529+
2530+ journal. destroy ( ) . await . unwrap ( ) ;
2531+ } ) ;
2532+ }
2533+
23882534 #[ test_traced]
23892535 fn test_variable_init_persists_offsets_trailing_item_repair ( ) {
23902536 let executor = deterministic:: Runner :: default ( ) ;
0 commit comments