Skip to content

Commit 82a86ec

Browse files
restructure/simplify
1 parent b5d0b71 commit 82a86ec

1 file changed

Lines changed: 89 additions & 82 deletions

File tree

runtime/src/utils/buffer/paged/append.rs

Lines changed: 89 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -822,11 +822,8 @@ impl<B: Blob> Append<B> {
822822
0
823823
};
824824
let staged_slot = Self::checksum_slot_bytes(0, new_crc);
825-
blob.write_at(
826-
crc_start + new_slot_start as u64,
827-
staged_slot.to_vec(),
828-
)
829-
.await?;
825+
blob.write_at(crc_start + new_slot_start as u64, staged_slot.to_vec())
826+
.await?;
830827
blob.sync().await?;
831828

832829
blob.write_at(
@@ -963,6 +960,11 @@ impl<B: Blob> Append<B> {
963960
return Ok(());
964961
}
965962

963+
self.shrink(size).await
964+
}
965+
966+
/// Coordinate the locking and dispatch logic for shrinking the blob.
967+
async fn shrink(&self, target_size: u64) -> Result<(), Error> {
966968
let logical_page_size = self.cache_ref.page_size();
967969
let physical_page_size = logical_page_size + CHECKSUM_SIZE;
968970

@@ -974,8 +976,8 @@ impl<B: Blob> Append<B> {
974976
let mut blob_guard = self.blob_state.write().await;
975977

976978
// Calculate the physical size needed for the new logical size.
977-
let full_pages = size / logical_page_size;
978-
let partial_bytes = size % logical_page_size;
979+
let full_pages = target_size / logical_page_size;
980+
let partial_bytes = target_size % logical_page_size;
979981
let new_physical_size = if partial_bytes > 0 {
980982
// We need full_pages + 1 physical pages to hold the partial data.
981983
// The partial page will be padded to full physical page size.
@@ -985,55 +987,87 @@ impl<B: Blob> Append<B> {
985987
full_pages * physical_page_size
986988
};
987989

988-
let protected_partial_shrink = size < current_size
989-
&& partial_bytes > 0
990-
&& (full_pages < blob_guard.current_page
991-
|| (full_pages == blob_guard.current_page
992-
&& blob_guard.partial_page_state.is_some()));
993-
994-
if protected_partial_shrink {
995-
blob_guard.blob.resize(new_physical_size).await?;
996-
997-
// Evict cached pages at or beyond the new full-page boundary. The page at
998-
// `full_pages` is now owned by the tip buffer, and anything above is beyond the new
999-
// logical size.
1000-
self.cache_ref.invalidate_from(self.id, full_pages);
1001-
1002-
// Update blob state and buffer based on the desired logical size. The page data is
1003-
// read with CRC validation, then durably rewritten below with a shorter CRC.
1004-
blob_guard.current_page = full_pages;
1005-
buf_guard.offset = full_pages * logical_page_size;
1006-
1007-
let (page_data, old_crc) = super::get_page_with_checksum_from_blob(
1008-
&blob_guard.blob,
990+
if partial_bytes > 0 {
991+
self.shrink_protected_partial(
992+
&mut buf_guard,
993+
&mut blob_guard,
994+
new_physical_size,
1009995
full_pages,
996+
partial_bytes,
1010997
logical_page_size,
1011998
)
1012-
.await?;
999+
.await
1000+
} else {
1001+
self.shrink_standard(
1002+
&mut buf_guard,
1003+
&mut blob_guard,
1004+
new_physical_size,
1005+
full_pages,
1006+
)
1007+
.await
1008+
}
1009+
}
10131010

1014-
// Ensure the validated data covers what we need.
1015-
if (page_data.len() as u64) < partial_bytes {
1016-
return Err(Error::InvalidChecksum);
1017-
}
1011+
/// Perform a safe, torn-write-resistant shrink to a new partial page tip.
1012+
async fn shrink_protected_partial(
1013+
&self,
1014+
buf_guard: &mut Buffer,
1015+
blob_guard: &mut BlobState<B>,
1016+
new_physical_size: u64,
1017+
full_pages: u64,
1018+
partial_bytes: u64,
1019+
logical_page_size: u64,
1020+
) -> Result<(), Error> {
1021+
blob_guard.blob.resize(new_physical_size).await?;
10181022

1019-
buf_guard.clear();
1020-
let new_data = &page_data.as_ref()[..partial_bytes as usize];
1021-
let over_capacity = buf_guard.append(new_data);
1022-
assert!(!over_capacity);
1023+
// Evict cached pages at or beyond the new full-page boundary. The page at
1024+
// `full_pages` is now owned by the tip buffer, and anything above is beyond the new
1025+
// logical size.
1026+
self.cache_ref.invalidate_from(self.id, full_pages);
10231027

1024-
let final_record = Self::sync_partial_page_shrink(
1025-
&blob_guard.blob,
1026-
full_pages,
1027-
logical_page_size,
1028-
partial_bytes as u16,
1029-
Crc32::checksum(new_data),
1030-
&old_crc,
1031-
)
1032-
.await?;
1033-
blob_guard.partial_page_state = Some(final_record);
1034-
return Ok(());
1028+
// Update blob state and buffer based on the desired logical size. The page data is
1029+
// read with CRC validation, then durably rewritten below with a shorter CRC.
1030+
blob_guard.current_page = full_pages;
1031+
buf_guard.offset = full_pages * logical_page_size;
1032+
1033+
let (page_data, old_crc) = super::get_page_with_checksum_from_blob(
1034+
&blob_guard.blob,
1035+
full_pages,
1036+
logical_page_size,
1037+
)
1038+
.await?;
1039+
1040+
// Ensure the validated data covers what we need.
1041+
if (page_data.len() as u64) < partial_bytes {
1042+
return Err(Error::InvalidChecksum);
10351043
}
10361044

1045+
buf_guard.clear();
1046+
let new_data = &page_data.as_ref()[..partial_bytes as usize];
1047+
let over_capacity = buf_guard.append(new_data);
1048+
assert!(!over_capacity);
1049+
1050+
let final_record = Self::sync_partial_page_shrink(
1051+
&blob_guard.blob,
1052+
full_pages,
1053+
logical_page_size,
1054+
partial_bytes as u16,
1055+
Crc32::checksum(new_data),
1056+
&old_crc,
1057+
)
1058+
.await?;
1059+
blob_guard.partial_page_state = Some(final_record);
1060+
Ok(())
1061+
}
1062+
1063+
/// Perform a standard shrink to a page boundary or a generic partial page.
1064+
async fn shrink_standard(
1065+
&self,
1066+
buf_guard: &mut Buffer,
1067+
blob_guard: &mut BlobState<B>,
1068+
new_physical_size: u64,
1069+
full_pages: u64,
1070+
) -> Result<(), Error> {
10371071
// Resize the underlying blob.
10381072
blob_guard.blob.resize(new_physical_size).await?;
10391073
blob_guard.partial_page_state = None;
@@ -1044,35 +1078,10 @@ impl<B: Blob> Append<B> {
10441078
// the tip buffer) observe stale bytes once the tip is repopulated.
10451079
self.cache_ref.invalidate_from(self.id, full_pages);
10461080

1047-
// Update blob state and buffer based on the desired logical size. The partial page data is
1048-
// read with CRC validation; the validated length may exceed partial_bytes (reflecting the
1049-
// old data length), but we only load the prefix we need. The next sync will write the
1050-
// correct CRC for the new length.
1051-
//
1052-
// Note: This updates state before validation completes, which could leave state
1053-
// inconsistent if validation fails. This is acceptable because failures from mutable
1054-
// methods are fatal - callers must not use the blob after any error.
1055-
1081+
// Update blob state and buffer based on the desired logical size.
10561082
blob_guard.current_page = full_pages;
1057-
buf_guard.offset = full_pages * logical_page_size;
1058-
1059-
if partial_bytes > 0 {
1060-
// There's a partial page. Read its data from disk with CRC validation.
1061-
let page_data =
1062-
super::get_page_from_blob(&blob_guard.blob, full_pages, logical_page_size).await?;
1063-
1064-
// Ensure the validated data covers what we need.
1065-
if (page_data.len() as u64) < partial_bytes {
1066-
return Err(Error::InvalidChecksum);
1067-
}
1068-
1069-
buf_guard.clear();
1070-
let over_capacity = buf_guard.append(&page_data.as_ref()[..partial_bytes as usize]);
1071-
assert!(!over_capacity);
1072-
} else {
1073-
// No partial page - all pages are full or blob is empty.
1074-
buf_guard.clear();
1075-
}
1083+
buf_guard.offset = full_pages * self.cache_ref.page_size();
1084+
buf_guard.clear();
10761085

10771086
Ok(())
10781087
}
@@ -2945,7 +2954,9 @@ mod tests {
29452954
.open("test_partition", b"same_page_shrink_fallback_slot")
29462955
.await
29472956
.unwrap();
2948-
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref).await.unwrap();
2957+
let append = Append::new(blob, size, BUFFER_SIZE, cache_ref)
2958+
.await
2959+
.unwrap();
29492960
assert_eq!(append.size().await, 50);
29502961
let read = append.read_at(0, 50).await.unwrap().coalesce();
29512962
assert_eq!(read.as_ref(), &data[..50]);
@@ -2983,11 +2994,7 @@ mod tests {
29832994
.await
29842995
.unwrap();
29852996
assert_eq!(append.size().await, target);
2986-
let read = append
2987-
.read_at(0, target as usize)
2988-
.await
2989-
.unwrap()
2990-
.coalesce();
2997+
let read = append.read_at(0, target as usize).await.unwrap().coalesce();
29912998
assert_eq!(read.as_ref(), &data[..target as usize]);
29922999
});
29933000
}

0 commit comments

Comments
 (0)