Skip to content

Commit 789729a

Browse files
protect against torn slot writes, and handle full to partial page shrinking
1 parent fd03283 commit 789729a

2 files changed

Lines changed: 278 additions & 30 deletions

File tree

runtime/src/utils/buffer/paged/append.rs

Lines changed: 265 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
//! pages have a [Checksum] at the end. If no CRC record existed before for the page being written,
99
//! then one of the checksums will be all zero. If a checksum already existed for the page being
1010
//! written, then the write will overwrite only the checksum with the lesser length value. Should
11-
//! this write fail, the previously committed page state can still be recovered. Same-page shrink
12-
//! makes the shorter checksum durable before invalidating the old longer checksum.
11+
//! this write fail, the previously committed page state can still be recovered. Partial-page
12+
//! shrink makes the shorter checksum durable before invalidating the old longer checksum.
1313
//!
1414
//! During initialization, the wrapper will back up over any page that is not accompanied by a
1515
//! valid CRC, treating it as the result of an incomplete write that may be invalid.
@@ -759,6 +759,13 @@ impl<B: Blob> Append<B> {
759759
(write_buffer, Some(crc_record))
760760
}
761761

762+
fn checksum_slot_bytes(len: u16, crc: u32) -> [u8; CHECKSUM_SLOT_SIZE] {
763+
let mut bytes = [0u8; CHECKSUM_SLOT_SIZE];
764+
bytes[..2].copy_from_slice(&len.to_be_bytes());
765+
bytes[2..].copy_from_slice(&crc.to_be_bytes());
766+
bytes
767+
}
768+
762769
/// Build a CRC record that preserves the old CRC in its original slot and places
763770
/// the new CRC in the other slot.
764771
const fn build_crc_record_preserving_old(
@@ -787,13 +794,13 @@ impl<B: Blob> Append<B> {
787794
}
788795
}
789796

790-
/// Durably rewrite a committed partial page to a shorter length in the same physical page.
797+
/// Durably rewrite a committed page to a shorter partial length.
791798
///
792799
/// Since larger valid lengths are authoritative, a shorter CRC cannot simply be written next to
793-
/// the old CRC. We first make the shorter CRC durable in the alternate slot, then invalidate
794-
/// the old longer slot. A crash during either phase recovers either the old longer page or the
795-
/// new shorter page, but never loses the whole page.
796-
async fn sync_same_page_shrink(
800+
/// the old CRC. We first stage the shorter slot with length 0, then make its length durable,
801+
/// then invalidate the old longer slot. A crash during any phase recovers either the old longer
802+
/// page or the new shorter page, but never loses the whole page or fabricates a larger length.
803+
async fn sync_partial_page_shrink(
797804
blob: &B,
798805
page: u64,
799806
logical_page_size: u64,
@@ -809,16 +816,22 @@ impl<B: Blob> Append<B> {
809816
.and_then(|start| start.checked_add(logical_page_size))
810817
.ok_or(Error::OffsetOverflow)?;
811818

812-
let staged_record = Self::build_crc_record_preserving_old(new_len, new_crc, old_crc);
813-
let staged_bytes = staged_record.to_bytes();
814819
let new_slot_start = if old_crc.len1 >= old_crc.len2 {
815820
CHECKSUM_SLOT_SIZE
816821
} else {
817822
0
818823
};
824+
let staged_slot = Self::checksum_slot_bytes(0, new_crc);
819825
blob.write_at(
820826
crc_start + new_slot_start as u64,
821-
staged_bytes[new_slot_start..new_slot_start + CHECKSUM_SLOT_SIZE].to_vec(),
827+
staged_slot.to_vec(),
828+
)
829+
.await?;
830+
blob.sync().await?;
831+
832+
blob.write_at(
833+
crc_start + new_slot_start as u64,
834+
new_len.to_be_bytes().to_vec(),
822835
)
823836
.await?;
824837
blob.sync().await?;
@@ -828,13 +841,19 @@ impl<B: Blob> Append<B> {
828841
} else {
829842
0
830843
};
844+
let len_size = std::mem::size_of::<u16>();
845+
let crc_size = CHECKSUM_SLOT_SIZE - len_size;
831846
blob.write_at(
832-
crc_start + old_slot_start as u64,
833-
vec![0u8; CHECKSUM_SLOT_SIZE],
847+
crc_start + old_slot_start as u64 + len_size as u64,
848+
vec![0u8; crc_size],
834849
)
835850
.await?;
836851
blob.sync().await?;
837852

853+
blob.write_at(crc_start + old_slot_start as u64, vec![0u8; len_size])
854+
.await?;
855+
blob.sync().await?;
856+
838857
let final_record = if new_slot_start == 0 {
839858
Checksum {
840859
len1: new_len,
@@ -966,15 +985,14 @@ impl<B: Blob> Append<B> {
966985
full_pages * physical_page_size
967986
};
968987

969-
let old_partial_page_state = blob_guard.partial_page_state.clone();
970-
let same_page_partial_shrink = size < current_size
988+
let protected_partial_shrink = size < current_size
971989
&& partial_bytes > 0
972-
&& full_pages == blob_guard.current_page
973-
&& old_partial_page_state.is_some();
990+
&& (full_pages < blob_guard.current_page
991+
|| (full_pages == blob_guard.current_page
992+
&& blob_guard.partial_page_state.is_some()));
974993

975-
if same_page_partial_shrink {
976-
let old_crc =
977-
old_partial_page_state.expect("same-page shrink requires old partial CRC");
994+
if protected_partial_shrink {
995+
blob_guard.blob.resize(new_physical_size).await?;
978996

979997
// Evict cached pages at or beyond the new full-page boundary. The page at
980998
// `full_pages` is now owned by the tip buffer, and anything above is beyond the new
@@ -986,8 +1004,12 @@ impl<B: Blob> Append<B> {
9861004
blob_guard.current_page = full_pages;
9871005
buf_guard.offset = full_pages * logical_page_size;
9881006

989-
let page_data =
990-
super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;
1007+
let (page_data, old_crc) = super::get_page_with_checksum_from_blob(
1008+
&blob_guard.blob,
1009+
full_pages,
1010+
logical_page_size,
1011+
)
1012+
.await?;
9911013

9921014
// Ensure the validated data covers what we need.
9931015
if (page_data.len() as u64) < partial_bytes {
@@ -999,7 +1021,7 @@ impl<B: Blob> Append<B> {
9991021
let over_capacity = buf_guard.append(new_data);
10001022
assert!(!over_capacity);
10011023

1002-
let final_record = Self::sync_same_page_shrink(
1024+
let final_record = Self::sync_partial_page_shrink(
10031025
&blob_guard.blob,
10041026
full_pages,
10051027
logical_page_size,
@@ -2814,6 +2836,219 @@ mod tests {
28142836
});
28152837
}
28162838

2839+
#[test]
2840+
fn test_resize_same_page_shrink_survives_interrupted_len_stage() {
2841+
let executor = deterministic::Runner::default();
2842+
2843+
executor.start(|context| async move {
2844+
const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(600);
2845+
const LARGE_BUFFER_SIZE: usize = 1_200;
2846+
2847+
let cache_ref =
2848+
CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(LARGE_BUFFER_SIZE));
2849+
let data: Vec<u8> = (0..300).map(|i| (i % 251) as u8).collect();
2850+
2851+
let (blob, size) = context
2852+
.open("test_partition", b"same_page_shrink_len_stage")
2853+
.await
2854+
.unwrap();
2855+
let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref.clone())
2856+
.await
2857+
.unwrap();
2858+
append.append(&data[..255]).await.unwrap();
2859+
append.sync().await.unwrap();
2860+
append.append(&data[255..]).await.unwrap();
2861+
append.sync().await.unwrap();
2862+
drop(append);
2863+
2864+
let (blob, size) = context
2865+
.open("test_partition", b"same_page_shrink_len_stage")
2866+
.await
2867+
.unwrap();
2868+
let faulty_blob = PartialWriteBlob::new(blob, 2, 1);
2869+
let write_count = faulty_blob.write_count();
2870+
let failed_write_len = faulty_blob.failed_write_len();
2871+
let append = Append::new(faulty_blob, size, LARGE_BUFFER_SIZE, cache_ref.clone())
2872+
.await
2873+
.unwrap();
2874+
2875+
assert!(
2876+
append.resize(257).await.is_err(),
2877+
"length-stage partial write should fail"
2878+
);
2879+
assert_eq!(write_count.load(Ordering::SeqCst), 2);
2880+
assert_eq!(failed_write_len.load(Ordering::SeqCst), 2);
2881+
drop(append);
2882+
2883+
let (blob, size) = context
2884+
.open("test_partition", b"same_page_shrink_len_stage")
2885+
.await
2886+
.unwrap();
2887+
let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref)
2888+
.await
2889+
.unwrap();
2890+
assert_eq!(append.size().await, 300);
2891+
let read = append.read_at(0, 300).await.unwrap().coalesce();
2892+
assert_eq!(read.as_ref(), &data);
2893+
});
2894+
}
2895+
2896+
#[test]
2897+
fn test_resize_same_page_shrink_preserves_validated_fallback_slot() {
2898+
let executor = deterministic::Runner::default();
2899+
2900+
executor.start(|context| async move {
2901+
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2902+
let data: Vec<u8> = (0..52).collect();
2903+
2904+
let (blob, size) = context
2905+
.open("test_partition", b"same_page_shrink_fallback_slot")
2906+
.await
2907+
.unwrap();
2908+
let faulty_blob = PartialWriteBlob::new(blob.clone(), 5, 3);
2909+
let write_count = faulty_blob.write_count();
2910+
let failed_write_len = faulty_blob.failed_write_len();
2911+
let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone())
2912+
.await
2913+
.unwrap();
2914+
append.append(&data[..48]).await.unwrap();
2915+
append.sync().await.unwrap();
2916+
assert_eq!(write_count.load(Ordering::SeqCst), 1);
2917+
2918+
append.append(&data[48..50]).await.unwrap();
2919+
append.sync().await.unwrap();
2920+
assert_eq!(write_count.load(Ordering::SeqCst), 3);
2921+
2922+
append.append(&data[50..]).await.unwrap();
2923+
append.sync().await.unwrap();
2924+
assert_eq!(write_count.load(Ordering::SeqCst), 4);
2925+
2926+
// Corrupt the newer authoritative slot. The older slot still covers the shrink target.
2927+
// `resize()` first syncs the live buffer, which writes a valid fallback slot but leaves
2928+
// the cached footer stale. A torn phase-1 shrink write must preserve that validated
2929+
// fallback slot.
2930+
let slot0_offset = PAGE_SIZE.get() as u64;
2931+
blob.write_at(slot0_offset, DUMMY_MARKER.to_vec())
2932+
.await
2933+
.unwrap();
2934+
blob.sync().await.unwrap();
2935+
2936+
assert!(
2937+
append.resize(45).await.is_err(),
2938+
"phase-1 partial write should fail"
2939+
);
2940+
assert_eq!(write_count.load(Ordering::SeqCst), 5);
2941+
assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
2942+
drop(append);
2943+
2944+
let (blob, size) = context
2945+
.open("test_partition", b"same_page_shrink_fallback_slot")
2946+
.await
2947+
.unwrap();
2948+
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref).await.unwrap();
2949+
assert_eq!(append.size().await, 50);
2950+
let read = append.read_at(0, 50).await.unwrap().coalesce();
2951+
assert_eq!(read.as_ref(), &data[..50]);
2952+
});
2953+
}
2954+
2955+
#[test]
2956+
fn test_resize_full_page_to_partial_reopens_at_shorter_size() {
2957+
let executor = deterministic::Runner::default();
2958+
2959+
executor.start(|context| async move {
2960+
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
2961+
let page_size = PAGE_SIZE.get() as u64;
2962+
let target = page_size + 45;
2963+
let data: Vec<u8> = (0..page_size * 2).map(|i| (i % 251) as u8).collect();
2964+
2965+
let (blob, size) = context
2966+
.open("test_partition", b"full_page_to_partial")
2967+
.await
2968+
.unwrap();
2969+
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
2970+
.await
2971+
.unwrap();
2972+
append.append(&data).await.unwrap();
2973+
append.sync().await.unwrap();
2974+
2975+
append.resize(target).await.unwrap();
2976+
drop(append);
2977+
2978+
let (blob, size) = context
2979+
.open("test_partition", b"full_page_to_partial")
2980+
.await
2981+
.unwrap();
2982+
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
2983+
.await
2984+
.unwrap();
2985+
assert_eq!(append.size().await, target);
2986+
let read = append
2987+
.read_at(0, target as usize)
2988+
.await
2989+
.unwrap()
2990+
.coalesce();
2991+
assert_eq!(read.as_ref(), &data[..target as usize]);
2992+
});
2993+
}
2994+
2995+
#[test]
2996+
fn test_resize_full_page_to_partial_survives_interrupted_crc_stage() {
2997+
let executor = deterministic::Runner::default();
2998+
2999+
executor.start(|context| async move {
3000+
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
3001+
let page_size = PAGE_SIZE.get() as u64;
3002+
let target = page_size + 45;
3003+
let data: Vec<u8> = (0..page_size * 3).map(|i| (i % 251) as u8).collect();
3004+
3005+
let (blob, size) = context
3006+
.open("test_partition", b"full_page_to_partial_interrupted")
3007+
.await
3008+
.unwrap();
3009+
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
3010+
.await
3011+
.unwrap();
3012+
append.append(&data).await.unwrap();
3013+
append.sync().await.unwrap();
3014+
drop(append);
3015+
3016+
let (blob, size) = context
3017+
.open("test_partition", b"full_page_to_partial_interrupted")
3018+
.await
3019+
.unwrap();
3020+
let faulty_blob = PartialWriteBlob::new(blob, 1, 3);
3021+
let write_count = faulty_blob.write_count();
3022+
let failed_write_len = faulty_blob.failed_write_len();
3023+
let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone())
3024+
.await
3025+
.unwrap();
3026+
3027+
assert!(
3028+
append.resize(target).await.is_err(),
3029+
"phase-1 partial write should fail"
3030+
);
3031+
assert_eq!(write_count.load(Ordering::SeqCst), 1);
3032+
assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
3033+
drop(append);
3034+
3035+
let (blob, size) = context
3036+
.open("test_partition", b"full_page_to_partial_interrupted")
3037+
.await
3038+
.unwrap();
3039+
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
3040+
.await
3041+
.unwrap();
3042+
assert_eq!(append.size().await, page_size * 2);
3043+
let read = append
3044+
.read_at(0, (page_size * 2) as usize)
3045+
.await
3046+
.unwrap()
3047+
.coalesce();
3048+
assert_eq!(read.as_ref(), &data[..(page_size * 2) as usize]);
3049+
});
3050+
}
3051+
28173052
#[test]
28183053
fn test_resize_same_page_shrink_survives_interrupted_crc_invalidation() {
28193054
let executor = deterministic::Runner::default();
@@ -2833,7 +3068,7 @@ mod tests {
28333068
.await
28343069
.unwrap();
28353070
// Put the old authoritative CRC in slot 1, so the shorter CRC will be staged in slot
2836-
// 0. Phase 1 is the new-slot write; phase 2 only invalidates the old slot.
3071+
// 0. The old slot is invalidated only after the shorter slot is durable.
28373072
append.append(&data[..40]).await.unwrap();
28383073
append.sync().await.unwrap();
28393074
append.append(&data[40..]).await.unwrap();
@@ -2847,7 +3082,7 @@ mod tests {
28473082
)
28483083
.await
28493084
.unwrap();
2850-
let faulty_blob = PartialWriteBlob::new(blob, 2, 3);
3085+
let faulty_blob = PartialWriteBlob::new(blob, 3, 3);
28513086
let write_count = faulty_blob.write_count();
28523087
let failed_write_len = faulty_blob.failed_write_len();
28533088
let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone())
@@ -2856,10 +3091,13 @@ mod tests {
28563091

28573092
assert!(
28583093
append.resize(45).await.is_err(),
2859-
"phase-2 partial write should fail"
3094+
"old-slot invalidation should fail"
3095+
);
3096+
assert_eq!(write_count.load(Ordering::SeqCst), 3);
3097+
assert_eq!(
3098+
failed_write_len.load(Ordering::SeqCst),
3099+
CHECKSUM_SLOT_SIZE - std::mem::size_of::<u16>()
28603100
);
2861-
assert_eq!(write_count.load(Ordering::SeqCst), 2);
2862-
assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
28633101
drop(append);
28643102

28653103
let (blob, size) = context

0 commit comments

Comments
 (0)