@@ -1156,10 +1156,6 @@ impl<E: Context, V: CodecShared> Journal<E, V> {
11561156 let start_section = position_to_section ( anchor, items_per_section) ;
11571157 let first_position = pruning_boundary. max ( start_section * items_per_section) ;
11581158
1159- if anchor < first_position {
1160- return Ok ( None ) ;
1161- }
1162-
11631159 let skip = anchor - first_position;
11641160 let stream = data. replay ( start_section, 0 , REPLAY_BUFFER_SIZE ) . await ?;
11651161 futures:: pin_mut!( stream) ;
@@ -1301,11 +1297,30 @@ impl<E: Context, V: CodecShared> Journal<E, V> {
13011297 self . offsets . rewind ( position) . await
13021298 }
13031299
1300+ /// Test helper: Set and persist the offsets recovery watermark directly.
1301+ pub ( crate ) async fn test_set_offsets_recovery_watermark (
1302+ & self ,
1303+ watermark : u64 ,
1304+ ) -> Result < ( ) , Error > {
1305+ self . offsets . test_set_recovery_watermark ( watermark) . await
1306+ }
1307+
13041308 /// Test helper: Get the size of the internal offsets journal.
13051309 pub ( crate ) async fn test_offsets_size ( & self ) -> u64 {
13061310 self . offsets . size ( ) . await
13071311 }
13081312
1313+ /// Test helper: Rewind the internal data journal to the item at `position`.
1314+ pub ( crate ) async fn test_rewind_data_to_position ( & self , position : u64 ) -> Result < ( ) , Error > {
1315+ let offset = {
1316+ let offsets_reader = self . offsets . reader ( ) . await ;
1317+ offsets_reader. read ( position) . await ?
1318+ } ;
1319+ let section = position_to_section ( position, self . items_per_section ) ;
1320+ let mut inner = self . inner . write ( ) . await ;
1321+ inner. data . rewind_to_offset ( section, offset) . await
1322+ }
1323+
13091324 /// Test helper: Append directly to the internal data journal (simulates crash scenario).
13101325 pub ( crate ) async fn test_append_data (
13111326 & self ,
@@ -2196,6 +2211,137 @@ mod tests {
21962211 } ) ;
21972212 }
21982213
2214+ #[ test_traced]
2215+ fn test_variable_recovery_offsets_watermark_outside_bounds_rebuilds_from_start ( ) {
2216+ let executor = deterministic:: Runner :: default ( ) ;
2217+ executor. start ( |context| async move {
2218+ let cfg = Config {
2219+ partition : "recovery-watermark-outside-bounds" . into ( ) ,
2220+ items_per_section : NZU64 ! ( 10 ) ,
2221+ compression : None ,
2222+ codec_config : ( ) ,
2223+ page_cache : CacheRef :: from_pooler ( & context, LARGE_PAGE_SIZE , NZUsize ! ( 10 ) ) ,
2224+ write_buffer : NZUsize ! ( 1024 ) ,
2225+ } ;
2226+
2227+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "first" ) , cfg. clone ( ) )
2228+ . await
2229+ . unwrap ( ) ;
2230+
2231+ for i in 0 ..15u64 {
2232+ journal. append ( & ( i * 100 ) ) . await . unwrap ( ) ;
2233+ }
2234+ journal. sync ( ) . await . unwrap ( ) ;
2235+
2236+ // Simulate stale metadata that points past the recovered offsets bounds.
2237+ journal
2238+ . test_set_offsets_recovery_watermark ( 30 )
2239+ . await
2240+ . unwrap ( ) ;
2241+ drop ( journal) ;
2242+
2243+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "second" ) , cfg. clone ( ) )
2244+ . await
2245+ . unwrap ( ) ;
2246+ assert_eq ! ( journal. bounds( ) . await , 0 ..15 ) ;
2247+ assert_eq ! ( journal. test_offsets_size( ) . await , 15 ) ;
2248+ for i in 0 ..15u64 {
2249+ assert_eq ! ( journal. read( i) . await . unwrap( ) , i * 100 ) ;
2250+ }
2251+
2252+ journal. destroy ( ) . await . unwrap ( ) ;
2253+ } ) ;
2254+ }
2255+
2256+ #[ test_traced]
2257+ fn test_variable_rebuild_offsets_anchor_outside_bounds_returns_none ( ) {
2258+ let executor = deterministic:: Runner :: default ( ) ;
2259+ executor. start ( |context| async move {
2260+ let data_cfg = variable:: Config {
2261+ partition : "rebuild-anchor-outside-data" . into ( ) ,
2262+ compression : None ,
2263+ codec_config : ( ) ,
2264+ page_cache : CacheRef :: from_pooler ( & context, LARGE_PAGE_SIZE , NZUsize ! ( 10 ) ) ,
2265+ write_buffer : NZUsize ! ( 1024 ) ,
2266+ } ;
2267+ let offsets_cfg = fixed:: Config {
2268+ partition : "rebuild-anchor-outside-offsets" . into ( ) ,
2269+ items_per_blob : NZU64 ! ( 10 ) ,
2270+ page_cache : CacheRef :: from_pooler ( & context, LARGE_PAGE_SIZE , NZUsize ! ( 10 ) ) ,
2271+ write_buffer : NZUsize ! ( 1024 ) ,
2272+ } ;
2273+
2274+ let mut data = variable:: Journal :: < _ , u64 > :: init ( context. child ( "data" ) , data_cfg)
2275+ . await
2276+ . unwrap ( ) ;
2277+ let mut offsets = fixed:: Journal :: < _ , u64 > :: init ( context. child ( "offsets" ) , offsets_cfg)
2278+ . await
2279+ . unwrap ( ) ;
2280+
2281+ let ( offset, _) = data. append ( 0 , & 100 ) . await . unwrap ( ) ;
2282+ offsets. append ( & offset) . await . unwrap ( ) ;
2283+
2284+ let result =
2285+ Journal :: < _ , u64 > :: rebuild_offsets_from_anchor ( & data, & mut offsets, 10 , 0 , 2 )
2286+ . await
2287+ . unwrap ( ) ;
2288+ assert ! ( result. is_none( ) ) ;
2289+
2290+ data. destroy ( ) . await . unwrap ( ) ;
2291+ offsets. destroy ( ) . await . unwrap ( ) ;
2292+ } ) ;
2293+ }
2294+
2295+ #[ test_traced]
2296+ fn test_variable_recovery_retries_from_pruning_boundary_when_anchor_too_far ( ) {
2297+ let executor = deterministic:: Runner :: default ( ) ;
2298+ executor. start ( |context| async move {
2299+ let cfg = Config {
2300+ partition : "recovery-anchor-too-far" . into ( ) ,
2301+ items_per_section : NZU64 ! ( 10 ) ,
2302+ compression : None ,
2303+ codec_config : ( ) ,
2304+ page_cache : CacheRef :: from_pooler ( & context, LARGE_PAGE_SIZE , NZUsize ! ( 10 ) ) ,
2305+ write_buffer : NZUsize ! ( 1024 ) ,
2306+ } ;
2307+
2308+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "first" ) , cfg. clone ( ) )
2309+ . await
2310+ . unwrap ( ) ;
2311+
2312+ for i in 0 ..20u64 {
2313+ journal. append ( & ( i * 100 ) ) . await . unwrap ( ) ;
2314+ }
2315+ journal. sync ( ) . await . unwrap ( ) ;
2316+
2317+ // The offsets watermark is in-bounds, but the data journal is shorter than that
2318+ // anchor. Recovery should retry from the pruning boundary and rebuild only the
2319+ // retained data prefix.
2320+ journal
2321+ . test_set_offsets_recovery_watermark ( 15 )
2322+ . await
2323+ . unwrap ( ) ;
2324+ journal. test_rewind_data_to_position ( 12 ) . await . unwrap ( ) ;
2325+ journal. test_sync_data ( ) . await . unwrap ( ) ;
2326+ drop ( journal) ;
2327+
2328+ let journal = Journal :: < _ , u64 > :: init ( context. child ( "second" ) , cfg. clone ( ) )
2329+ . await
2330+ . unwrap ( ) ;
2331+ assert_eq ! ( journal. bounds( ) . await , 0 ..12 ) ;
2332+ assert_eq ! ( journal. test_offsets_size( ) . await , 12 ) ;
2333+ for i in 0 ..12u64 {
2334+ assert_eq ! ( journal. read( i) . await . unwrap( ) , i * 100 ) ;
2335+ }
2336+ assert ! ( matches!(
2337+ journal. read( 12 ) . await ,
2338+ Err ( Error :: ItemOutOfRange ( 12 ) )
2339+ ) ) ;
2340+
2341+ journal. destroy ( ) . await . unwrap ( ) ;
2342+ } ) ;
2343+ }
2344+
21992345 #[ test_traced]
22002346 fn test_variable_rewind_commit_reopen ( ) {
22012347 let executor = deterministic:: Runner :: default ( ) ;
0 commit comments