@@ -1301,11 +1301,30 @@ impl<E: Context, V: CodecShared> Journal<E, V> {
13011301 self . offsets . rewind ( position) . await
13021302 }
13031303
1304+ /// Test helper: Set and persist the offsets recovery watermark directly.
1305+ pub ( crate ) async fn test_set_offsets_recovery_watermark (
1306+ & self ,
1307+ watermark : u64 ,
1308+ ) -> Result < ( ) , Error > {
1309+ self . offsets . test_set_recovery_watermark ( watermark) . await
1310+ }
1311+
13041312 /// Test helper: Get the size of the internal offsets journal.
13051313 pub ( crate ) async fn test_offsets_size ( & self ) -> u64 {
13061314 self . offsets . size ( ) . await
13071315 }
13081316
1317+ /// Test helper: Rewind the internal data journal to the item at `position`.
1318+ pub ( crate ) async fn test_rewind_data_to_position ( & self , position : u64 ) -> Result < ( ) , Error > {
1319+ let offset = {
1320+ let offsets_reader = self . offsets . reader ( ) . await ;
1321+ offsets_reader. read ( position) . await ?
1322+ } ;
1323+ let section = position_to_section ( position, self . items_per_section ) ;
1324+ let mut inner = self . inner . write ( ) . await ;
1325+ inner. data . rewind_to_offset ( section, offset) . await
1326+ }
1327+
13091328 /// Test helper: Append directly to the internal data journal (simulates crash scenario).
13101329 pub ( crate ) async fn test_append_data (
13111330 & self ,
@@ -2196,6 +2215,98 @@ mod tests {
21962215 } ) ;
21972216 }
21982217
2218+ #[ test_traced]
2219+ fn test_variable_recovery_offsets_watermark_outside_bounds_rebuilds_from_start ( ) {
2220+ let executor = deterministic:: Runner :: default ( ) ;
2221+ executor. start ( |context| async move {
2222+ let cfg = Config {
2223+ partition : "recovery-watermark-outside-bounds" . into ( ) ,
2224+ items_per_section : NZU64 ! ( 10 ) ,
2225+ compression : None ,
2226+ codec_config : ( ) ,
2227+ page_cache : CacheRef :: from_pooler ( & context, LARGE_PAGE_SIZE , NZUsize ! ( 10 ) ) ,
2228+ write_buffer : NZUsize ! ( 1024 ) ,
2229+ } ;
2230+
2231+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "first" ) , cfg. clone ( ) )
2232+ . await
2233+ . unwrap ( ) ;
2234+
2235+ for i in 0 ..15u64 {
2236+ journal. append ( & ( i * 100 ) ) . await . unwrap ( ) ;
2237+ }
2238+ journal. sync ( ) . await . unwrap ( ) ;
2239+
2240+ // Simulate stale metadata that points past the recovered offsets bounds.
2241+ journal
2242+ . test_set_offsets_recovery_watermark ( 30 )
2243+ . await
2244+ . unwrap ( ) ;
2245+ drop ( journal) ;
2246+
2247+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "second" ) , cfg. clone ( ) )
2248+ . await
2249+ . unwrap ( ) ;
2250+ assert_eq ! ( journal. bounds( ) . await , 0 ..15 ) ;
2251+ assert_eq ! ( journal. test_offsets_size( ) . await , 15 ) ;
2252+ for i in 0 ..15u64 {
2253+ assert_eq ! ( journal. read( i) . await . unwrap( ) , i * 100 ) ;
2254+ }
2255+
2256+ journal. destroy ( ) . await . unwrap ( ) ;
2257+ } ) ;
2258+ }
2259+
2260+ #[ test_traced]
2261+ fn test_variable_recovery_retries_from_pruning_boundary_when_anchor_too_far ( ) {
2262+ let executor = deterministic:: Runner :: default ( ) ;
2263+ executor. start ( |context| async move {
2264+ let cfg = Config {
2265+ partition : "recovery-anchor-too-far" . into ( ) ,
2266+ items_per_section : NZU64 ! ( 10 ) ,
2267+ compression : None ,
2268+ codec_config : ( ) ,
2269+ page_cache : CacheRef :: from_pooler ( & context, LARGE_PAGE_SIZE , NZUsize ! ( 10 ) ) ,
2270+ write_buffer : NZUsize ! ( 1024 ) ,
2271+ } ;
2272+
2273+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "first" ) , cfg. clone ( ) )
2274+ . await
2275+ . unwrap ( ) ;
2276+
2277+ for i in 0 ..20u64 {
2278+ journal. append ( & ( i * 100 ) ) . await . unwrap ( ) ;
2279+ }
2280+ journal. sync ( ) . await . unwrap ( ) ;
2281+
2282+ // The offsets watermark is in-bounds, but the data journal is shorter than that
2283+ // anchor. Recovery should retry from the pruning boundary and rebuild only the
2284+ // retained data prefix.
2285+ journal
2286+ . test_set_offsets_recovery_watermark ( 15 )
2287+ . await
2288+ . unwrap ( ) ;
2289+ journal. test_rewind_data_to_position ( 12 ) . await . unwrap ( ) ;
2290+ journal. test_sync_data ( ) . await . unwrap ( ) ;
2291+ drop ( journal) ;
2292+
2293+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "second" ) , cfg. clone ( ) )
2294+ . await
2295+ . unwrap ( ) ;
2296+ assert_eq ! ( journal. bounds( ) . await , 0 ..12 ) ;
2297+ assert_eq ! ( journal. test_offsets_size( ) . await , 12 ) ;
2298+ for i in 0 ..12u64 {
2299+ assert_eq ! ( journal. read( i) . await . unwrap( ) , i * 100 ) ;
2300+ }
2301+ assert ! ( matches!(
2302+ journal. read( 12 ) . await ,
2303+ Err ( Error :: ItemOutOfRange ( 12 ) )
2304+ ) ) ;
2305+
2306+ journal. destroy ( ) . await . unwrap ( ) ;
2307+ } ) ;
2308+ }
2309+
21992310 #[ test_traced]
22002311 fn test_variable_rewind_commit_reopen ( ) {
22012312 let executor = deterministic:: Runner :: default ( ) ;
0 commit comments