diff --git a/runtime/src/utils/buffer/paged/append.rs b/runtime/src/utils/buffer/paged/append.rs index c8c7528dca1..2525126b0e8 100644 --- a/runtime/src/utils/buffer/paged/append.rs +++ b/runtime/src/utils/buffer/paged/append.rs @@ -8,7 +8,8 @@ //! pages have a [Checksum] at the end. If no CRC record existed before for the page being written, //! then one of the checksums will be all zero. If a checksum already existed for the page being //! written, then the write will overwrite only the checksum with the lesser length value. Should -//! this write fail, the previously committed page state can still be recovered. +//! this write fail, the previously committed page state can still be recovered. Partial-page +//! shrink makes the shorter checksum durable before invalidating the old longer checksum. //! //! During initialization, the wrapper will back up over any page that is not accompanied by a //! valid CRC, treating it as the result of an incomplete write that may be invalid. @@ -16,7 +17,7 @@ use super::read::{PageReader, Replay}; use crate::{ buffer::{ - paged::{CacheRef, Checksum, CHECKSUM_SIZE}, + paged::{CacheRef, Checksum, CHECKSUM_SIZE, CHECKSUM_SLOT_SIZE}, tip::Buffer, }, Blob, Error, IoBuf, IoBufMut, IoBufs, @@ -758,6 +759,14 @@ impl Append { (write_buffer, Some(crc_record)) } + /// Encode one checksum slot as `[len: u16][crc: u32]`, matching `Checksum::write`. + fn checksum_slot_bytes(len: u16, crc: u32) -> [u8; CHECKSUM_SLOT_SIZE] { + let mut bytes = [0u8; CHECKSUM_SLOT_SIZE]; + bytes[..2].copy_from_slice(&len.to_be_bytes()); + bytes[2..].copy_from_slice(&crc.to_be_bytes()); + bytes + } + /// Build a CRC record that preserves the old CRC in its original slot and places /// the new CRC in the other slot. const fn build_crc_record_preserving_old( @@ -786,6 +795,75 @@ impl Append { } } + /// Durably rewrite a committed page to a shorter partial length. + async fn sync_partial_page_shrink( + blob: &B, + page: u64, + logical_page_size: u64, + new_len: u16, + new_crc: u32, + old_crc: &Checksum, + ) -> Result { + // Recovery chooses the valid slot with the larger length. While shrinking, the new + // checksum must be made durable without becoming authoritative until the old longer slot + // can be disabled. The sequence below therefore lets recovery observe either the old page + // or the new shorter page, but not a footer where both slots were damaged by one torn write. + let physical_page_size = logical_page_size + .checked_add(CHECKSUM_SIZE) + .ok_or(Error::OffsetOverflow)?; + let crc_start = page + .checked_mul(physical_page_size) + .and_then(|start| start.checked_add(logical_page_size)) + .ok_or(Error::OffsetOverflow)?; + let (new_slot_start, old_slot_start) = if old_crc.len1 >= old_crc.len2 { + (CHECKSUM_SLOT_SIZE, 0) + } else { + (0, CHECKSUM_SLOT_SIZE) + }; + + // Stage the new slot with a 0 length and the shrunken page CRC. A crash here leaves the + // old slot as the only non-zero valid slot. + let new_slot_offset = crc_start + .checked_add(new_slot_start as u64) + .ok_or(Error::OffsetOverflow)?; + let staged_slot = Self::checksum_slot_bytes(0, new_crc); + blob.write_at(new_slot_offset, staged_slot.to_vec()).await?; + blob.sync().await?; + + // Publish the new shrunken length. If a crash happens before the old slot is invalidated, + // both slots may be valid, but recovery still chooses the old longer length. + blob.write_at(new_slot_offset, new_len.to_be_bytes().to_vec()) + .await?; + blob.sync().await?; + + // Clear only the old slot's length bytes. Rewriting the whole footer here could tear across + // both slots and lose the already-durable shorter checksum. Once this lands, length 0 is + // never authoritative, so the shrunken slot wins. + let old_slot_offset = crc_start + .checked_add(old_slot_start as u64) + .ok_or(Error::OffsetOverflow)?; + let len_size = std::mem::size_of::(); + blob.write_at(old_slot_offset, vec![0u8; len_size]).await?; + blob.sync().await?; + + let final_record = if new_slot_start == 0 { + Checksum { + len1: new_len, + crc1: new_crc, + len2: 0, + crc2: 0, + } + } else { + Checksum { + len1: 0, + crc1: 0, + len2: new_len, + crc2: new_crc, + } + }; + Ok(final_record) + } + /// Flushes any buffered data, then returns a [Replay] for the underlying blob. /// /// The returned replay can be used to sequentially read all pages from the blob while ensuring @@ -864,6 +942,9 @@ impl Append { /// - The resize is not guaranteed durable until the next sync. pub async fn resize(&self, size: u64) -> Result<(), Error> { let current_size = self.size().await; + if size == current_size { + return Ok(()); + } // Handle growing by appending zero bytes. if size > current_size { @@ -874,8 +955,15 @@ impl Append { return Ok(()); } + self.shrink(size).await + } + + /// Coordinate the locking and dispatch logic for shrinking the blob. + async fn shrink(&self, target_size: u64) -> Result<(), Error> { let logical_page_size = self.cache_ref.page_size(); - let physical_page_size = logical_page_size + CHECKSUM_SIZE; + let physical_page_size = logical_page_size + .checked_add(CHECKSUM_SIZE) + .ok_or(Error::OffsetOverflow)?; // Flush any buffered data first to ensure we have a consistent state on disk. self.sync().await?; @@ -885,57 +973,88 @@ impl Append { let mut blob_guard = self.blob_state.write().await; // Calculate the physical size needed for the new logical size. - let full_pages = size / logical_page_size; - let partial_bytes = size % logical_page_size; - let new_physical_size = if partial_bytes > 0 { - // We need full_pages + 1 physical pages to hold the partial data. - // The partial page will be padded to full physical page size. - (full_pages + 1) * physical_page_size - } else { - // No partial page needed. - full_pages * physical_page_size - }; + let full_pages = target_size / logical_page_size; + let partial_bytes = target_size % logical_page_size; + let physical_pages = full_pages + .checked_add(u64::from(partial_bytes > 0)) + .ok_or(Error::OffsetOverflow)?; + let new_physical_size = physical_pages + .checked_mul(physical_page_size) + .ok_or(Error::OffsetOverflow)?; + let tail_offset = full_pages + .checked_mul(logical_page_size) + .ok_or(Error::OffsetOverflow)?; - // Resize the underlying blob. + // Drop cached pages at or beyond the new tail. Future appends may reuse those logical + // offsets, and cache-only reads must not see pre-shrink bytes there. blob_guard.blob.resize(new_physical_size).await?; - blob_guard.partial_page_state = None; - - // Evict cached pages at or beyond the new full-page boundary. The page at `full_pages` (if - // partial) is now owned by the tip buffer, and anything above is beyond the new logical - // size. Leaving their pre-resize contents in the cache lets `try_read_sync` (which bypasses - // the tip buffer) observe stale bytes once the tip is repopulated. self.cache_ref.invalidate_from(self.id, full_pages); - // Update blob state and buffer based on the desired logical size. The partial page data is - // read with CRC validation; the validated length may exceed partial_bytes (reflecting the - // old data length), but we only load the prefix we need. The next sync will write the - // correct CRC for the new length. - // - // Note: This updates state before validation completes, which could leave state - // inconsistent if validation fails. This is acceptable because failures from mutable - // methods are fatal - callers must not use the blob after any error. + if partial_bytes > 0 { + return self + .shrink_to_partial( + &mut buf_guard, + &mut blob_guard, + full_pages, + partial_bytes, + logical_page_size, + tail_offset, + ) + .await; + } + // Shrink the blob to a page boundary, which requires no CRC-slot rewrite. + blob_guard.partial_page_state = None; blob_guard.current_page = full_pages; - buf_guard.offset = full_pages * logical_page_size; - - if partial_bytes > 0 { - // There's a partial page. Read its data from disk with CRC validation. - let page_data = - super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?; + buf_guard.offset = tail_offset; + buf_guard.clear(); - // Ensure the validated data covers what we need. - if (page_data.len() as u64) < partial_bytes { - return Err(Error::InvalidChecksum); - } + Ok(()) + } - buf_guard.clear(); - let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]); - assert!(!over_capacity); - } else { - // No partial page - all pages are full or blob is empty. - buf_guard.clear(); + /// Perform a shrink to a partial page tip and make the shorter CRC slot authoritative. + async fn shrink_to_partial( + &self, + buf_guard: &mut Buffer, + blob_guard: &mut BlobState, + full_pages: u64, + partial_bytes: u64, + logical_page_size: u64, + tail_offset: u64, + ) -> Result<(), Error> { + // Update blob state and buffer based on the desired logical size. The page data is + // read with CRC validation, then durably rewritten below with a shorter CRC. + blob_guard.current_page = full_pages; + buf_guard.offset = tail_offset; + + let (page_data, old_crc) = super::get_page_with_checksum_from_blob( + &blob_guard.blob, + full_pages, + logical_page_size, + ) + .await?; + + // Ensure the validated data covers what we need. + if (page_data.len() as u64) < partial_bytes { + return Err(Error::InvalidChecksum); } + buf_guard.clear(); + let new_data = &page_data.as_ref()[..partial_bytes as usize]; + let over_capacity = buf_guard.append(new_data); + assert!(!over_capacity); + + let final_record = Self::sync_partial_page_shrink( + &blob_guard.blob, + full_pages, + logical_page_size, + partial_bytes as u16, + Crc32::checksum(new_data), + &old_crc, + ) + .await?; + blob_guard.partial_page_state = Some(final_record); + Ok(()) } } @@ -944,13 +1063,19 @@ impl Append { mod tests { use super::*; use crate::{ - deterministic, telemetry::metrics::Registry, BufferPool, BufferPoolConfig, Runner as _, - Storage as _, + deterministic, telemetry::metrics::Registry, BufferPool, BufferPoolConfig, IoBufsMut, + Runner as _, Storage as _, }; use commonware_codec::ReadExt; use commonware_macros::test_traced; use commonware_utils::{NZUsize, NZU16, NZU32}; - use std::num::NonZeroU16; + use std::{ + num::NonZeroU16, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + }; const PAGE_SIZE: NonZeroU16 = NZU16!(103); // janky size to ensure we test page alignment const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2; @@ -1485,6 +1610,96 @@ mod tests { Checksum::read(&mut &page_bytes[crc_start..]).unwrap() } + /// Blob wrapper that turns one write into a durable partial write followed by an error. + #[derive(Clone)] + struct PartialWriteBlob { + inner: B, + writes: Arc, + failed_write_len: Arc, + fail_on: usize, + partial_len: usize, + } + + impl PartialWriteBlob { + fn new(inner: B, fail_on: usize, partial_len: usize) -> Self { + Self { + inner, + writes: Arc::new(AtomicUsize::new(0)), + failed_write_len: Arc::new(AtomicUsize::new(0)), + fail_on, + partial_len, + } + } + + fn failed_write_len(&self) -> Arc { + self.failed_write_len.clone() + } + + fn write_count(&self) -> Arc { + self.writes.clone() + } + } + + impl crate::Blob for PartialWriteBlob { + async fn read_at(&self, offset: u64, len: usize) -> Result { + self.inner.read_at(offset, len).await + } + + async fn read_at_buf( + &self, + offset: u64, + len: usize, + bufs: impl Into + Send, + ) -> Result { + self.inner.read_at_buf(offset, len, bufs).await + } + + async fn write_at(&self, offset: u64, bufs: impl Into + Send) -> Result<(), Error> { + let bufs = bufs.into(); + let write = self.writes.fetch_add(1, Ordering::SeqCst) + 1; + if write == self.fail_on { + let bytes = bufs.coalesce(); + self.failed_write_len.store(bytes.len(), Ordering::SeqCst); + let partial_len = self.partial_len.min(bytes.len()); + self.inner + .write_at(offset, bytes.slice(..partial_len)) + .await?; + self.inner.sync().await?; + return Err(Error::Io(std::io::Error::other("injected partial write"))); + } + + self.inner.write_at(offset, bufs).await + } + + async fn write_at_sync( + &self, + offset: u64, + bufs: impl Into + Send, + ) -> Result<(), Error> { + let bufs = bufs.into(); + let write = self.writes.fetch_add(1, Ordering::SeqCst) + 1; + if write == self.fail_on { + let bytes = bufs.coalesce(); + self.failed_write_len.store(bytes.len(), Ordering::SeqCst); + let partial_len = self.partial_len.min(bytes.len()); + self.inner + .write_at_sync(offset, bytes.slice(..partial_len)) + .await?; + return Err(Error::Io(std::io::Error::other("injected partial write"))); + } + + self.inner.write_at_sync(offset, bufs).await + } + + async fn resize(&self, len: u64) -> Result<(), Error> { + self.inner.resize(len).await + } + + async fn sync(&self) -> Result<(), Error> { + self.inner.sync().await + } + } + /// Dummy marker bytes with len=0 so the mangled slot is never authoritative. /// Format: [len_hi=0, len_lo=0, 0xDE, 0xAD, 0xBE, 0xEF] const DUMMY_MARKER: [u8; 6] = [0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF]; @@ -2506,6 +2721,404 @@ mod tests { }); } + #[test] + fn test_resize_same_size_is_noop() { + let executor = deterministic::Runner::default(); + executor.start(|context: deterministic::Context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let (blob, blob_size) = context + .open("test_partition", b"resize_same_size") + .await + .unwrap(); + let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + + append.append(b"hello world").await.unwrap(); + assert_eq!(append.size().await, 11); + + // Resize to same size. Should succeed. + append.resize(11).await.unwrap(); + assert_eq!(append.size().await, 11); + + // Verify content is still readable and intact. + let read = append.read_at(0, 11).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), b"hello world"); + }); + } + + #[test] + fn test_resize_same_page_shrink_reopens_at_shorter_size() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let data: Vec = (0..50).collect(); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + // Create a partial page whose authoritative CRC is in the first slot. The interrupted + // tests below exercise the opposite slot orientation. + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + + append.resize(45).await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, 45); + let read = append.read_at(0, 45).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data[..45]); + }); + } + + #[test] + fn test_resize_same_page_shrink_survives_interrupted_crc_stage() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let data: Vec = (0..50).collect(); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink_interrupted") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data[..40]).await.unwrap(); + append.sync().await.unwrap(); + append.append(&data[40..]).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink_interrupted") + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, 1, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + assert!( + append.resize(45).await.is_err(), + "phase-1 partial write should fail" + ); + assert_eq!(write_count.load(Ordering::SeqCst), 1); + assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); + drop(append); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink_interrupted") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, 50); + let read = append.read_at(0, 50).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data); + }); + } + + #[test] + fn test_resize_same_page_shrink_survives_interrupted_len_stage() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(600); + const LARGE_BUFFER_SIZE: usize = 1_200; + + let cache_ref = + CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(LARGE_BUFFER_SIZE)); + let data: Vec = (0..300).map(|i| (i % 251) as u8).collect(); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink_len_stage") + .await + .unwrap(); + let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data[..255]).await.unwrap(); + append.sync().await.unwrap(); + append.append(&data[255..]).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink_len_stage") + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, 2, 1); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, LARGE_BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + assert!( + append.resize(257).await.is_err(), + "length-stage partial write should fail" + ); + assert_eq!(write_count.load(Ordering::SeqCst), 2); + assert_eq!(failed_write_len.load(Ordering::SeqCst), 2); + drop(append); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink_len_stage") + .await + .unwrap(); + let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, 300); + let read = append.read_at(0, 300).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data); + }); + } + + #[test] + fn test_resize_same_page_shrink_preserves_validated_fallback_slot() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let data: Vec = (0..52).collect(); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink_fallback_slot") + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob.clone(), 5, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data[..48]).await.unwrap(); + append.sync().await.unwrap(); + assert_eq!(write_count.load(Ordering::SeqCst), 1); + + append.append(&data[48..50]).await.unwrap(); + append.sync().await.unwrap(); + assert_eq!(write_count.load(Ordering::SeqCst), 3); + + append.append(&data[50..]).await.unwrap(); + append.sync().await.unwrap(); + assert_eq!(write_count.load(Ordering::SeqCst), 4); + + // Corrupt the newer authoritative slot. The older slot still covers the shrink target. + // `resize()` first syncs the live buffer, which writes a valid fallback slot but leaves + // the cached footer stale. A torn phase-1 shrink write must preserve that validated + // fallback slot. + let slot0_offset = PAGE_SIZE.get() as u64; + blob.write_at(slot0_offset, DUMMY_MARKER.to_vec()) + .await + .unwrap(); + blob.sync().await.unwrap(); + + assert!( + append.resize(45).await.is_err(), + "phase-1 partial write should fail" + ); + assert_eq!(write_count.load(Ordering::SeqCst), 5); + assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); + drop(append); + + let (blob, size) = context + .open("test_partition", b"same_page_shrink_fallback_slot") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, 50); + let read = append.read_at(0, 50).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data[..50]); + }); + } + + #[test] + fn test_resize_full_page_to_partial_reopens_at_shorter_size() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let page_size = PAGE_SIZE.get() as u64; + let target = page_size + 45; + let data: Vec = (0..page_size * 2).map(|i| (i % 251) as u8).collect(); + + let (blob, size) = context + .open("test_partition", b"full_page_to_partial") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + + append.resize(target).await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"full_page_to_partial") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, target); + let read = append.read_at(0, target as usize).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data[..target as usize]); + }); + } + + #[test] + fn test_resize_full_page_to_partial_survives_interrupted_crc_stage() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE)); + let page_size = PAGE_SIZE.get() as u64; + let target = page_size + 45; + let data: Vec = (0..page_size * 3).map(|i| (i % 251) as u8).collect(); + + let (blob, size) = context + .open("test_partition", b"full_page_to_partial_interrupted") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + append.append(&data).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context + .open("test_partition", b"full_page_to_partial_interrupted") + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, 1, 3); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + assert!( + append.resize(target).await.is_err(), + "phase-1 partial write should fail" + ); + assert_eq!(write_count.load(Ordering::SeqCst), 1); + assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE); + drop(append); + + let (blob, size) = context + .open("test_partition", b"full_page_to_partial_interrupted") + .await + .unwrap(); + let append = Append::new(blob, size, BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, page_size * 2); + let read = append + .read_at(0, (page_size * 2) as usize) + .await + .unwrap() + .coalesce(); + assert_eq!(read.as_ref(), &data[..(page_size * 2) as usize]); + }); + } + + #[test] + fn test_resize_same_page_shrink_survives_interrupted_length_invalidation() { + let executor = deterministic::Runner::default(); + + executor.start(|context| async move { + const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(600); + const LARGE_BUFFER_SIZE: usize = 1_200; + + let cache_ref = + CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(LARGE_BUFFER_SIZE)); + let data: Vec = (0..300).map(|i| (i % 251) as u8).collect(); + + let (blob, size) = context + .open( + "test_partition", + b"same_page_shrink_interrupted_len_invalidation", + ) + .await + .unwrap(); + let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + // Put the old authoritative CRC in slot 1, so the shorter CRC will be staged in slot + // 0. The old length is above 255, so a one-byte tear changes the decoded length. + append.append(&data[..255]).await.unwrap(); + append.sync().await.unwrap(); + append.append(&data[255..]).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let (blob, size) = context + .open( + "test_partition", + b"same_page_shrink_interrupted_len_invalidation", + ) + .await + .unwrap(); + let faulty_blob = PartialWriteBlob::new(blob, 3, 1); + let write_count = faulty_blob.write_count(); + let failed_write_len = faulty_blob.failed_write_len(); + let append = Append::new(faulty_blob, size, LARGE_BUFFER_SIZE, cache_ref.clone()) + .await + .unwrap(); + + assert!( + append.resize(40).await.is_err(), + "old-slot length invalidation should fail" + ); + assert_eq!(write_count.load(Ordering::SeqCst), 3); + assert_eq!( + failed_write_len.load(Ordering::SeqCst), + std::mem::size_of::() + ); + drop(append); + + let (blob, size) = context + .open( + "test_partition", + b"same_page_shrink_interrupted_len_invalidation", + ) + .await + .unwrap(); + let append = Append::new(blob, size, LARGE_BUFFER_SIZE, cache_ref) + .await + .unwrap(); + assert_eq!(append.size().await, 40); + let read = append.read_at(0, 40).await.unwrap().coalesce(); + assert_eq!(read.as_ref(), &data[..40]); + }); + } + #[test] fn test_reopen_partial_tail_append_and_resize() { let executor = deterministic::Runner::default(); diff --git a/runtime/src/utils/buffer/paged/mod.rs b/runtime/src/utils/buffer/paged/mod.rs index 7453c32029f..6284bd05204 100644 --- a/runtime/src/utils/buffer/paged/mod.rs +++ b/runtime/src/utils/buffer/paged/mod.rs @@ -13,15 +13,15 @@ //! //! Two checksums are stored so that partial pages can be re-written without overwriting a valid //! checksum for its previously committed contents. A checksum over a page is computed over the -//! first [0,len) bytes in the page, with all other bytes in the page ignored. This implementation -//! always 0-pads the range [len, page_size). A checksum with length 0 is never considered -//! valid. If both checksums are valid for the page, the one with the larger `len` is considered -//! authoritative. +//! first [0,len) bytes in the page, with all other bytes in the page ignored. Ordinary partial-page +//! payload writes 0-pad the range [len, page_size), but recovery does not depend on bytes outside +//! [0,len). A checksum with length 0 is never considered valid. If both checksums are valid for the +//! page, the one with the larger `len` is considered authoritative. Partial-page shrink first makes +//! the shorter checksum durable in the alternate slot, then invalidates the old longer checksum. //! //! A _full_ page is one whose crc stores a len equal to the logical page size. Otherwise the page //! is called _partial_. All pages in a blob are full except for the very last page, which can be -//! full or partial. A partial page's logical bytes are immutable on commit, and if it's re-written, -//! it's only to add more bytes after the existing ones. +//! full or partial. A partial page's committed prefix remains recoverable while it is rewritten. use crate::{Blob, Buf, BufMut, Error, IoBuf}; use commonware_codec::{EncodeFixed, FixedSize, Read as CodecRead, ReadExt, Write}; @@ -36,8 +36,9 @@ pub use cache::CacheRef; pub use read::Replay; use tracing::{debug, error}; -// A checksum record contains two u16 lengths and two CRCs (each 4 bytes). +// A checksum record contains two slots. Each slot stores one u16 length and one CRC. const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64; +const CHECKSUM_SLOT_SIZE: usize = u16::SIZE + crc32::Digest::SIZE; /// Read the designated page from the underlying blob and return its logical bytes as a vector if it /// passes the integrity check, returning error otherwise. Safely handles partial pages. Caller can @@ -47,8 +48,22 @@ async fn get_page_from_blob( page_num: u64, logical_page_size: u64, ) -> Result { - let physical_page_size = logical_page_size + CHECKSUM_SIZE; - let physical_page_start = page_num * physical_page_size; + let (page, _) = get_page_with_checksum_from_blob(blob, page_num, logical_page_size).await?; + Ok(page) +} + +/// Read the designated page and return both its logical bytes and validated checksum record. +async fn get_page_with_checksum_from_blob( + blob: &impl Blob, + page_num: u64, + logical_page_size: u64, +) -> Result<(IoBuf, Checksum), Error> { + let physical_page_size = logical_page_size + .checked_add(CHECKSUM_SIZE) + .ok_or(Error::OffsetOverflow)?; + let physical_page_start = page_num + .checked_mul(physical_page_size) + .ok_or(Error::OffsetOverflow)?; let page = blob .read_at(physical_page_start, physical_page_size as usize) @@ -60,7 +75,7 @@ async fn get_page_from_blob( }; let (len, _) = record.get_crc(); - Ok(page.freeze().slice(..len as usize)) + Ok((page.freeze().slice(..len as usize), record)) } /// Describes a CRC record stored at the end of a page. diff --git a/storage/src/journal/segmented/variable.rs b/storage/src/journal/segmented/variable.rs index 0f44cc013b1..a986199ae4f 100644 --- a/storage/src/journal/segmented/variable.rs +++ b/storage/src/journal/segmented/variable.rs @@ -402,7 +402,13 @@ impl Journal { new_size = state.valid_offset, "trailing bytes detected: truncating" ); - state.blob.resize(state.valid_offset).await.ok()?; + if let Err(err) = + state.blob.resize(state.valid_offset).await + { + batch.push(Err(err.into())); + state.done = true; + return Some((batch, state)); + } } state.done = true; return if batch.is_empty() { @@ -428,7 +434,11 @@ impl Journal { new_size = state.valid_offset, "incomplete item at end: truncating" ); - state.blob.resize(state.valid_offset).await.ok()?; + if let Err(err) = state.blob.resize(state.valid_offset).await { + batch.push(Err(err.into())); + state.done = true; + return Some((batch, state)); + } state.done = true; return if batch.is_empty() { None @@ -1343,6 +1353,65 @@ mod tests { }); } + #[test_traced] + fn test_journal_replay_reports_resize_error_on_trailing_bytes() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + partition: "test-partition".into(), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), + write_buffer: NZUsize!(1024), + }; + let section = 1u64; + let item = 10u64; + + let mut journal = Journal::init(context.child("first"), cfg.clone()) + .await + .expect("Failed to initialize journal"); + journal + .append(section, &item) + .await + .expect("Failed to append item"); + journal + .append_raw(section, &[0xFF, 0xFF]) + .await + .expect("Failed to append trailing bytes"); + journal.sync(section).await.expect("Failed to sync journal"); + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg) + .await + .expect("Failed to re-initialize journal"); + *context.storage_fault_config().write() = deterministic::FaultConfig { + resize_rate: Some(1.0), + ..Default::default() + }; + + let stream = journal + .replay(0, 0, NZUsize!(1024)) + .await + .expect("unable to setup replay"); + pin_mut!(stream); + + let first = stream + .next() + .await + .expect("expected item before trailing bytes") + .expect("failed to replay valid item"); + assert_eq!(first, (section, 0, item.encode_size() as u32, item)); + + match stream.next().await { + Some(Err(_)) => {} + other => { + panic!("expected resize error while repairing trailing bytes, got {other:?}") + } + } + assert!(stream.next().await.is_none()); + }); + } + #[test_traced] fn test_journal_read_item_missing() { // Initialize the deterministic context