|
58 | 58 | //! |
59 | 59 | //! Recovery derives fixed-journal size from retained blob lengths: |
60 | 60 | //! - Once RECOVERY_WATERMARK_KEY exists, recovery walks retained blob lengths from oldest to |
61 | | -//! newest, stops at the first short section, and truncates newer sections to preserve a |
62 | | -//! contiguous prefix. After size recovery, the watermark is preserved if it is still within the |
63 | | -//! recovered size and lowered otherwise. |
| 61 | +//! newest. A short newest section is the natural tail; a short earlier section is treated as the |
| 62 | +//! end of the contiguous prefix, and newer sections are truncated. After size recovery, the |
| 63 | +//! watermark is preserved if it is still within the recovered size and lowered otherwise. |
64 | 64 | //! - Legacy journals without RECOVERY_WATERMARK_KEY rely on the old rule that section rollover |
65 | 65 | //! synced the previous section. Valid legacy journals recover from the newest retained blob once, |
66 | 66 | //! then persist the watermark before returning from `init`. |
@@ -843,7 +843,7 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> { |
843 | 843 | Ok((size, None)) |
844 | 844 | } |
845 | 845 |
|
846 | | - /// Recover by walking section lengths until the first short section. |
| 846 | + /// Recover by walking section lengths until the first short non-tail section. |
847 | 847 | /// |
848 | 848 | /// This is the normal current-format crash-repair path. Legacy recovery uses it only when the |
849 | 849 | /// old rollover invariant is already violated or pruning metadata was stale. |
@@ -885,6 +885,9 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> { |
885 | 885 |
|
886 | 886 | size += len; |
887 | 887 | if len < capacity { |
| 888 | + if section == newest { |
| 889 | + return Ok((size, None)); |
| 890 | + } |
888 | 891 | return Ok(( |
889 | 892 | size, |
890 | 893 | Some(RecoveryRepair { |
@@ -2135,6 +2138,70 @@ mod tests { |
2135 | 2138 | }); |
2136 | 2139 | } |
2137 | 2140 |
|
| 2141 | + #[test_traced] |
| 2142 | + fn test_fixed_journal_recover_accepts_clean_short_tail() { |
| 2143 | + let executor = deterministic::Runner::default(); |
| 2144 | + executor.start(|context| async move { |
| 2145 | + let cfg = test_cfg(&context, NZU64!(5)); |
| 2146 | + let segmented_cfg = SegmentedConfig { |
| 2147 | + partition: blob_partition(&cfg), |
| 2148 | + page_cache: cfg.page_cache.clone(), |
| 2149 | + write_buffer: cfg.write_buffer, |
| 2150 | + }; |
| 2151 | + let mut inner = |
| 2152 | + SegmentedJournal::<_, Digest>::init(context.child("blobs"), segmented_cfg) |
| 2153 | + .await |
| 2154 | + .unwrap(); |
| 2155 | + |
| 2156 | + for i in 0..5 { |
| 2157 | + inner.append(0, &test_digest(i)).await.unwrap(); |
| 2158 | + } |
| 2159 | + for i in 5..7 { |
| 2160 | + inner.append(1, &test_digest(i)).await.unwrap(); |
| 2161 | + } |
| 2162 | + inner.sync(0).await.unwrap(); |
| 2163 | + inner.sync(1).await.unwrap(); |
| 2164 | + |
| 2165 | + let (size, repair) = Journal::<_, Digest>::recover_by_walking_lengths(&mut inner, 5, 0) |
| 2166 | + .await |
| 2167 | + .unwrap(); |
| 2168 | + assert_eq!(size, 7); |
| 2169 | + assert!(repair.is_none()); |
| 2170 | + inner.destroy().await.unwrap(); |
| 2171 | + }); |
| 2172 | + } |
| 2173 | + |
| 2174 | + #[test_traced] |
| 2175 | + fn test_fixed_journal_recover_accepts_clean_empty_tail() { |
| 2176 | + let executor = deterministic::Runner::default(); |
| 2177 | + executor.start(|context| async move { |
| 2178 | + let cfg = test_cfg(&context, NZU64!(5)); |
| 2179 | + let segmented_cfg = SegmentedConfig { |
| 2180 | + partition: blob_partition(&cfg), |
| 2181 | + page_cache: cfg.page_cache.clone(), |
| 2182 | + write_buffer: cfg.write_buffer, |
| 2183 | + }; |
| 2184 | + let mut inner = |
| 2185 | + SegmentedJournal::<_, Digest>::init(context.child("blobs"), segmented_cfg) |
| 2186 | + .await |
| 2187 | + .unwrap(); |
| 2188 | + |
| 2189 | + for i in 0..5 { |
| 2190 | + inner.append(0, &test_digest(i)).await.unwrap(); |
| 2191 | + } |
| 2192 | + inner.ensure_section_exists(1).await.unwrap(); |
| 2193 | + inner.sync(0).await.unwrap(); |
| 2194 | + inner.sync(1).await.unwrap(); |
| 2195 | + |
| 2196 | + let (size, repair) = Journal::<_, Digest>::recover_by_walking_lengths(&mut inner, 5, 0) |
| 2197 | + .await |
| 2198 | + .unwrap(); |
| 2199 | + assert_eq!(size, 5); |
| 2200 | + assert!(repair.is_none()); |
| 2201 | + inner.destroy().await.unwrap(); |
| 2202 | + }); |
| 2203 | + } |
| 2204 | + |
2138 | 2205 | #[test_traced] |
2139 | 2206 | fn test_fixed_journal_recover_truncates_short_oldest_section() { |
2140 | 2207 | let executor = deterministic::Runner::default(); |
|
0 commit comments