Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions storage/fuzz/fuzz_targets/oversized_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ fn fuzz(input: FuzzInput) {
if overlaps_existing_blob(offset, data.len(), size) {
index_page_integrity_may_be_invalidated = true;
}
let _ = blob.write_at(offset, data.to_vec()).await;
let _ = blob.sync().await;
let _ = blob.write_at_sync(offset, data.to_vec()).await;
}
}
}
Expand All @@ -271,8 +270,7 @@ fn fuzz(input: FuzzInput) {
{
if size > 0 {
let offset = (size * (*offset_factor as u64)) / 256;
let _ = blob.write_at(offset, data.to_vec()).await;
let _ = blob.sync().await;
let _ = blob.write_at_sync(offset, data.to_vec()).await;
}
}
}
Expand All @@ -290,16 +288,14 @@ fn fuzz(input: FuzzInput) {
if let Ok((blob, size)) =
context.open(INDEX_PARTITION, &section.to_be_bytes()).await
{
let _ = blob.write_at(size, garbage.to_vec()).await;
let _ = blob.sync().await;
let _ = blob.write_at_sync(size, garbage.to_vec()).await;
}
}
CorruptionType::ExtendGlob { section, garbage } => {
if let Ok((blob, size)) =
context.open(VALUE_PARTITION, &section.to_be_bytes()).await
{
let _ = blob.write_at(size, garbage.to_vec()).await;
let _ = blob.sync().await;
let _ = blob.write_at_sync(size, garbage.to_vec()).await;
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions storage/src/archive/immutable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,7 @@ mod tests {

// Write data after restart to confirm archive is functional
let key = Sha256::hash(b"after-restart");
archive.put(0, key, 42).await.unwrap();
archive.sync().await.unwrap();
archive.put_sync(0, key, 42).await.unwrap();
drop(archive);

// Third init to verify persistence
Expand Down
10 changes: 5 additions & 5 deletions storage/src/archive/immutable/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,12 @@ impl<E: BufferPooler + Context, K: Array, V: CodecShared> crate::archive::Archiv
let checkpoint = freezer_result?;
ordinal_result?;

// Update checkpoint
// Publish the freezer checkpoint with a single metadata sync after the
// freezer and ordinal state are durable.
let freezer_key = U64::new(FREEZER_PREFIX, 0);
self.metadata.put(freezer_key, Record::Freezer(checkpoint));

// Sync metadata once underlying are synced
self.metadata.sync().await?;
self.metadata
.put_sync(freezer_key, Record::Freezer(checkpoint))
.await?;

Ok(())
}
Expand Down
9 changes: 3 additions & 6 deletions storage/src/freezer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,7 @@ mod tests {
{
let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap();
// Write incomplete table entry (only 10 bytes instead of 24)
blob.write_at(0, vec![0xFF; 10]).await.unwrap();
blob.sync().await.unwrap();
blob.write_at_sync(0, vec![0xFF; 10]).await.unwrap();
}

// Reopen and verify it handles the corruption
Expand Down Expand Up @@ -825,8 +824,7 @@ mod tests {
let mut corrupted = entry_data.coalesce();
// Corrupt the CRC (last 4 bytes of the entry)
corrupted.as_mut()[20] ^= 0xFF;
blob.write_at(0, corrupted).await.unwrap();
blob.sync().await.unwrap();
blob.write_at_sync(0, corrupted).await.unwrap();
}

// Reopen and verify it handles invalid CRC
Expand Down Expand Up @@ -887,10 +885,9 @@ mod tests {
{
let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap();
// Append garbage data
blob.write_at(size, hex!("0xdeadbeef").to_vec())
blob.write_at_sync(size, hex!("0xdeadbeef").to_vec())
.await
.unwrap();
blob.sync().await.unwrap();
}

// Reopen and verify it handles extra bytes gracefully
Expand Down
3 changes: 1 addition & 2 deletions storage/src/freezer/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1286,8 +1286,7 @@ mod tests {
corrupted.as_mut()[Entry::SIZE - 4] ^= 0xFF;
// Corrupt CRC of second slot (last 4 bytes of second slot)
corrupted.as_mut()[Entry::FULL_SIZE - 4] ^= 0xFF;
blob.write_at(0, corrupted).await.unwrap();
blob.sync().await.unwrap();
blob.write_at_sync(0, corrupted).await.unwrap();
}

// Reopen to trigger recovery. The bug would set both cleared entries to
Expand Down
47 changes: 20 additions & 27 deletions storage/src/journal/contiguous/fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,13 +487,15 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
if needs_metadata_update {
if pruning_boundary.is_multiple_of(items_per_blob) {
metadata.remove(&PRUNING_BOUNDARY_KEY);
metadata.sync().await?;
} else {
metadata.put(
PRUNING_BOUNDARY_KEY,
pruning_boundary.to_be_bytes().to_vec(),
);
metadata
.put_sync(
PRUNING_BOUNDARY_KEY,
pruning_boundary.to_be_bytes().to_vec(),
)
.await?;
}
metadata.sync().await?;
}

// Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on
Expand Down Expand Up @@ -708,8 +710,9 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {

// Persist metadata if pruning_boundary is mid-section.
if !size.is_multiple_of(items_per_blob) {
metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec());
metadata.sync().await?;
metadata
.put_sync(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec())
.await?;
} else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
metadata.remove(&PRUNING_BOUNDARY_KEY);
metadata.sync().await?;
Expand Down Expand Up @@ -1002,8 +1005,7 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
// Persist metadata only when pruning_boundary is mid-section.
if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) {
let value = inner.pruning_boundary.to_be_bytes().to_vec();
inner.metadata.put(PRUNING_BOUNDARY_KEY, value);
inner.metadata.sync().await?;
inner.metadata.put_sync(PRUNING_BOUNDARY_KEY, value).await?;
} else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() {
inner.metadata.remove(&PRUNING_BOUNDARY_KEY);
inner.metadata.sync().await?;
Expand Down Expand Up @@ -1172,23 +1174,18 @@ mod tests {
.await
.expect("Failed to open legacy blob");
legacy_blob
.write_at(0, vec![0u8; 1])
.write_at_sync(0, vec![0u8; 1])
.await
.expect("Failed to write legacy blob");
legacy_blob
.sync()
.await
.expect("Failed to sync legacy blob");

let (new_blob, _) = context
.open(&blobs_partition, &0u64.to_be_bytes())
.await
.expect("Failed to open new blob");
new_blob
.write_at(0, vec![0u8; 1])
.write_at_sync(0, vec![0u8; 1])
.await
.expect("Failed to write new blob");
new_blob.sync().await.expect("Failed to sync new blob");

let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await;
assert!(matches!(result, Err(Error::Corruption(_))));
Expand All @@ -1209,13 +1206,9 @@ mod tests {
.await
.expect("Failed to open legacy blob");
legacy_blob
.write_at(0, vec![0u8; 1])
.write_at_sync(0, vec![0u8; 1])
.await
.expect("Failed to write legacy blob");
legacy_blob
.sync()
.await
.expect("Failed to sync legacy blob");

let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone())
.await
Expand Down Expand Up @@ -1502,10 +1495,9 @@ mod tests {
.expect("Failed to open blob");
// Write junk bytes.
let bad_bytes = 123456789u32;
blob.write_at(1, bad_bytes.to_be_bytes().to_vec())
blob.write_at_sync(1, bad_bytes.to_be_bytes().to_vec())
.await
.expect("Failed to write bad bytes");
blob.sync().await.expect("Failed to sync blob");

// Re-initialize the journal to simulate a restart
let journal = Journal::init(context.child("second"), cfg.clone())
Expand Down Expand Up @@ -1929,10 +1921,9 @@ mod tests {
.open(&blob_partition(&cfg), &0u64.to_be_bytes())
.await
.expect("Failed to open blob");
blob.write_at(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
blob.write_at_sync(size, vec![0u8; PAGE_SIZE.get() as usize * 3])
.await
.expect("Failed to extend blob");
blob.sync().await.expect("Failed to sync blob");

// Re-initialize the journal to simulate a restart
let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone())
Expand Down Expand Up @@ -2945,8 +2936,10 @@ mod tests {
Metadata::<_, u64, Vec<u8>>::init(context.child("restore_meta"), meta_cfg.clone())
.await
.unwrap();
metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec());
metadata.sync().await.unwrap();
metadata
.put_sync(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec())
.await
.unwrap();

// Crash Scenario 2: After ensure_section_exists(), before metadata update
// Target: init_at_size(12) -> should be section 2 (starts at 10)
Expand Down
9 changes: 3 additions & 6 deletions storage/src/journal/segmented/oversized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,10 +962,9 @@ mod tests {
// Last page CRC starts at offset 160 - 12 = 148
assert_eq!(size, 160);
let last_page_crc_offset = size - 12;
blob.write_at(last_page_crc_offset, vec![0xFF; 12])
blob.write_at_sync(last_page_crc_offset, vec![0xFF; 12])
.await
.expect("Failed to corrupt");
blob.sync().await.expect("Failed to sync");
drop(blob);

// Reinitialize - should detect page corruption and truncate
Expand Down Expand Up @@ -2416,10 +2415,9 @@ mod tests {

// Write 100 bytes of garbage (simulating partial/failed value write)
let garbage = vec![0xDE; 100];
blob.write_at(size, garbage)
blob.write_at_sync(size, garbage)
.await
.expect("Failed to write garbage");
blob.sync().await.expect("Failed to sync");
drop(blob);

// Verify glob now has trailing garbage
Expand Down Expand Up @@ -2529,10 +2527,9 @@ mod tests {
// Write the complete physical page: entry_data + crc_record
let mut page = entry_data;
page.extend_from_slice(&crc_record);
blob.write_at(0, page)
blob.write_at_sync(0, page)
.await
.expect("Failed to write corrupted page");
blob.sync().await.expect("Failed to sync");
drop(blob);

// Reinitialize - recovery should detect the invalid entry
Expand Down
20 changes: 6 additions & 14 deletions storage/src/journal/segmented/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1326,10 +1326,9 @@ mod tests {
let mut incomplete_data = Vec::new();
UInt(u32::MAX).write(&mut incomplete_data);
incomplete_data.truncate(1);
blob.write_at(0, incomplete_data)
blob.write_at_sync(0, incomplete_data)
.await
.expect("Failed to write incomplete data");
blob.sync().await.expect("Failed to sync blob");

// Initialize the journal
let journal = Journal::init(context, cfg)
Expand Down Expand Up @@ -1449,10 +1448,9 @@ mod tests {
UInt(item_size).write(&mut buf); // Varint encoding
let data = [2u8; 5];
BufMut::put_slice(&mut buf, &data);
blob.write_at(0, buf)
blob.write_at_sync(0, buf)
.await
.expect("Failed to write incomplete item");
blob.sync().await.expect("Failed to sync blob");

// Initialize the journal
let journal = Journal::init(context, cfg)
Expand Down Expand Up @@ -1508,12 +1506,10 @@ mod tests {
let mut buf = Vec::new();
UInt(item_size).write(&mut buf);
BufMut::put_slice(&mut buf, item_data);
blob.write_at(0, buf)
blob.write_at_sync(0, buf)
.await
.expect("Failed to write item without checksum");

blob.sync().await.expect("Failed to sync blob");

// Initialize the journal
let journal = Journal::init(context, cfg)
.await
Expand Down Expand Up @@ -1572,12 +1568,10 @@ mod tests {
UInt(item_size).write(&mut buf);
BufMut::put_slice(&mut buf, item_data);
buf.put_u32(incorrect_checksum);
blob.write_at(0, buf)
blob.write_at_sync(0, buf)
.await
.expect("Failed to write item with bad checksum");

blob.sync().await.expect("Failed to sync blob");

// Initialize the journal
let journal = Journal::init(context.child("storage"), cfg.clone())
.await
Expand Down Expand Up @@ -1802,10 +1796,9 @@ mod tests {
.open(&cfg.partition, &2u64.to_be_bytes())
.await
.expect("Failed to open blob");
blob.write_at(blob_size, vec![0u8; 16])
blob.write_at_sync(blob_size, vec![0u8; 16])
.await
.expect("Failed to add extra data");
blob.sync().await.expect("Failed to sync blob");

// Re-initialize the journal to simulate a restart
let journal = Journal::init(context.child("second"), cfg)
Expand Down Expand Up @@ -2296,10 +2289,9 @@ mod tests {

// Write incomplete varint: 0xFF has continuation bit set, needs more bytes
// This creates 2 trailing bytes that cannot form a valid item
blob.write_at(physical_size_before, vec![0xFF, 0xFF])
blob.write_at_sync(physical_size_before, vec![0xFF, 0xFF])
.await
.unwrap();
blob.sync().await.unwrap();

// Reopen journal and replay starting PAST all valid items
// (start_offset = valid_logical_size means we skip all valid data)
Expand Down
26 changes: 15 additions & 11 deletions storage/src/merkle/persisted/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,9 @@ impl<F: Family, E: Context, D: Digest, S: Strategy> Merkle<F, E, D, S> {
);
}
let result = update(&mut metadata, target_slot, witness)?;
metadata.put(U64::new(GEN_PTR_PREFIX, 0), vec![target_slot]);
metadata.sync().await?;
metadata
.put_sync(U64::new(GEN_PTR_PREFIX, 0), vec![target_slot])
.await?;
result
};

Expand Down Expand Up @@ -451,8 +452,9 @@ impl<F: Family, E: Context, D: Digest, S: Strategy> Merkle<F, E, D, S> {
let old_current_leaves =
Self::read_slot_size(&metadata, current_slot)?.unwrap_or(Location::new(0));
Self::clear_slot(&mut metadata, current_slot, old_current_leaves);
metadata.put(U64::new(GEN_PTR_PREFIX, 0), vec![target_slot]);
metadata.sync().await?;
metadata
.put_sync(U64::new(GEN_PTR_PREFIX, 0), vec![target_slot])
.await?;
}

*self.inner.write() = new_mem;
Expand Down Expand Up @@ -746,13 +748,15 @@ mod tests {
)
.await
.unwrap();
metadata.put(
U64::new(size_prefix(slot), 0),
(mmr::Family::MAX_LEAVES.as_u64() + 1)
.to_be_bytes()
.to_vec(),
);
metadata.sync().await.unwrap();
metadata
.put_sync(
U64::new(size_prefix(slot), 0),
(mmr::Family::MAX_LEAVES.as_u64() + 1)
.to_be_bytes()
.to_vec(),
)
.await
.unwrap();

let reopened = TestMerkle::<mmr::Family>::init(context.child("second"), cfg).await;
assert!(matches!(
Expand Down
Loading
Loading