Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 156 additions & 62 deletions runtime/src/utils/buffer/paged/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -954,103 +954,80 @@ impl<B: Blob> Append<B> {
let mut buf_guard = self.buffer.write().await;
let mut blob_guard = self.blob_state.write().await;

// Calculate the physical size needed for the new logical size.
// Calculate the physical size needed for the new logical size. A partial page still
// occupies a full physical page on disk.
let full_pages = size / logical_page_size;
let partial_bytes = size % logical_page_size;
let new_physical_size = if partial_bytes > 0 {
// We need full_pages + 1 physical pages to hold the partial data.
// The partial page will be padded to full physical page size.
(full_pages + 1) * physical_page_size
} else {
// No partial page needed.
full_pages * physical_page_size
};

let old_partial_page_state = blob_guard.partial_page_state.clone();
let same_page_partial_shrink = size < current_size
&& partial_bytes > 0
&& full_pages == blob_guard.current_page
&& old_partial_page_state.is_some();

if same_page_partial_shrink {
let old_crc =
old_partial_page_state.expect("same-page shrink requires old partial CRC");
let new_physical_size = (full_pages + u64::from(partial_bytes > 0)) * physical_page_size;

// Any shrink that lands inside an existing page must transition the target page's CRC
// through the alternate slot. The old page may have been full or partial, but rewriting it
// as a padded partial page before the shorter CRC is durable can make recovery discard the
// whole page.
if partial_bytes > 0 {
// Evict cached pages at or beyond the new full-page boundary. The page at
// `full_pages` is now owned by the tip buffer, and anything above is beyond the new
// logical size.
self.cache_ref.invalidate_from(self.id, full_pages);

// Update blob state and buffer based on the desired logical size. The page data is
// read with CRC validation, then durably rewritten below with a shorter CRC.
blob_guard.current_page = full_pages;
buf_guard.offset = full_pages * logical_page_size;

let page_data =
let (page_data, old_crc) =
super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;

// Ensure the validated data covers what we need.
if (page_data.len() as u64) < partial_bytes {
return Err(Error::InvalidChecksum);
}

buf_guard.clear();
let new_data = &page_data.as_ref()[..partial_bytes as usize];
let over_capacity = buf_guard.append(new_data);
assert!(!over_capacity);
let new_crc = Crc32::checksum(new_data);

let current_physical_size = blob_guard
.current_page
.checked_add(u64::from(blob_guard.partial_page_state.is_some()))
.and_then(|pages| pages.checked_mul(physical_page_size))
.ok_or(Error::OffsetOverflow)?;

// If the shrink drops later physical pages, make that truncation durable before
// changing the landing page's CRC. A crash before the shorter CRC is staged can then
// still recover the landing page at its old length, but recovery stops at the newly
// truncated physical end.
if new_physical_size < current_physical_size {
blob_guard.blob.resize(new_physical_size).await?;
blob_guard.blob.sync().await?;
}

let final_record = Self::sync_same_page_shrink(
&blob_guard.blob,
full_pages,
logical_page_size,
partial_bytes as u16,
Crc32::checksum(new_data),
new_crc,
&old_crc,
)
.await?;

// Update blob state and buffer based on the desired logical size.
blob_guard.current_page = full_pages;
buf_guard.offset = full_pages * logical_page_size;
buf_guard.clear();
let over_capacity = buf_guard.append(new_data);
assert!(!over_capacity);
blob_guard.partial_page_state = Some(final_record);
return Ok(());
}

// Resize the underlying blob.
// Page-aligned shrink: no partial page remains, so truncate the blob directly.
blob_guard.blob.resize(new_physical_size).await?;
blob_guard.partial_page_state = None;

// Evict cached pages at or beyond the new full-page boundary. The page at `full_pages` (if
// partial) is now owned by the tip buffer, and anything above is beyond the new logical
// size. Leaving their pre-resize contents in the cache lets `try_read_sync` (which bypasses
// the tip buffer) observe stale bytes once the tip is repopulated.
// Evict cached pages at or beyond the new full-page boundary. Leaving their pre-resize
// contents in the cache lets `try_read_sync` observe stale bytes after the shrink.
self.cache_ref.invalidate_from(self.id, full_pages);

// Update blob state and buffer based on the desired logical size. The partial page data is
// read with CRC validation; the validated length may exceed partial_bytes (reflecting the
// old data length), but we only load the prefix we need. The next sync will write the
// correct CRC for the new length.
//
// Note: This updates state before validation completes, which could leave state
// inconsistent if validation fails. This is acceptable because failures from mutable
// methods are fatal - callers must not use the blob after any error.

// All remaining pages are full, so the tip buffer is empty.
blob_guard.current_page = full_pages;
buf_guard.offset = full_pages * logical_page_size;

if partial_bytes > 0 {
// There's a partial page. Read its data from disk with CRC validation.
let page_data =
super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;

// Ensure the validated data covers what we need.
if (page_data.len() as u64) < partial_bytes {
return Err(Error::InvalidChecksum);
}

buf_guard.clear();
let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]);
assert!(!over_capacity);
} else {
// No partial page - all pages are full or blob is empty.
buf_guard.clear();
}
buf_guard.offset = size;
buf_guard.clear();

Ok(())
}
Expand Down Expand Up @@ -2878,6 +2855,123 @@ mod tests {
});
}

#[test]
fn test_resize_full_page_to_partial_survives_interrupted_crc_stage() {
let executor = deterministic::Runner::default();

executor.start(|context| {
assert_resize_to_partial_survives_interrupted_crc(
context,
b"full_page_shrink_stage",
PAGE_SIZE.get() as usize,
45,
1,
PAGE_SIZE.get() as usize,
)
});
}

#[test]
fn test_resize_full_page_to_partial_survives_interrupted_crc_invalidation() {
let executor = deterministic::Runner::default();

executor.start(|context| {
assert_resize_to_partial_survives_interrupted_crc(
context,
b"full_page_shrink_invalidation",
PAGE_SIZE.get() as usize,
45,
2,
45,
)
});
}

#[test]
fn test_resize_multi_page_to_partial_survives_interrupted_crc_stage() {
let executor = deterministic::Runner::default();

executor.start(|context| {
let page_size = PAGE_SIZE.get() as usize;
assert_resize_to_partial_survives_interrupted_crc(
context,
b"multi_page_shrink_stage",
page_size * 3,
page_size + 45,
1,
page_size * 2,
)
});
}

#[test]
fn test_resize_multi_page_to_partial_survives_interrupted_crc_invalidation() {
let executor = deterministic::Runner::default();

executor.start(|context| {
let page_size = PAGE_SIZE.get() as usize;
assert_resize_to_partial_survives_interrupted_crc(
context,
b"multi_page_shrink_invalidation",
page_size * 3,
page_size + 45,
2,
page_size + 45,
)
});
}

async fn assert_resize_to_partial_survives_interrupted_crc(
context: deterministic::Context,
blob_name: &'static [u8],
initial_len: usize,
target_len: usize,
fail_on_write: usize,
expected_len: usize,
) {
let cache_ref = CacheRef::from_pooler(&context, PAGE_SIZE, NZUsize!(BUFFER_SIZE));
let data: Vec<u8> = (0..initial_len).map(|i| i as u8).collect();

// Seed the blob with committed data before injecting a write failure.
let (blob, size) = context.open("test_partition", blob_name).await.unwrap();
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref.clone())
.await
.unwrap();
append.append(&data).await.unwrap();
append.sync().await.unwrap();
drop(append);

let (blob, size) = context.open("test_partition", blob_name).await.unwrap();
let faulty_blob = PartialWriteBlob::new(blob, fail_on_write, 3);
let write_count = faulty_blob.write_count();
let failed_write_len = faulty_blob.failed_write_len();
let append = Append::new(faulty_blob, size, BUFFER_SIZE, cache_ref.clone())
.await
.unwrap();

// Depending on where the old code wrote the shorter page, the injected failure can surface
// during resize or during the following sync. The recovery assertion below is the invariant.
let resize_result = append.resize(target_len as u64).await;
let failed = if resize_result.is_err() {
assert_eq!(failed_write_len.load(Ordering::SeqCst), CHECKSUM_SLOT_SIZE);
true
} else {
append.sync().await.is_err()
};
assert!(failed, "injected partial write should fail");
assert_eq!(write_count.load(Ordering::SeqCst), fail_on_write);
drop(append);

// Reopen after the injected failure and verify recovery lands on the last durable state.
let (blob, size) = context.open("test_partition", blob_name).await.unwrap();
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
.await
.unwrap();
assert_eq!(append.size().await, expected_len as u64);
let read = append.read_at(0, expected_len).await.unwrap().coalesce();
assert_eq!(read.as_ref(), &data[..expected_len]);
}

#[test]
fn test_reopen_partial_tail_append_and_resize() {
let executor = deterministic::Runner::default();
Expand Down
5 changes: 3 additions & 2 deletions runtime/src/utils/buffer/paged/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ impl CacheRef {
}
Entry::Vacant(v) => {
// Nobody is currently fetching this page, so create a future that will do the
// work. get_page_from_blob handles CRC validation and returns only logical bytes.
// work. get_page_from_blob handles CRC validation; the cache only keeps the
// logical bytes.
let blob = blob.clone();
let cache = Arc::clone(&self.cache);
let page_size = self.page_size;
Expand Down Expand Up @@ -591,7 +592,7 @@ async fn fetch_cacheable_page(
page_num: u64,
page_size: u64,
) -> Result<IoBuf, Arc<Error>> {
let page = get_page_from_blob(blob, page_num, page_size)
let (page, _) = get_page_from_blob(blob, page_num, page_size)
.await
.map_err(Arc::new)?;

Expand Down
18 changes: 10 additions & 8 deletions runtime/src/utils/buffer/paged/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ use tracing::{debug, error};
const CHECKSUM_SIZE: u64 = Checksum::SIZE as u64;
const CHECKSUM_SLOT_SIZE: usize = u16::SIZE + crc32::Digest::SIZE;

/// Read the designated page from the underlying blob and return its logical bytes as a vector if it
/// passes the integrity check, returning error otherwise. Safely handles partial pages. Caller can
/// check the length of the returned vector to determine if the page was partial vs full.
/// Read the designated page from the underlying blob and return its logical bytes plus the CRC
/// record used to validate it.
async fn get_page_from_blob(
blob: &impl Blob,
page_num: u64,
logical_page_size: u64,
) -> Result<IoBuf, Error> {
let physical_page_size = logical_page_size + CHECKSUM_SIZE;
let physical_page_start = page_num * physical_page_size;

) -> Result<(IoBuf, Checksum), Error> {
let physical_page_size = logical_page_size
.checked_add(CHECKSUM_SIZE)
.ok_or(Error::OffsetOverflow)?;
let physical_page_start = page_num
.checked_mul(physical_page_size)
.ok_or(Error::OffsetOverflow)?;
let page = blob
.read_at(physical_page_start, physical_page_size as usize)
.await?
Expand All @@ -61,7 +63,7 @@ async fn get_page_from_blob(
};
let (len, _) = record.get_crc();

Ok(page.freeze().slice(..len as usize))
Ok((page.freeze().slice(..len as usize), record))
}

/// Describes a CRC record stored at the end of a page.
Expand Down
Loading