Skip to content

Commit e16d198

Browse files
sync repaired blobs immediately
1 parent 5bbdd79 commit e16d198

4 files changed

Lines changed: 206 additions & 1 deletion

File tree

storage/src/journal/contiguous/fixed.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,7 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
673673
.journal
674674
.rewind(repair.section, repair.byte_offset)
675675
.await?;
676+
inner.journal.sync(repair.section).await?;
676677
}
677678

678679
let tail_section = size / items_per_blob;
@@ -1396,6 +1397,7 @@ mod tests {
13961397
use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
13971398
use commonware_macros::test_traced;
13981399
use commonware_runtime::{
1400+
buffer::paged::Append,
13991401
deterministic::{self, Context},
14001402
Blob, BufferPooler, Error as RuntimeError, Metrics as _, Runner, Storage, Supervisor as _,
14011403
};
@@ -2072,6 +2074,67 @@ mod tests {
20722074
});
20732075
}
20742076

2077+
/// Checks that a fixed journal whose blob ends in a partial item is repaired by
/// `Journal::init`, and that the repaired blob length is still what a fresh runner
/// sees after restarting from the returned checkpoint.
#[test_traced]
2078+
fn test_fixed_journal_init_persists_trailing_item_repair() {
2079+
let executor = deterministic::Runner::default();
2080+
// The first run returns the blob partition name and the expected repaired size,
// plus a checkpoint used to start the second runner below.
let ((blob_partition, expected_size), checkpoint) =
2081+
executor.start_and_recover(|context| async move {
2082+
let cfg = test_cfg(&context, NZU64!(5));
2083+
let blob_partition = blob_partition(&cfg);
2084+
let journal = Journal::init(context.child("first"), cfg.clone())
2085+
.await
2086+
.unwrap();
2087+
2088+
// Write three digests, sync them, and close the journal.
for i in 0..3 {
2089+
journal.append(&test_digest(i)).await.unwrap();
2090+
}
2091+
journal.sync().await.unwrap();
2092+
drop(journal);
2093+
2094+
// Open the underlying blob directly (named by the big-endian bytes of 0u64;
// presumably the first section's blob -- confirm against the journal layout).
let (blob, raw_size) = context
2095+
.open(&blob_partition, &0u64.to_be_bytes())
2096+
.await
2097+
.unwrap();
2098+
let append = Append::new(
2099+
blob,
2100+
raw_size,
2101+
2048,
2102+
CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2103+
)
2104+
.await
2105+
.unwrap();
2106+
let logical_size = append.size().await;
2107+
assert_eq!(logical_size, 3 * Digest::SIZE as u64);
2108+
// Cut off the final byte so the third item is incomplete, and sync that
// damaged state before handing the blob back to the journal.
append.resize(logical_size - 1).await.unwrap();
2109+
append.sync().await.unwrap();
2110+
drop(append);
2111+
2112+
// Re-initializing must expose only the two complete items.
let journal = Journal::<_, Digest>::init(context.child("second"), cfg)
2113+
.await
2114+
.unwrap();
2115+
assert_eq!(journal.size().await, 2);
2116+
// The journal is dropped without an explicit sync after init.
drop(journal);
2117+
2118+
(blob_partition, 2 * Digest::SIZE as u64)
2119+
});
2120+
2121+
// Restart from the checkpoint and verify the blob is exactly two items long.
// NOTE(review): this presumably only holds if init made the repair durable;
// confirm what state `start_and_recover` keeps in the checkpoint.
deterministic::Runner::from(checkpoint).start(move |context| async move {
2122+
let (blob, raw_size) = context
2123+
.open(&blob_partition, &0u64.to_be_bytes())
2124+
.await
2125+
.unwrap();
2126+
let append = Append::new(
2127+
blob,
2128+
raw_size,
2129+
2048,
2130+
CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2131+
)
2132+
.await
2133+
.unwrap();
2134+
assert_eq!(append.size().await, expected_size);
2135+
});
2136+
}
2137+
20752138
#[test_traced]
20762139
fn test_fixed_journal_recover_truncates_short_oldest_section() {
20772140
let executor = deterministic::Runner::default();

storage/src/journal/contiguous/variable.rs

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1347,7 +1347,8 @@ mod tests {
13471347
use crate::journal::contiguous::tests::run_contiguous_tests;
13481348
use commonware_macros::test_traced;
13491349
use commonware_runtime::{
1350-
buffer::paged::CacheRef, deterministic, Metrics as _, Runner, Storage, Supervisor as _,
1350+
buffer::paged::{Append, CacheRef},
1351+
deterministic, Metrics as _, Runner, Storage, Supervisor as _,
13511352
};
13521353
use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64};
13531354
use futures::FutureExt as _;
@@ -2384,6 +2385,139 @@ mod tests {
23842385
});
23852386
}
23862387

2388+
/// Checks that a stray trailing byte in the offsets blob of a variable journal is
/// repaired by `Journal::init`, and that the repaired blob length is still what a
/// fresh runner sees after restarting from the returned checkpoint.
#[test_traced]
2389+
fn test_variable_init_persists_offsets_trailing_item_repair() {
2390+
let executor = deterministic::Runner::default();
2391+
// The first run returns the offsets blob partition name and the expected size,
// plus a checkpoint used to start the second runner below.
let ((offsets_blob_partition, expected_size), checkpoint) =
2392+
executor.start_and_recover(|context| async move {
2393+
let cfg = Config {
2394+
partition: "offsets-init-repair-sync".into(),
2395+
items_per_section: NZU64!(10),
2396+
compression: None,
2397+
codec_config: (),
2398+
page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2399+
write_buffer: NZUsize!(1024),
2400+
};
2401+
let offsets_blob_partition = format!("{}-blobs", cfg.offsets_partition());
2402+
// Two appended items are expected to occupy two u64-sized entries in the
// offsets blob (asserted below once the blob is opened).
let expected_size = 2 * std::mem::size_of::<u64>() as u64;
2403+
2404+
// Write two items, sync them, and close the journal.
let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2405+
.await
2406+
.unwrap();
2407+
journal.append(&10).await.unwrap();
2408+
journal.append(&20).await.unwrap();
2409+
journal.sync().await.unwrap();
2410+
drop(journal);
2411+
2412+
// Open the offsets blob directly (named by the big-endian bytes of 0u64).
let (blob, raw_size) = context
2413+
.open(&offsets_blob_partition, &0u64.to_be_bytes())
2414+
.await
2415+
.unwrap();
2416+
let append = Append::new(
2417+
blob,
2418+
raw_size,
2419+
2048,
2420+
CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2421+
)
2422+
.await
2423+
.unwrap();
2424+
assert_eq!(append.size().await, expected_size);
2425+
// Grow the blob by one byte so it ends in a partial entry, and sync that
// damaged state before handing the blob back to the journal.
append.resize(expected_size + 1).await.unwrap();
2426+
append.sync().await.unwrap();
2427+
drop(append);
2428+
2429+
// Re-initializing must still report exactly the two original items.
let journal = Journal::<_, u64>::init(context.child("second"), cfg)
2430+
.await
2431+
.unwrap();
2432+
assert_eq!(journal.bounds().await, 0..2);
2433+
// The journal is dropped without an explicit sync after init.
drop(journal);
2434+
2435+
(offsets_blob_partition, expected_size)
2436+
});
2437+
2438+
// Restart from the checkpoint and verify the extra byte is gone from the blob.
// NOTE(review): this presumably only holds if init made the repair durable;
// confirm what state `start_and_recover` keeps in the checkpoint.
deterministic::Runner::from(checkpoint).start(move |context| async move {
2439+
let (blob, raw_size) = context
2440+
.open(&offsets_blob_partition, &0u64.to_be_bytes())
2441+
.await
2442+
.unwrap();
2443+
let append = Append::new(
2444+
blob,
2445+
raw_size,
2446+
2048,
2447+
CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2448+
)
2449+
.await
2450+
.unwrap();
2451+
assert_eq!(append.size().await, expected_size);
2452+
});
2453+
}
2454+
2455+
/// Checks that garbage bytes appended to the data blob of a variable journal are
/// removed by `Journal::init`, and that the repaired blob length is still what a
/// fresh runner sees after restarting from the returned checkpoint.
#[test_traced]
2456+
fn test_variable_init_persists_data_tail_repair() {
2457+
let executor = deterministic::Runner::default();
2458+
// The first run returns the data partition name and the pre-corruption blob size,
// plus a checkpoint used to start the second runner below.
let ((data_partition, expected_size), checkpoint) =
2459+
executor.start_and_recover(|context| async move {
2460+
let cfg = Config {
2461+
partition: "data-init-repair-sync".into(),
2462+
items_per_section: NZU64!(10),
2463+
compression: None,
2464+
codec_config: (),
2465+
page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2466+
write_buffer: NZUsize!(1024),
2467+
};
2468+
let data_partition = cfg.data_partition();
2469+
2470+
// Write two items, sync them, and close the journal.
let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone())
2471+
.await
2472+
.unwrap();
2473+
journal.append(&10).await.unwrap();
2474+
journal.append(&20).await.unwrap();
2475+
journal.sync().await.unwrap();
2476+
drop(journal);
2477+
2478+
// Open the data blob directly (named by the big-endian bytes of 0u64).
let (blob, raw_size) = context
2479+
.open(&data_partition, &0u64.to_be_bytes())
2480+
.await
2481+
.unwrap();
2482+
let append = Append::new(
2483+
blob,
2484+
raw_size,
2485+
2048,
2486+
CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2487+
)
2488+
.await
2489+
.unwrap();
2490+
// Remember the valid length, then append two 0xFF bytes past it and sync
// that damaged state before handing the blob back to the journal.
let expected_size = append.size().await;
2491+
append.append(&[0xFF, 0xFF]).await.unwrap();
2492+
append.sync().await.unwrap();
2493+
drop(append);
2494+
2495+
// Re-initializing must still report exactly the two original items.
let journal = Journal::<_, u64>::init(context.child("second"), cfg)
2496+
.await
2497+
.unwrap();
2498+
assert_eq!(journal.bounds().await, 0..2);
2499+
// The journal is dropped without an explicit sync after init.
drop(journal);
2500+
2501+
(data_partition, expected_size)
2502+
});
2503+
2504+
// Restart from the checkpoint and verify the blob is back to its original length.
// NOTE(review): this presumably only holds if init made the repair durable;
// confirm what state `start_and_recover` keeps in the checkpoint.
deterministic::Runner::from(checkpoint).start(move |context| async move {
2505+
let (blob, raw_size) = context
2506+
.open(&data_partition, &0u64.to_be_bytes())
2507+
.await
2508+
.unwrap();
2509+
let append = Append::new(
2510+
blob,
2511+
raw_size,
2512+
2048,
2513+
CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)),
2514+
)
2515+
.await
2516+
.unwrap();
2517+
assert_eq!(append.size().await, expected_size);
2518+
});
2519+
}
2520+
23872521
/// Test recovery from crash after data sync but before offsets sync when journal was
23882522
/// previously emptied by pruning.
23892523
#[test_traced]

storage/src/journal/segmented/fixed.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
105105
"trailing bytes detected: truncating"
106106
);
107107
manager.rewind_section(section, valid_size).await?;
108+
// Startup repair is exceptional; make it durable immediately so callers do not
109+
// need to track repaired sections separately.
110+
manager.sync(section).await?;
108111
}
109112
}
110113

storage/src/journal/segmented/variable.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,11 @@ impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
402402
new_size = state.valid_offset,
403403
"trailing bytes detected: truncating"
404404
);
405+
// Tail repair is exceptional; make it durable
406+
// immediately so callers do not need to track
407+
// replay-time repaired sections separately.
405408
state.blob.resize(state.valid_offset).await.ok()?;
409+
state.blob.sync().await.ok()?;
406410
}
407411
state.done = true;
408412
return if batch.is_empty() {
@@ -429,6 +433,7 @@ impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
429433
"incomplete item at end: truncating"
430434
);
431435
state.blob.resize(state.valid_offset).await.ok()?;
436+
state.blob.sync().await.ok()?;
432437
state.done = true;
433438
return if batch.is_empty() {
434439
None

0 commit comments

Comments
 (0)