diff --git a/storage/fuzz/fuzz_targets/oversized_recovery.rs b/storage/fuzz/fuzz_targets/oversized_recovery.rs index 22e3b506244..4135fbba588 100644 --- a/storage/fuzz/fuzz_targets/oversized_recovery.rs +++ b/storage/fuzz/fuzz_targets/oversized_recovery.rs @@ -256,8 +256,7 @@ fn fuzz(input: FuzzInput) { if overlaps_existing_blob(offset, data.len(), size) { index_page_integrity_may_be_invalidated = true; } - let _ = blob.write_at(offset, data.to_vec()).await; - let _ = blob.sync().await; + let _ = blob.write_at_sync(offset, data.to_vec()).await; } } } @@ -271,8 +270,7 @@ fn fuzz(input: FuzzInput) { { if size > 0 { let offset = (size * (*offset_factor as u64)) / 256; - let _ = blob.write_at(offset, data.to_vec()).await; - let _ = blob.sync().await; + let _ = blob.write_at_sync(offset, data.to_vec()).await; } } } @@ -290,16 +288,14 @@ fn fuzz(input: FuzzInput) { if let Ok((blob, size)) = context.open(INDEX_PARTITION, §ion.to_be_bytes()).await { - let _ = blob.write_at(size, garbage.to_vec()).await; - let _ = blob.sync().await; + let _ = blob.write_at_sync(size, garbage.to_vec()).await; } } CorruptionType::ExtendGlob { section, garbage } => { if let Ok((blob, size)) = context.open(VALUE_PARTITION, §ion.to_be_bytes()).await { - let _ = blob.write_at(size, garbage.to_vec()).await; - let _ = blob.sync().await; + let _ = blob.write_at_sync(size, garbage.to_vec()).await; } } } diff --git a/storage/src/archive/immutable/mod.rs b/storage/src/archive/immutable/mod.rs index 365075fc945..d44f17133da 100644 --- a/storage/src/archive/immutable/mod.rs +++ b/storage/src/archive/immutable/mod.rs @@ -247,8 +247,7 @@ mod tests { // Write data after restart to confirm archive is functional let key = Sha256::hash(b"after-restart"); - archive.put(0, key, 42).await.unwrap(); - archive.sync().await.unwrap(); + archive.put_sync(0, key, 42).await.unwrap(); drop(archive); // Third init to verify persistence diff --git a/storage/src/archive/immutable/storage.rs b/storage/src/archive/immutable/storage.rs index 51a3ba8a4ff..52135b54f16 100644 --- a/storage/src/archive/immutable/storage.rs +++ b/storage/src/archive/immutable/storage.rs @@ -296,12 +296,12 @@ impl crate::archive::Archiv let checkpoint = freezer_result?; ordinal_result?; - // Update checkpoint + // Publish the freezer checkpoint with a single metadata sync after the + // freezer and ordinal state are durable. let freezer_key = U64::new(FREEZER_PREFIX, 0); - self.metadata.put(freezer_key, Record::Freezer(checkpoint)); - - // Sync metadata once underlying are synced - self.metadata.sync().await?; + self.metadata + .put_sync(freezer_key, Record::Freezer(checkpoint)) + .await?; Ok(()) } diff --git a/storage/src/freezer/mod.rs b/storage/src/freezer/mod.rs index beebcec5349..439a4e4385f 100644 --- a/storage/src/freezer/mod.rs +++ b/storage/src/freezer/mod.rs @@ -759,8 +759,7 @@ mod tests { { let (blob, _) = context.open(&cfg.table_partition, b"table").await.unwrap(); // Write incomplete table entry (only 10 bytes instead of 24) - blob.write_at(0, vec![0xFF; 10]).await.unwrap(); - blob.sync().await.unwrap(); + blob.write_at_sync(0, vec![0xFF; 10]).await.unwrap(); } // Reopen and verify it handles the corruption @@ -825,8 +824,7 @@ mod tests { let mut corrupted = entry_data.coalesce(); // Corrupt the CRC (last 4 bytes of the entry) corrupted.as_mut()[20] ^= 0xFF; - blob.write_at(0, corrupted).await.unwrap(); - blob.sync().await.unwrap(); + blob.write_at_sync(0, corrupted).await.unwrap(); } // Reopen and verify it handles invalid CRC @@ -887,10 +885,9 @@ mod tests { { let (blob, size) = context.open(&cfg.table_partition, b"table").await.unwrap(); // Append garbage data - blob.write_at(size, hex!("0xdeadbeef").to_vec()) + blob.write_at_sync(size, hex!("0xdeadbeef").to_vec()) .await .unwrap(); - blob.sync().await.unwrap(); } // Reopen and verify it handles extra bytes gracefully diff --git a/storage/src/freezer/storage.rs b/storage/src/freezer/storage.rs index 92bf9d676fe..bee0d4cd579 100644 --- a/storage/src/freezer/storage.rs +++ b/storage/src/freezer/storage.rs @@ -1286,8 +1286,7 @@ mod tests { corrupted.as_mut()[Entry::SIZE - 4] ^= 0xFF; // Corrupt CRC of second slot (last 4 bytes of second slot) corrupted.as_mut()[Entry::FULL_SIZE - 4] ^= 0xFF; - blob.write_at(0, corrupted).await.unwrap(); - blob.sync().await.unwrap(); + blob.write_at_sync(0, corrupted).await.unwrap(); } // Reopen to trigger recovery. The bug would set both cleared entries to diff --git a/storage/src/journal/contiguous/fixed.rs b/storage/src/journal/contiguous/fixed.rs index 4b8f749cf39..d724bb14e45 100644 --- a/storage/src/journal/contiguous/fixed.rs +++ b/storage/src/journal/contiguous/fixed.rs @@ -487,13 +487,15 @@ impl Journal { if needs_metadata_update { if pruning_boundary.is_multiple_of(items_per_blob) { metadata.remove(&PRUNING_BOUNDARY_KEY); + metadata.sync().await?; } else { - metadata.put( - PRUNING_BOUNDARY_KEY, - pruning_boundary.to_be_bytes().to_vec(), - ); + metadata + .put_sync( + PRUNING_BOUNDARY_KEY, + pruning_boundary.to_be_bytes().to_vec(), + ) + .await?; } - metadata.sync().await?; } // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on @@ -708,8 +710,9 @@ impl Journal { // Persist metadata if pruning_boundary is mid-section. if !size.is_multiple_of(items_per_blob) { - metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec()); - metadata.sync().await?; + metadata + .put_sync(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec()) + .await?; } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() { metadata.remove(&PRUNING_BOUNDARY_KEY); metadata.sync().await?; @@ -1002,8 +1005,7 @@ impl Journal { // Persist metadata only when pruning_boundary is mid-section. if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) { let value = inner.pruning_boundary.to_be_bytes().to_vec(); - inner.metadata.put(PRUNING_BOUNDARY_KEY, value); - inner.metadata.sync().await?; + inner.metadata.put_sync(PRUNING_BOUNDARY_KEY, value).await?; } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() { inner.metadata.remove(&PRUNING_BOUNDARY_KEY); inner.metadata.sync().await?; @@ -1172,23 +1174,18 @@ mod tests { .await .expect("Failed to open legacy blob"); legacy_blob - .write_at(0, vec![0u8; 1]) + .write_at_sync(0, vec![0u8; 1]) .await .expect("Failed to write legacy blob"); - legacy_blob - .sync() - .await - .expect("Failed to sync legacy blob"); let (new_blob, _) = context .open(&blobs_partition, &0u64.to_be_bytes()) .await .expect("Failed to open new blob"); new_blob - .write_at(0, vec![0u8; 1]) + .write_at_sync(0, vec![0u8; 1]) .await .expect("Failed to write new blob"); - new_blob.sync().await.expect("Failed to sync new blob"); let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await; assert!(matches!(result, Err(Error::Corruption(_)))); @@ -1209,13 +1206,9 @@ mod tests { .await .expect("Failed to open legacy blob"); legacy_blob - .write_at(0, vec![0u8; 1]) + .write_at_sync(0, vec![0u8; 1]) .await .expect("Failed to write legacy blob"); - legacy_blob - .sync() - .await - .expect("Failed to sync legacy blob"); let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) .await @@ -1502,10 +1495,9 @@ mod tests { .expect("Failed to open blob"); // Write junk bytes. let bad_bytes = 123456789u32; - blob.write_at(1, bad_bytes.to_be_bytes().to_vec()) + blob.write_at_sync(1, bad_bytes.to_be_bytes().to_vec()) .await .expect("Failed to write bad bytes"); - blob.sync().await.expect("Failed to sync blob"); // Re-initialize the journal to simulate a restart let journal = Journal::init(context.child("second"), cfg.clone()) @@ -1929,10 +1921,9 @@ mod tests { .open(&blob_partition(&cfg), &0u64.to_be_bytes()) .await .expect("Failed to open blob"); - blob.write_at(size, vec![0u8; PAGE_SIZE.get() as usize * 3]) + blob.write_at_sync(size, vec![0u8; PAGE_SIZE.get() as usize * 3]) .await .expect("Failed to extend blob"); - blob.sync().await.expect("Failed to sync blob"); // Re-initialize the journal to simulate a restart let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) @@ -2945,8 +2936,10 @@ mod tests { Metadata::<_, u64, Vec>::init(context.child("restore_meta"), meta_cfg.clone()) .await .unwrap(); - metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec()); - metadata.sync().await.unwrap(); + metadata + .put_sync(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec()) + .await + .unwrap(); // Crash Scenario 2: After ensure_section_exists(), before metadata update // Target: init_at_size(12) -> should be section 2 (starts at 10) diff --git a/storage/src/journal/segmented/oversized.rs b/storage/src/journal/segmented/oversized.rs index 7cea2c64d3a..2cfca1b969e 100644 --- a/storage/src/journal/segmented/oversized.rs +++ b/storage/src/journal/segmented/oversized.rs @@ -962,10 +962,9 @@ mod tests { // Last page CRC starts at offset 160 - 12 = 148 assert_eq!(size, 160); let last_page_crc_offset = size - 12; - blob.write_at(last_page_crc_offset, vec![0xFF; 12]) + blob.write_at_sync(last_page_crc_offset, vec![0xFF; 12]) .await .expect("Failed to corrupt"); - blob.sync().await.expect("Failed to sync"); drop(blob); // Reinitialize - should detect page corruption and truncate @@ -2416,10 +2415,9 @@ mod tests { // Write 100 bytes of garbage (simulating partial/failed value write) let garbage = vec![0xDE; 100]; - blob.write_at(size, garbage) + blob.write_at_sync(size, garbage) .await .expect("Failed to write garbage"); - blob.sync().await.expect("Failed to sync"); drop(blob); // Verify glob now has trailing garbage @@ -2529,10 +2527,9 @@ mod tests { // Write the complete physical page: entry_data + crc_record let mut page = entry_data; page.extend_from_slice(&crc_record); - blob.write_at(0, page) + blob.write_at_sync(0, page) .await .expect("Failed to write corrupted page"); - blob.sync().await.expect("Failed to sync"); drop(blob); // Reinitialize - recovery should detect the invalid entry diff --git a/storage/src/journal/segmented/variable.rs b/storage/src/journal/segmented/variable.rs index 4390066f2b8..986b8ef9cd1 100644 --- a/storage/src/journal/segmented/variable.rs +++ b/storage/src/journal/segmented/variable.rs @@ -1326,10 +1326,9 @@ mod tests { let mut incomplete_data = Vec::new(); UInt(u32::MAX).write(&mut incomplete_data); incomplete_data.truncate(1); - blob.write_at(0, incomplete_data) + blob.write_at_sync(0, incomplete_data) .await .expect("Failed to write incomplete data"); - blob.sync().await.expect("Failed to sync blob"); // Initialize the journal let journal = Journal::init(context, cfg) @@ -1449,10 +1448,9 @@ mod tests { UInt(item_size).write(&mut buf); // Varint encoding let data = [2u8; 5]; BufMut::put_slice(&mut buf, &data); - blob.write_at(0, buf) + blob.write_at_sync(0, buf) .await .expect("Failed to write incomplete item"); - blob.sync().await.expect("Failed to sync blob"); // Initialize the journal let journal = Journal::init(context, cfg) @@ -1508,12 +1506,10 @@ mod tests { let mut buf = Vec::new(); UInt(item_size).write(&mut buf); BufMut::put_slice(&mut buf, item_data); - blob.write_at(0, buf) + blob.write_at_sync(0, buf) .await .expect("Failed to write item without checksum"); - blob.sync().await.expect("Failed to sync blob"); - // Initialize the journal let journal = Journal::init(context, cfg) .await @@ -1572,12 +1568,10 @@ mod tests { UInt(item_size).write(&mut buf); BufMut::put_slice(&mut buf, item_data); buf.put_u32(incorrect_checksum); - blob.write_at(0, buf) + blob.write_at_sync(0, buf) .await .expect("Failed to write item with bad checksum"); - blob.sync().await.expect("Failed to sync blob"); - // Initialize the journal let journal = Journal::init(context.child("storage"), cfg.clone()) .await @@ -1802,10 +1796,9 @@ mod tests { .open(&cfg.partition, &2u64.to_be_bytes()) .await .expect("Failed to open blob"); - blob.write_at(blob_size, vec![0u8; 16]) + blob.write_at_sync(blob_size, vec![0u8; 16]) .await .expect("Failed to add extra data"); - blob.sync().await.expect("Failed to sync blob"); // Re-initialize the journal to simulate a restart let journal = Journal::init(context.child("second"), cfg) @@ -2296,10 +2289,9 @@ mod tests { // Write incomplete varint: 0xFF has continuation bit set, needs more bytes // This creates 2 trailing bytes that cannot form a valid item - blob.write_at(physical_size_before, vec![0xFF, 0xFF]) + blob.write_at_sync(physical_size_before, vec![0xFF, 0xFF]) .await .unwrap(); - blob.sync().await.unwrap(); // Reopen journal and replay starting PAST all valid items // (start_offset = valid_logical_size means we skip all valid data) diff --git a/storage/src/merkle/persisted/compact.rs b/storage/src/merkle/persisted/compact.rs index 8389863728c..3f8fc1de3aa 100644 --- a/storage/src/merkle/persisted/compact.rs +++ b/storage/src/merkle/persisted/compact.rs @@ -390,8 +390,9 @@ impl Merkle { ); } let result = update(&mut metadata, target_slot, witness)?; - metadata.put(U64::new(GEN_PTR_PREFIX, 0), vec![target_slot]); - metadata.sync().await?; + metadata + .put_sync(U64::new(GEN_PTR_PREFIX, 0), vec![target_slot]) + .await?; result }; @@ -451,8 +452,9 @@ impl Merkle { let old_current_leaves = Self::read_slot_size(&metadata, current_slot)?.unwrap_or(Location::new(0)); Self::clear_slot(&mut metadata, current_slot, old_current_leaves); - metadata.put(U64::new(GEN_PTR_PREFIX, 0), vec![target_slot]); - metadata.sync().await?; + metadata + .put_sync(U64::new(GEN_PTR_PREFIX, 0), vec![target_slot]) + .await?; } *self.inner.write() = new_mem; @@ -746,13 +748,15 @@ mod tests { ) .await .unwrap(); - metadata.put( - U64::new(size_prefix(slot), 0), - (mmr::Family::MAX_LEAVES.as_u64() + 1) - .to_be_bytes() - .to_vec(), - ); - metadata.sync().await.unwrap(); + metadata + .put_sync( + U64::new(size_prefix(slot), 0), + (mmr::Family::MAX_LEAVES.as_u64() + 1) + .to_be_bytes() + .to_vec(), + ) + .await + .unwrap(); let reopened = TestMerkle::::init(context.child("second"), cfg).await; assert!(matches!( diff --git a/storage/src/merkle/persisted/full.rs b/storage/src/merkle/persisted/full.rs index 6ea42bc428e..e67c7570b84 100644 --- a/storage/src/merkle/persisted/full.rs +++ b/storage/src/merkle/persisted/full.rs @@ -637,15 +637,16 @@ impl Merkle remove -> put same key metadata.put(key.clone(), b"first".to_vec()); metadata.remove(&key); - metadata.put(key.clone(), b"second".to_vec()); - metadata.sync().await.unwrap(); + metadata + .put_sync(key.clone(), b"second".to_vec()) + .await + .unwrap(); let value = metadata.get(&key).unwrap(); assert_eq!(value, b"second"); @@ -843,8 +846,10 @@ mod tests { let value = metadata.get_mut(&key).unwrap(); value[0] = b'T'; metadata.remove(&key); - metadata.put(key.clone(), b"fourth".to_vec()); - metadata.sync().await.unwrap(); + metadata + .put_sync(key.clone(), b"fourth".to_vec()) + .await + .unwrap(); let value = metadata.get(&key).unwrap(); assert_eq!(value, b"fourth"); @@ -1006,8 +1011,10 @@ mod tests { // Initial data metadata.put(U64::new(1), b"first".to_vec()); - metadata.put(U64::new(2), b"second".to_vec()); - metadata.sync().await.unwrap(); + metadata + .put_sync(U64::new(2), b"second".to_vec()) + .await + .unwrap(); // Clear everything metadata.clear(); @@ -1029,8 +1036,10 @@ mod tests { // Repopulate with different data metadata.put(U64::new(3), b"third".to_vec()); - metadata.put(U64::new(4), b"fourth".to_vec()); - metadata.sync().await.unwrap(); + metadata + .put_sync(U64::new(4), b"fourth".to_vec()) + .await + .unwrap(); // Verify new data assert_eq!(metadata.get(&U64::new(3)).unwrap(), b"third"); diff --git a/storage/src/metadata/storage.rs b/storage/src/metadata/storage.rs index ddcece8c5f1..aa5751f1361 100644 --- a/storage/src/metadata/storage.rs +++ b/storage/src/metadata/storage.rs @@ -270,6 +270,9 @@ impl Metadata { } /// Perform a [Self::put] and [Self::sync] in a single operation. + /// + /// Like calling [Self::sync] directly, this commits all pending metadata + /// changes, not just the provided key. pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> { self.put(key, value); self.sync().await @@ -434,12 +437,16 @@ impl Metadata { } next_data.put_u32(Crc32::checksum(&next_data[..])); - // Write and persist the new data - target.blob.write_at(0, next_data.clone()).await?; + // Shrinking rewrites must also persist the resize, so they need a full sync. if next_data.len() < target.data.len() { + target.blob.write_at(0, next_data.clone()).await?; target.blob.resize(next_data.len() as u64).await?; + target.blob.sync().await?; + } else { + // Non-shrinking rewrites are a single write and can use range-scoped + // durability. + target.blob.write_at_sync(0, next_data.clone()).await?; } - target.blob.sync().await?; // Update blob state target.version = next_version; diff --git a/storage/src/ordinal/mod.rs b/storage/src/ordinal/mod.rs index fb09ad7d427..c1eed4aad8a 100644 --- a/storage/src/ordinal/mod.rs +++ b/storage/src/ordinal/mod.rs @@ -568,8 +568,7 @@ mod tests { .await .unwrap(); // Corrupt the CRC by changing a byte - blob.write_at(32, vec![0xFF]).await.unwrap(); - blob.sync().await.unwrap(); + blob.write_at_sync(32, vec![0xFF]).await.unwrap(); } // Reopen and try to read corrupted data @@ -699,8 +698,7 @@ mod tests { .await .unwrap(); // Overwrite second record with partial data (32 bytes instead of 36) - blob.write_at(36, vec![0xFF; 32]).await.unwrap(); - blob.sync().await.unwrap(); + blob.write_at_sync(36, vec![0xFF; 32]).await.unwrap(); } // Reopen and verify it handles partial write gracefully @@ -768,10 +766,9 @@ mod tests { .await .unwrap(); // Corrupt some bytes in the value of the first record - blob.write_at(10, hex!("0xFFFFFFFF").to_vec()) + blob.write_at_sync(10, hex!("0xFFFFFFFF").to_vec()) .await .unwrap(); - blob.sync().await.unwrap(); } // Reopen and verify it detects corruption @@ -828,16 +825,14 @@ mod tests { .open("test-ordinal", &0u64.to_be_bytes()) .await .unwrap(); - blob.write_at(32, vec![0xFF]).await.unwrap(); // Corrupt CRC of index 0 - blob.sync().await.unwrap(); + blob.write_at_sync(32, vec![0xFF]).await.unwrap(); // Corrupt CRC of index 0 // Corrupt value in second blob (which will invalidate CRC) let (blob, _) = context .open("test-ordinal", &1u64.to_be_bytes()) .await .unwrap(); - blob.write_at(5, vec![0xFF; 4]).await.unwrap(); // Corrupt value of index 10 - blob.sync().await.unwrap(); + blob.write_at_sync(5, vec![0xFF; 4]).await.unwrap(); // Corrupt value of index 10 } // Reopen and verify handling of CRC corruptions @@ -908,8 +903,7 @@ mod tests { let invalid_crc = 0xDEADBEEFu32; garbage.extend_from_slice(&invalid_crc.to_be_bytes()); assert_eq!(garbage.len(), 36); // Full record size - blob.write_at(size, garbage).await.unwrap(); - blob.sync().await.unwrap(); + blob.write_at_sync(size, garbage).await.unwrap(); } // Reopen and verify it handles extra bytes @@ -963,15 +957,13 @@ mod tests { // Write zeros for several record positions let zeros = vec![0u8; 36 * 5]; // 5 records worth of zeros - blob.write_at(0, zeros).await.unwrap(); + blob.write_at_sync(0, zeros).await.unwrap(); // Write a valid record after the zeros let mut valid_record = vec![44u8; 32]; let crc = Crc32::checksum(&valid_record); valid_record.extend_from_slice(&crc.to_be_bytes()); - blob.write_at(36 * 5, valid_record).await.unwrap(); - - blob.sync().await.unwrap(); + blob.write_at_sync(36 * 5, valid_record).await.unwrap(); } // Initialize store and verify it handles zero-filled records @@ -2032,8 +2024,7 @@ mod tests { .unwrap(); // Corrupt the CRC of record at index 2 let offset = 2 * 36 + 32; // 2 * record_size + value_size - blob.write_at(offset, vec![0xFF]).await.unwrap(); - blob.sync().await.unwrap(); + blob.write_at_sync(offset, vec![0xFF]).await.unwrap(); } // Reinitialize with bits that include the corrupted record diff --git a/storage/src/qmdb/immutable/compact.rs b/storage/src/qmdb/immutable/compact.rs index 31bc10c1074..77a6493279b 100644 --- a/storage/src/qmdb/immutable/compact.rs +++ b/storage/src/qmdb/immutable/compact.rs @@ -622,8 +622,7 @@ mod tests { let mut metadata = open_metadata(context, partition).await; let mut bytes = metadata.get(&key).cloned().expect("metadata entry missing"); *bytes.last_mut().expect("metadata entry empty") ^= 0x01; - metadata.put(key, bytes); - metadata.sync().await.unwrap(); + metadata.put_sync(key, bytes).await.unwrap(); } async fn open_metadata( @@ -648,8 +647,7 @@ mod tests { bytes: Vec, ) { let mut metadata = open_metadata(context, partition).await; - metadata.put(key, bytes); - metadata.sync().await.unwrap(); + metadata.put_sync(key, bytes).await.unwrap(); } #[test_traced("INFO")] diff --git a/storage/src/qmdb/keyless/compact.rs b/storage/src/qmdb/keyless/compact.rs index 2533ae6fe68..20194d168d1 100644 --- a/storage/src/qmdb/keyless/compact.rs +++ b/storage/src/qmdb/keyless/compact.rs @@ -608,8 +608,7 @@ mod tests { let mut metadata = open_metadata(context, partition).await; let mut bytes = metadata.get(&key).cloned().expect("metadata entry missing"); *bytes.last_mut().expect("metadata entry empty") ^= 0x01; - metadata.put(key, bytes); - metadata.sync().await.unwrap(); + metadata.put_sync(key, bytes).await.unwrap(); } async fn open_metadata( @@ -634,8 +633,7 @@ mod tests { bytes: Vec, ) { let mut metadata = open_metadata(context, partition).await; - metadata.put(key, bytes); - metadata.sync().await.unwrap(); + metadata.put_sync(key, bytes).await.unwrap(); } #[test_traced("INFO")]