diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index 4fe217bb39..5707254942 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -954,39 +954,23 @@ impl Append { let mut buf_guard = self.buffer.write().await; let mut blob_guard = self.blob_state.write().await; - // Calculate the physical size needed for the new logical size. + // Calculate the physical size needed for the new logical size. A partial page still + // occupies a full physical page on disk. let full_pages = size / logical_page_size; let partial_bytes = size % logical_page_size; - let new_physical_size = if partial_bytes > 0 { - // We need full_pages + 1 physical pages to hold the partial data. - // The partial page will be padded to full physical page size. - (full_pages + 1) * physical_page_size - } else { - // No partial page needed. - full_pages * physical_page_size - }; - - let old_partial_page_state = blob_guard.partial_page_state.clone(); - let same_page_partial_shrink = size < current_size - && partial_bytes > 0 - && full_pages == blob_guard.current_page - && old_partial_page_state.is_some(); - - if same_page_partial_shrink { - let old_crc = - old_partial_page_state.expect("same-page shrink requires old partial CRC"); + let new_physical_size = (full_pages + u64::from(partial_bytes > 0)) * physical_page_size; + // Any shrink that lands inside an existing page must transition the target page's CRC + // through the alternate slot. The old page may have been full or partial, but rewriting it + // as a padded partial page before the shorter CRC is durable can make recovery discard the + // whole page. + if partial_bytes > 0 { // Evict cached pages at or beyond the new full-page boundary. The page at // `full_pages` is now owned by the tip buffer, and anything above is beyond the new // logical size. 
self.cache_ref.invalidate_from(self.id, full_pages); - // Update blob state and buffer based on the desired logical size. The page data is - // read with CRC validation, then durably rewritten below with a shorter CRC. - blob_guard.current_page = full_pages; - buf_guard.offset = full_pages * logical_page_size; - - let page_data = + let (page_data, old_crc) = super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?; // Ensure the validated data covers what we need. @@ -994,63 +978,56 @@ impl Append { return Err(Error::InvalidChecksum); } - buf_guard.clear(); let new_data = &page_data.as_ref()[..partial_bytes as usize]; - let over_capacity = buf_guard.append(new_data); - assert!(!over_capacity); + let new_crc = Crc32::checksum(new_data); + + let current_physical_size = blob_guard + .current_page + .checked_add(u64::from(blob_guard.partial_page_state.is_some())) + .and_then(|pages| pages.checked_mul(physical_page_size)) + .ok_or(Error::OffsetOverflow)?; + + // If the shrink drops later physical pages, make that truncation durable before + // changing the landing page's CRC. A crash before the shorter CRC is staged can then + // still recover the landing page at its old length, but recovery stops at the newly + // truncated physical end. + if new_physical_size < current_physical_size { + blob_guard.blob.resize(new_physical_size).await?; + blob_guard.blob.sync().await?; + } let final_record = Self::sync_same_page_shrink( &blob_guard.blob, full_pages, logical_page_size, partial_bytes as u16, - Crc32::checksum(new_data), + new_crc, &old_crc, ) .await?; + + // Update blob state and buffer based on the desired logical size. + blob_guard.current_page = full_pages; + buf_guard.offset = full_pages * logical_page_size; + buf_guard.clear(); + let over_capacity = buf_guard.append(new_data); + assert!(!over_capacity); blob_guard.partial_page_state = Some(final_record); return Ok(()); } - // Resize the underlying blob. 
+ // Page-aligned shrink: no partial page remains, so truncate the blob directly. blob_guard.blob.resize(new_physical_size).await?; blob_guard.partial_page_state = None; - // Evict cached pages at or beyond the new full-page boundary. The page at `full_pages` (if - // partial) is now owned by the tip buffer, and anything above is beyond the new logical - // size. Leaving their pre-resize contents in the cache lets `try_read_sync` (which bypasses - // the tip buffer) observe stale bytes once the tip is repopulated. + // Evict cached pages at or beyond the new full-page boundary. Leaving their pre-resize + // contents in the cache lets `try_read_sync` observe stale bytes after the shrink. self.cache_ref.invalidate_from(self.id, full_pages); - // Update blob state and buffer based on the desired logical size. The partial page data is - // read with CRC validation; the validated length may exceed partial_bytes (reflecting the - // old data length), but we only load the prefix we need. The next sync will write the - // correct CRC for the new length. - // - // Note: This updates state before validation completes, which could leave state - // inconsistent if validation fails. This is acceptable because failures from mutable - // methods are fatal - callers must not use the blob after any error. - + // All remaining pages are full, so the tip buffer is empty. blob_guard.current_page = full_pages; - buf_guard.offset = full_pages * logical_page_size; - - if partial_bytes > 0 { - // There's a partial page. Read its data from disk with CRC validation. - let page_data = - super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?; - - // Ensure the validated data covers what we need. 
- if (page_data.len() as u64) < partial_bytes { - return Err(Error::InvalidChecksum); - } - - buf_guard.clear(); - let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]); - assert!(!over_capacity); - } else { - // No partial page - all pages are full or blob is empty. - buf_guard.clear(); - } + buf_guard.offset = size; + buf_guard.clear(); Ok(()) } @@ -2878,6 +2855,123 @@ mod tests { }); } + #[test] + fn test_resize_full_page_to_partial_survives_interrupted_crc_stage() { + let executor = deterministic::Runner::default(); + + executor.start(|context| { + assert_resize_to_partial_survives_interrupted_crc( + context, + b"full_page_shrink_stage", + PAGE_SIZE.get() as usize, + 45, + 1, + PAGE_SIZE.get() as usize, + ) + }); + } + + #[test] + fn test_resize_full_page_to_partial_survives_interrupted_crc_invalidation() { + let executor = deterministic::Runner::default(); + + executor.start(|context| { + assert_resize_to_partial_survives_interrupted_crc( + context, + b"full_page_shrink_invalidation", + PAGE_SIZE.get() as usize, + 45, + 2, + 45, + ) + }); + } + + #[test] + fn test_resize_multi_page_to_partial_survives_interrupted_crc_stage() { + let executor = deterministic::Runner::default(); + + executor.start(|context| { + let page_size = PAGE_SIZE.get() as usize; + assert_resize_to_partial_survives_interrupted_crc( + context, + b"multi_page_shrink_stage", + page_size * 3, + page_size + 45, + 1, + page_size * 2, + ) + }); + } + + #[test] + fn test_resize_multi_page_to_partial_survives_interrupted_crc_invalidation() { + let executor = deterministic::Runner::default(); + + executor.start(|context| { + let page_size = PAGE_SIZE.get() as usize; + assert_resize_to_partial_survives_interrupted_crc( + context, + b"multi_page_shrink_invalidation", + page_size * 3, + page_size + 45, + 2, + page_size + 45, + ) + }); + } + + async fn assert_resize_to_partial_survives_interrupted_crc( + context: deterministic::Context, + blob_name: &'static [u8], + 
initial_len: usize, + target_len: usize, + fail_on_write: usize, + expected_len: usize, + ) { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let data: Vec<u8> = (0..initial_len).map(|i| i as u8).collect(); + + // Seed the blob with committed data before injecting a write failure. + let (blob, size) = context.open("test_partition", blob_name).await.unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context.open("test_partition", blob_name).await.unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, fail_on_write, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + // Depending on where the old code wrote the shorter page, the injected failure can surface + // during resize or during the following sync. The recovery assertion below is the invariant. + let resize_result = append.resize(target_len as u64).await; + let failed = if resize_result.is_err() { + assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); + true + } else { + append.sync().await.is_err() + }; + assert!(failed, "injected partial write should fail"); + assert_eq!(write_count.load(Ordering::SeqCst), fail_on_write); + drop(append); + + // Reopen after the injected failure and verify recovery lands on the last durable state. 
+ let (blob, size) = context.open("test_partition", blob_name).await.unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, expected_len as u64); + let read = append.read_at(0, expected_len).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data[..expected_len]); + } + #[test] fn test_reopen_partial_tail_append_and_resize() { let executor = deterministic::Runner::default(); diff --git a/runtime/src/utils/buffer/paged/cache.rs b/runtime/src/utils/buffer/paged/cache.rs index b70b443118..42ab01b4e3 100644 --- a/runtime/src/utils/buffer/paged/cache.rs +++ b/runtime/src/utils/buffer/paged/cache.rs @@ -351,7 +351,8 @@ impl CacheRef { } Entry::Vacant(v) => { // Nobody is currently fetching this page, so create a future that will do the - // work. get_page_from_blob handles CRC validation and returns only logical bytes. + // work. get_page_from_blob handles CRC validation; the cache only keeps the + // logical bytes. let blob = blob.clone(); let cache = Arc::clone(&self.cache); let page_size = self.page_size; @@ -591,7 +592,7 @@ async fn fetch_cacheable_page( page_num: u64, page_size: u64, ) -> Result<IoBuf, Arc<Error>> { - let page = get_page_from_blob(blob, page_num, page_size) + let (page, _) = get_page_from_blob(blob, page_num, page_size) .await .map_err(Arc::new)?; diff --git a/runtime/src/utils/buffer/paged/mod.rs b/runtime/src/utils/buffer/paged/mod.rs index eedf139a8e..b65153648c 100644 --- a/runtime/src/utils/buffer/paged/mod.rs +++ b/runtime/src/utils/buffer/paged/mod.rs @@ -40,17 +40,19 @@ use tracing::{debug, error}; const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64; const CHECKSUM_SLOT_SIZE: usize = u16::SIZE + crc32::Digest::SIZE; -/// Read the designated page from the underlying blob and return its logical bytes as a vector if it -/// passes the integrity check, returning error otherwise. Safely handles partial pages. 
Caller can -/// check the length of the returned vector to determine if the page was partial vs full. +/// Read the designated page from the underlying blob and return its logical bytes plus the CRC +/// record used to validate it. async fn get_page_from_blob( blob: &impl Blob, page_num: u64, logical_page_size: u64, -) -> Result<IoBuf, Error> { - let physical_page_size = logical_page_size + CHECKSUM_SIZE; - let physical_page_start = page_num * physical_page_size; - +) -> Result<(IoBuf, Checksum), Error> { + let physical_page_size = logical_page_size + .checked_add(CHECKSUM_SIZE) + .ok_or(Error::OffsetOverflow)?; + let physical_page_start = page_num + .checked_mul(physical_page_size) + .ok_or(Error::OffsetOverflow)?; let page = blob .read_at(physical_page_start, physical_page_size as usize) .await? @@ -61,7 +63,7 @@ async fn get_page_from_blob( }; let (len, _) = record.get_crc(); - Ok(page.freeze().slice(..len as usize)) + Ok((page.freeze().slice(..len as usize), record)) } /// Describes a CRC record stored at the end of a page.