From 8d2acb9f5044b1f924cfbbe45765c948c284247a Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 19 May 2026 23:41:33 -0700 Subject: [PATCH 1/8] expand crc fix scope --- runtime/src/utils/buffer/paged/append.rs | 293 +++++++++++++++++++++-- 1 file changed, 272 insertions(+), 21 deletions(-) diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 4fe217bb39..7335356256 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -787,6 +787,31 @@ impl Append { } } + /// Read a physical page and return its validated logical bytes plus the CRC record used. + async fn read_valid_page_with_crc( + blob: &B, + page: u64, + logical_page_size: u64, + ) -> Result<(IoBuf, Checksum), Error> { + let physical_page_size = logical_page_size + .checked_add(CHECKSUM_SIZE) + .ok_or(Error::OffsetOverflow)?; + let physical_page_start = page + .checked_mul(physical_page_size) + .ok_or(Error::OffsetOverflow)?; + let page = blob + .read_at(physical_page_start, physical_page_size as usize) + .await? + .coalesce(); + + let Some(record) = Checksum::validate_page(page.as_ref()) else { + return Err(Error::InvalidChecksum); + }; + let (len, _) = record.get_crc(); + + Ok((page.freeze().slice(..len as usize), record)) + } + /// Durably rewrite a committed partial page to a shorter length in the same physical page. /// /// Since larger valid lengths are authoritative, a shorter CRC cannot simply be written next to @@ -966,48 +991,50 @@ impl Append { full_pages * physical_page_size }; - let old_partial_page_state = blob_guard.partial_page_state.clone(); - let same_page_partial_shrink = size < current_size - && partial_bytes > 0 - && full_pages == blob_guard.current_page - && old_partial_page_state.is_some(); - - if same_page_partial_shrink { - let old_crc = - old_partial_page_state.expect("same-page shrink requires old partial CRC"); - + if partial_bytes > 0 { // Evict cached pages at or beyond the new full-page boundary. The page at // `full_pages` is now owned by the tip buffer, and anything above is beyond the new // logical size. self.cache_ref.invalidate_from(self.id, full_pages); - // Update blob state and buffer based on the desired logical size. The page data is - // read with CRC validation, then durably rewritten below with a shorter CRC. - blob_guard.current_page = full_pages; - buf_guard.offset = full_pages * logical_page_size; - - let page_data = - super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?; + let (page_data, old_crc) = + Self::read_valid_page_with_crc(&blob_guard.blob, full_pages, logical_page_size) + .await?; // Ensure the validated data covers what we need. if (page_data.len() as u64) < partial_bytes { return Err(Error::InvalidChecksum); } - buf_guard.clear(); let new_data = &page_data.as_ref()[..partial_bytes as usize]; - let over_capacity = buf_guard.append(new_data); - assert!(!over_capacity); + let new_crc = Crc32::checksum(new_data); + + let current_physical_size = blob_guard + .current_page + .checked_add(u64::from(blob_guard.partial_page_state.is_some())) + .and_then(|pages| pages.checked_mul(physical_page_size)) + .ok_or(Error::OffsetOverflow)?; + if new_physical_size < current_physical_size { + blob_guard.blob.resize(new_physical_size).await?; + blob_guard.blob.sync().await?; + } let final_record = Self::sync_same_page_shrink( &blob_guard.blob, full_pages, logical_page_size, partial_bytes as u16, - Crc32::checksum(new_data), + new_crc, &old_crc, ) .await?; + + // Update blob state and buffer based on the desired logical size. + blob_guard.current_page = full_pages; + buf_guard.offset = full_pages * logical_page_size; + buf_guard.clear(); + let over_capacity = buf_guard.append(new_data); + assert!(!over_capacity); blob_guard.partial_page_state = Some(final_record); return Ok(()); } @@ -2878,6 +2905,230 @@ mod tests { }); } + #[test] + fn test_resize_full_page_to_partial_survives_interrupted_crc_stage() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let page_size = PAGE_SIZE.get() as usize; + let data: Vec = (0..page_size).map(|i| i as u8).collect(); + + let (blob, size) = context + .open("test_partition", b"full_page_shrink_stage") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"full_page_shrink_stage") + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, 1, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + let result = append.resize(45).await; + if result.is_ok() { + assert!( + append.sync().await.is_err(), + "persisting the resized partial page should fail" + ); + } + assert_eq!(write_count.load(Ordering::SeqCst), 1); + if result.is_err() { + assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); + } + drop(append); + + let (blob, size) = context + .open("test_partition", b"full_page_shrink_stage") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, PAGE_SIZE.get() as u64); + let read = append.read_at(0, page_size).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data); + }); + } + + #[test] + fn test_resize_full_page_to_partial_survives_interrupted_crc_invalidation() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let page_size = PAGE_SIZE.get() as usize; + let data: Vec = (0..page_size).map(|i| i as u8).collect(); + + let (blob, size) = context + .open("test_partition", b"full_page_shrink_invalidation") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"full_page_shrink_invalidation") + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, 2, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + assert!( + append.resize(45).await.is_err(), + "phase-2 partial write should fail" + ); + assert_eq!(write_count.load(Ordering::SeqCst), 2); + assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); + drop(append); + + let (blob, size) = context + .open("test_partition", b"full_page_shrink_invalidation") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, 45); + let read = append.read_at(0, 45).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data[..45]); + }); + } + + #[test] + fn test_resize_multi_page_to_partial_survives_interrupted_crc_stage() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let page_size = PAGE_SIZE.get() as usize; + let target_len = page_size + 45; + let data: Vec = (0..page_size * 3).map(|i| i as u8).collect(); + + let (blob, size) = context + .open("test_partition", b"multi_page_shrink_stage") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"multi_page_shrink_stage") + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, 1, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + let result = append.resize(target_len as u64).await; + if result.is_ok() { + assert!( + append.sync().await.is_err(), + "persisting the resized partial page should fail" + ); + } + assert_eq!(write_count.load(Ordering::SeqCst), 1); + if result.is_err() { + assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); + } + drop(append); + + let (blob, size) = context + .open("test_partition", b"multi_page_shrink_stage") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, (page_size * 2) as u64); + let read = append + .read_at(0, page_size * 2) + .await + .unwrap() + .coalesce(); + assert_eq!(read.as_ref(), &data[..page_size * 2]); + }); + } + + #[test] + fn test_resize_multi_page_to_partial_survives_interrupted_crc_invalidation() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let page_size = PAGE_SIZE.get() as usize; + let target_len = page_size + 45; + let data: Vec = (0..page_size * 3).map(|i| i as u8).collect(); + + let (blob, size) = context + .open("test_partition", b"multi_page_shrink_invalidation") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"multi_page_shrink_invalidation") + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, 2, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + assert!( + append.resize(target_len as u64).await.is_err(), + "phase-2 partial write should fail" + ); + assert_eq!(write_count.load(Ordering::SeqCst), 2); + assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); + drop(append); + + let (blob, size) = context + .open("test_partition", b"multi_page_shrink_invalidation") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, target_len as u64); + let read = append.read_at(0, target_len).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data[..target_len]); + }); + } + #[test] fn test_reopen_partial_tail_append_and_resize() { let executor = deterministic::Runner::default(); From d83c4bbabfa9bb62ccf77157733bf822cb493e9a Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 19 May 2026 23:47:42 -0700 Subject: [PATCH 2/8] simplify --- runtime/src/utils/buffer/paged/append.rs | 34 +++++------------------- runtime/src/utils/buffer/paged/mod.rs | 20 +++++++++++--- 2 files changed, 23 insertions(+), 31 deletions(-) diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 7335356256..542db3da50 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -787,31 +787,6 @@ impl Append { } } - /// Read a physical page and return its validated logical bytes plus the CRC record used. - async fn read_valid_page_with_crc( - blob: &B, - page: u64, - logical_page_size: u64, - ) -> Result<(IoBuf, Checksum), Error> { - let physical_page_size = logical_page_size - .checked_add(CHECKSUM_SIZE) - .ok_or(Error::OffsetOverflow)?; - let physical_page_start = page - .checked_mul(physical_page_size) - .ok_or(Error::OffsetOverflow)?; - let page = blob - .read_at(physical_page_start, physical_page_size as usize) - .await? - .coalesce(); - - let Some(record) = Checksum::validate_page(page.as_ref()) else { - return Err(Error::InvalidChecksum); - }; - let (len, _) = record.get_crc(); - - Ok((page.freeze().slice(..len as usize), record)) - } - /// Durably rewrite a committed partial page to a shorter length in the same physical page. /// /// Since larger valid lengths are authoritative, a shorter CRC cannot simply be written next to @@ -997,9 +972,12 @@ impl Append { // logical size. self.cache_ref.invalidate_from(self.id, full_pages); - let (page_data, old_crc) = - Self::read_valid_page_with_crc(&blob_guard.blob, full_pages, logical_page_size) - .await?; + let (page_data, old_crc) = super::get_page_from_blob_with_crc( + &blob_guard.blob, + full_pages, + logical_page_size, + ) + .await?; // Ensure the validated data covers what we need. if (page_data.len() as u64) < partial_bytes { diff --git a/runtime/src/utils/buffer/paged/mod.rs b/runtime/src/utils/buffer/paged/mod.rs index eedf139a8e..63934b490c 100644 --- a/runtime/src/utils/buffer/paged/mod.rs +++ b/runtime/src/utils/buffer/paged/mod.rs @@ -48,9 +48,23 @@ async fn get_page_from_blob( page_num: u64, logical_page_size: u64, ) -> Result { - let physical_page_size = logical_page_size + CHECKSUM_SIZE; - let physical_page_start = page_num * physical_page_size; + let (page, _) = get_page_from_blob_with_crc(blob, page_num, logical_page_size).await?; + Ok(page) +} +/// Read the designated page from the underlying blob and return its logical bytes plus the CRC +/// record used to validate it. +async fn get_page_from_blob_with_crc( + blob: &impl Blob, + page_num: u64, + logical_page_size: u64, +) -> Result<(IoBuf, Checksum), Error> { + let physical_page_size = logical_page_size + .checked_add(CHECKSUM_SIZE) + .ok_or(Error::OffsetOverflow)?; + let physical_page_start = page_num + .checked_mul(physical_page_size) + .ok_or(Error::OffsetOverflow)?; let page = blob .read_at(physical_page_start, physical_page_size as usize) .await? @@ -61,7 +75,7 @@ async fn get_page_from_blob( }; let (len, _) = record.get_crc(); - Ok(page.freeze().slice(..len as usize)) + Ok((page.freeze().slice(..len as usize), record)) } /// Describes a CRC record stored at the end of a page. From 94c3f4e3fa5c25198b12747b1cfac978f478b494 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 19 May 2026 23:48:32 -0700 Subject: [PATCH 3/8] fmt --- runtime/src/utils/buffer/paged/append.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 542db3da50..7f2822e926 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -972,12 +972,9 @@ impl Append { // logical size. self.cache_ref.invalidate_from(self.id, full_pages); - let (page_data, old_crc) = super::get_page_from_blob_with_crc( - &blob_guard.blob, - full_pages, - logical_page_size, - ) - .await?; + let (page_data, old_crc) = + super::get_page_from_blob_with_crc(&blob_guard.blob, full_pages, logical_page_size) + .await?; // Ensure the validated data covers what we need. if (page_data.len() as u64) < partial_bytes { @@ -3045,11 +3042,7 @@ mod tests { .await .unwrap(); assert_eq!(append.size().await, (page_size * 2) as u64); - let read = append - .read_at(0, page_size * 2) - .await - .unwrap() - .coalesce(); + let read = append.read_at(0, page_size * 2).await.unwrap().coalesce(); assert_eq!(read.as_ref(), &data[..page_size * 2]); }); } From 23065fc320b2af8f5d00fdb86077ee92d4f3b1ff Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 19 May 2026 23:56:21 -0700 Subject: [PATCH 4/8] more comments --- runtime/src/utils/buffer/paged/append.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 7f2822e926..2be2482179 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -966,6 +966,10 @@ impl Append { full_pages * physical_page_size }; + // Any shrink that lands inside an existing page must transition the target page's CRC + // through the alternate slot. The old page may have been full or partial, but rewriting it + // as a padded partial page before the shorter CRC is durable can make recovery discard the + // whole page. if partial_bytes > 0 { // Evict cached pages at or beyond the new full-page boundary. The page at // `full_pages` is now owned by the tip buffer, and anything above is beyond the new @@ -989,6 +993,11 @@ impl Append { .checked_add(u64::from(blob_guard.partial_page_state.is_some())) .and_then(|pages| pages.checked_mul(physical_page_size)) .ok_or(Error::OffsetOverflow)?; + + // If the shrink drops later physical pages, make that truncation durable before + // changing the landing page's CRC. A crash before the shorter CRC is staged can then + // still recover the landing page at its old length, without reviving truncated tail + // pages. if new_physical_size < current_physical_size { blob_guard.blob.resize(new_physical_size).await?; blob_guard.blob.sync().await?; From 16f33cd8638ec34e38c6316303d51965e7663884 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 19 May 2026 23:58:06 -0700 Subject: [PATCH 5/8] nit --- runtime/src/utils/buffer/paged/append.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 2be2482179..1592b6bc0c 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -996,8 +996,8 @@ impl Append { // If the shrink drops later physical pages, make that truncation durable before // changing the landing page's CRC. A crash before the shorter CRC is staged can then - // still recover the landing page at its old length, without reviving truncated tail - // pages. + // still recover the landing page at its old length, but recovery stops at the newly + // truncated physical end. if new_physical_size < current_physical_size { blob_guard.blob.resize(new_physical_size).await?; blob_guard.blob.sync().await?; From fb296470418b571947bf56ab67f221646b2a4994 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 20 May 2026 00:01:17 -0700 Subject: [PATCH 6/8] minimize --- runtime/src/utils/buffer/paged/append.rs | 5 ++--- runtime/src/utils/buffer/paged/cache.rs | 5 +++-- runtime/src/utils/buffer/paged/mod.rs | 14 +------------- 3 files changed, 6 insertions(+), 18 deletions(-) diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 1592b6bc0c..782220bd96 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -977,8 +977,7 @@ impl Append { self.cache_ref.invalidate_from(self.id, full_pages); let (page_data, old_crc) = - super::get_page_from_blob_with_crc(&blob_guard.blob, full_pages, logical_page_size) - .await?; + super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?; // Ensure the validated data covers what we need. if (page_data.len() as u64) < partial_bytes { @@ -1047,7 +1046,7 @@ impl Append { if partial_bytes > 0 { // There's a partial page. Read its data from disk with CRC validation. - let page_data = + let (page_data, _) = super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?; // Ensure the validated data covers what we need. diff --git a/runtime/src/utils/buffer/paged/cache.rs b/runtime/src/utils/buffer/paged/cache.rs index b70b443118..42ab01b4e3 100644 --- a/runtime/src/utils/buffer/paged/cache.rs +++ b/runtime/src/utils/buffer/paged/cache.rs @@ -351,7 +351,8 @@ impl CacheRef { } Entry::Vacant(v) => { // Nobody is currently fetching this page, so create a future that will do the - // work. get_page_from_blob handles CRC validation and returns only logical bytes. + // work. get_page_from_blob handles CRC validation; the cache only keeps the + // logical bytes. let blob = blob.clone(); let cache = Arc::clone(&self.cache); let page_size = self.page_size; @@ -591,7 +592,7 @@ async fn fetch_cacheable_page( page_num: u64, page_size: u64, ) -> Result> { - let page = get_page_from_blob(blob, page_num, page_size) + let (page, _) = get_page_from_blob(blob, page_num, page_size) .await .map_err(Arc::new)?; diff --git a/runtime/src/utils/buffer/paged/mod.rs b/runtime/src/utils/buffer/paged/mod.rs index 63934b490c..b65153648c 100644 --- a/runtime/src/utils/buffer/paged/mod.rs +++ b/runtime/src/utils/buffer/paged/mod.rs @@ -40,21 +40,9 @@ use tracing::{debug, error}; const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64; const CHECKSUM_SLOT_SIZE: usize = u16::SIZE + crc32::Digest::SIZE; -/// Read the designated page from the underlying blob and return its logical bytes as a vector if it -/// passes the integrity check, returning error otherwise. Safely handles partial pages. Caller can -/// check the length of the returned vector to determine if the page was partial vs full. -async fn get_page_from_blob( - blob: &impl Blob, - page_num: u64, - logical_page_size: u64, -) -> Result { - let (page, _) = get_page_from_blob_with_crc(blob, page_num, logical_page_size).await?; - Ok(page) -} - /// Read the designated page from the underlying blob and return its logical bytes plus the CRC /// record used to validate it. -async fn get_page_from_blob_with_crc( +async fn get_page_from_blob( blob: &impl Blob, page_num: u64, logical_page_size: u64, From 6fc1df26ff2087d794836928d5a6b870fd142060 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 20 May 2026 00:08:20 -0700 Subject: [PATCH 7/8] nits --- runtime/src/utils/buffer/paged/append.rs | 323 +++++++---------------- 1 file changed, 94 insertions(+), 229 deletions(-) diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 782220bd96..8bb1c50029 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -954,17 +954,12 @@ impl Append { let mut buf_guard = self.buffer.write().await; let mut blob_guard = self.blob_state.write().await; - // Calculate the physical size needed for the new logical size. + // Calculate the physical size needed for the new logical size. A partial page still + // occupies a full physical page on disk. let full_pages = size / logical_page_size; let partial_bytes = size % logical_page_size; - let new_physical_size = if partial_bytes > 0 { - // We need full_pages + 1 physical pages to hold the partial data. - // The partial page will be padded to full physical page size. - (full_pages + 1) * physical_page_size - } else { - // No partial page needed. - full_pages * physical_page_size - }; + let new_physical_size = + (full_pages + u64::from(partial_bytes > 0)) * physical_page_size; // Any shrink that lands inside an existing page must transition the target page's CRC // through the alternate slot. The old page may have been full or partial, but rewriting it @@ -1022,45 +1017,18 @@ impl Append { return Ok(()); } - // Resize the underlying blob. + // Page-aligned shrink: no partial page remains, so truncate the blob directly. blob_guard.blob.resize(new_physical_size).await?; blob_guard.partial_page_state = None; - // Evict cached pages at or beyond the new full-page boundary. The page at `full_pages` (if - // partial) is now owned by the tip buffer, and anything above is beyond the new logical - // size. Leaving their pre-resize contents in the cache lets `try_read_sync` (which bypasses - // the tip buffer) observe stale bytes once the tip is repopulated. + // Evict cached pages at or beyond the new full-page boundary. Leaving their pre-resize + // contents in the cache lets `try_read_sync` observe stale bytes after the shrink. self.cache_ref.invalidate_from(self.id, full_pages); - // Update blob state and buffer based on the desired logical size. The partial page data is - // read with CRC validation; the validated length may exceed partial_bytes (reflecting the - // old data length), but we only load the prefix we need. The next sync will write the - // correct CRC for the new length. - // - // Note: This updates state before validation completes, which could leave state - // inconsistent if validation fails. This is acceptable because failures from mutable - // methods are fatal - callers must not use the blob after any error. - + // All remaining pages are full, so the tip buffer is empty. blob_guard.current_page = full_pages; - buf_guard.offset = full_pages * logical_page_size; - - if partial_bytes > 0 { - // There's a partial page. Read its data from disk with CRC validation. - let (page_data, _) = - super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?; - - // Ensure the validated data covers what we need. - if (page_data.len() as u64) < partial_bytes { - return Err(Error::InvalidChecksum); - } - - buf_guard.clear(); - let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]); - assert!(!over_capacity); - } else { - // No partial page - all pages are full or blob is empty. - buf_guard.clear(); - } + buf_guard.offset = size; + buf_guard.clear(); Ok(()) } @@ -2892,56 +2860,15 @@ mod tests { fn test_resize_full_page_to_partial_survives_interrupted_crc_stage() { let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); - let page_size = PAGE_SIZE.get() as usize; - let data: Vec = (0..page_size).map(|i| i as u8).collect(); - - let (blob, size) = context - .open("test_partition", b"full_page_shrink_stage") - .await - .unwrap(); - let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) - .await - .unwrap(); - append.append(&data).await.unwrap(); - append.sync().await.unwrap(); - drop(append); - - let (blob, size) = context - .open("test_partition", b"full_page_shrink_stage") - .await - .unwrap(); - let faulty_blob = PartialWriteBlob::new(blob, 1, 3); - let write_count = faulty_blob.write_count(); - let failed_write_len = faulty_blob.failed_write_len(); - let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) - .await - .unwrap(); - - let result = append.resize(45).await; - if result.is_ok() { - assert!( - append.sync().await.is_err(), - "persisting the resized partial page should fail" - ); - } - assert_eq!(write_count.load(Ordering::SeqCst), 1); - if result.is_err() { - assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); - } - drop(append); - - let (blob, size) = context - .open("test_partition", b"full_page_shrink_stage") - .await - .unwrap(); - let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) - .await - .unwrap(); - assert_eq!(append.size().await, PAGE_SIZE.get() as u64); - let read = append.read_at(0, page_size).await.unwrap().coalesce(); - assert_eq!(read.as_ref(), &data); + executor.start(|context| { + assert_resize_to_partial_survives_interrupted_crc( + context, + b"full_page_shrink_stage", + PAGE_SIZE.get() as usize, + 45, + 1, + PAGE_SIZE.get() as usize, + ) }); } @@ -2949,51 +2876,15 @@ mod tests { fn test_resize_full_page_to_partial_survives_interrupted_crc_invalidation() { let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); - let page_size = PAGE_SIZE.get() as usize; - let data: Vec = (0..page_size).map(|i| i as u8).collect(); - - let (blob, size) = context - .open("test_partition", b"full_page_shrink_invalidation") - .await - .unwrap(); - let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) - .await - .unwrap(); - append.append(&data).await.unwrap(); - append.sync().await.unwrap(); - drop(append); - - let (blob, size) = context - .open("test_partition", b"full_page_shrink_invalidation") - .await - .unwrap(); - let faulty_blob = PartialWriteBlob::new(blob, 2, 3); - let write_count = faulty_blob.write_count(); - let failed_write_len = faulty_blob.failed_write_len(); - let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) - .await - .unwrap(); - - assert!( - append.resize(45).await.is_err(), - "phase-2 partial write should fail" - ); - assert_eq!(write_count.load(Ordering::SeqCst), 2); - assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); - drop(append); - - let (blob, size) = context - .open("test_partition", b"full_page_shrink_invalidation") - .await - .unwrap(); - let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) - .await - .unwrap(); - assert_eq!(append.size().await, 45); - let read = append.read_at(0, 45).await.unwrap().coalesce(); - assert_eq!(read.as_ref(), &data[..45]); + executor.start(|context| { + assert_resize_to_partial_survives_interrupted_crc( + context, + b"full_page_shrink_invalidation", + PAGE_SIZE.get() as usize, + 45, + 2, + 45, + ) }); } @@ -3001,57 +2892,16 @@ mod tests { fn test_resize_multi_page_to_partial_survives_interrupted_crc_stage() { let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + executor.start(|context| { let page_size = PAGE_SIZE.get() as usize; - let target_len = page_size + 45; - let data: Vec = (0..page_size * 3).map(|i| i as u8).collect(); - - let (blob, size) = context - .open("test_partition", b"multi_page_shrink_stage") - .await - .unwrap(); - let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) - .await - .unwrap(); - append.append(&data).await.unwrap(); - append.sync().await.unwrap(); - drop(append); - - let (blob, size) = context - .open("test_partition", b"multi_page_shrink_stage") - .await - .unwrap(); - let faulty_blob = PartialWriteBlob::new(blob, 1, 3); - let write_count = faulty_blob.write_count(); - let failed_write_len = faulty_blob.failed_write_len(); - let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) - .await - .unwrap(); - - let result = append.resize(target_len as u64).await; - if result.is_ok() { - assert!( - append.sync().await.is_err(), - "persisting the resized partial page should fail" - ); - } - assert_eq!(write_count.load(Ordering::SeqCst), 1); - if result.is_err() { - assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); - } - drop(append); - - let (blob, size) = context - .open("test_partition", b"multi_page_shrink_stage") - .await - .unwrap(); - let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) - .await - .unwrap(); - assert_eq!(append.size().await, (page_size * 2) as u64); - let read = append.read_at(0, page_size * 2).await.unwrap().coalesce(); - assert_eq!(read.as_ref(), &data[..page_size * 2]); + assert_resize_to_partial_survives_interrupted_crc( + context, + b"multi_page_shrink_stage", + page_size * 3, + page_size + 45, + 1, + page_size * 2, + ) }); } @@ -3059,53 +2909,68 @@ mod tests { fn test_resize_multi_page_to_partial_survives_interrupted_crc_invalidation() { let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + executor.start(|context| { let page_size = PAGE_SIZE.get() as usize; - let target_len = page_size + 45; - let data: Vec = (0..page_size * 3).map(|i| i as u8).collect(); - - let (blob, size) = context - .open("test_partition", b"multi_page_shrink_invalidation") - .await - .unwrap(); - let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) - .await - .unwrap(); - append.append(&data).await.unwrap(); - append.sync().await.unwrap(); - drop(append); + assert_resize_to_partial_survives_interrupted_crc( + context, + b"multi_page_shrink_invalidation", + page_size * 3, + page_size + 45, + 2, + page_size + 45, + ) + }); + } - let (blob, size) = context - .open("test_partition", b"multi_page_shrink_invalidation") - .await - .unwrap(); - let faulty_blob = PartialWriteBlob::new(blob, 2, 3); - let write_count = faulty_blob.write_count(); - let failed_write_len = faulty_blob.failed_write_len(); - let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) - .await - .unwrap(); + async fn assert_resize_to_partial_survives_interrupted_crc( + context: deterministic::Context, + blob_name: &'static [u8], + initial_len: usize, + target_len: usize, + fail_on_write: usize, + expected_len: usize, + ) { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let data: Vec = (0..initial_len).map(|i| i as u8).collect(); + + // Seed the blob with committed data before injecting a write failure. + let (blob, size) = context.open("test_partition", blob_name).await.unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context.open("test_partition", blob_name).await.unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, fail_on_write, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); - assert!( - append.resize(target_len as u64).await.is_err(), - "phase-2 partial write should fail" - ); - assert_eq!(write_count.load(Ordering::SeqCst), 2); + // Depending on where the old code wrote the shorter page, the injected failure can surface + // during resize or during the following sync. The recovery assertion below is the invariant. + let resize_result = append.resize(target_len as u64).await; + let failed = if resize_result.is_err() { assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); - drop(append); + true + } else { + append.sync().await.is_err() + }; + assert!(failed, "injected partial write should fail"); + assert_eq!(write_count.load(Ordering::SeqCst), fail_on_write); + drop(append); - let (blob, size) = context - .open("test_partition", b"multi_page_shrink_invalidation") - .await - .unwrap(); - let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) - .await - .unwrap(); - assert_eq!(append.size().await, target_len as u64); - let read = append.read_at(0, target_len).await.unwrap().coalesce(); - assert_eq!(read.as_ref(), &data[..target_len]); - }); + // Reopen after the injected failure and verify recovery lands on the last durable state. + let (blob, size) = context.open("test_partition", blob_name).await.unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, expected_len as u64); + let read = append.read_at(0, expected_len).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data[..expected_len]); } #[test] From 6177fca0807b80b0225a5ba2a24e04405a6a1998 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 20 May 2026 00:11:04 -0700 Subject: [PATCH 8/8] fmt --- runtime/src/utils/buffer/paged/append.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 8bb1c50029..5707254942 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -958,8 +958,7 @@ impl Append { // occupies a full physical page on disk. let full_pages = size / logical_page_size; let partial_bytes = size % logical_page_size; - let new_physical_size = - (full_pages + u64::from(partial_bytes > 0)) * physical_page_size; + let new_physical_size = (full_pages + u64::from(partial_bytes > 0)) * physical_page_size; // Any shrink that lands inside an existing page must transition the target page's CRC // through the alternate slot. The old page may have been full or partial, but rewriting it