Skip to content

Commit 17b68c3

Browse files
committed
Merge remote-tracking branch 'origin/main' into danlaine/contiguous-2
2 parents 57fd42b + c6a805b commit 17b68c3

3 files changed

Lines changed: 196 additions & 28 deletions

File tree

storage/fuzz/fuzz_targets/fixed_journal_operations.rs

Lines changed: 150 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@
33
use arbitrary::{Arbitrary, Result, Unstructured};
44
use commonware_cryptography::{Hasher as _, Sha256};
55
use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
6-
use commonware_storage::journal::contiguous::fixed::{Config as JournalConfig, Journal};
6+
use commonware_storage::journal::{
7+
contiguous::{
8+
fixed::{Config as JournalConfig, Journal},
9+
Contiguous as _, Many, Mutable as _, Reader,
10+
},
11+
Error,
12+
};
713
use commonware_utils::{NZUsize, NZU16, NZU64};
814
use futures::{pin_mut, StreamExt};
915
use libfuzzer_sys::fuzz_target;
@@ -12,12 +18,23 @@ use std::num::NonZeroU16;
1218
const MAX_REPLAY_BUF: usize = 2048;
1319
const MAX_WRITE_BUF: usize = 2048;
1420
const MAX_OPERATIONS: usize = 50;
21+
const MAX_APPEND_MANY: u8 = 20;
22+
const MAX_READ_MANY: usize = 16;
1523

1624
/// Draws a `usize` in `1..=MAX_REPLAY_BUF` from the fuzz input.
///
/// The lower bound of 1 guarantees the returned value is never zero. Any error reported by
/// `Unstructured::int_in_range` is returned unchanged.
fn bounded_non_zero(u: &mut Unstructured<'_>) -> Result<usize> {
    // Return the call's result directly (same error type), matching `bounded_append_count`.
    u.int_in_range(1..=MAX_REPLAY_BUF)
}
2028

29+
fn bounded_append_count(u: &mut Unstructured<'_>) -> Result<u8> {
30+
u.int_in_range(0..=MAX_APPEND_MANY)
31+
}
32+
33+
fn bounded_positions(u: &mut Unstructured<'_>) -> Result<Vec<u64>> {
34+
let len = u.int_in_range(0..=MAX_READ_MANY)?;
35+
(0..len).map(|_| u64::arbitrary(u)).collect()
36+
}
37+
2138
#[derive(Arbitrary, Debug, Clone)]
2239
enum JournalOperation {
2340
Append {
@@ -42,10 +59,31 @@ enum JournalOperation {
4259
},
4360
Restart,
4461
Destroy,
62+
ReadMany {
63+
#[arbitrary(with = bounded_positions)]
64+
positions: Vec<u64>,
65+
},
4566
AppendMany {
67+
#[arbitrary(with = bounded_append_count)]
4668
count: u8,
4769
},
70+
AppendNested {
71+
#[arbitrary(with = bounded_append_count)]
72+
count_a: u8,
73+
#[arbitrary(with = bounded_append_count)]
74+
count_b: u8,
75+
},
76+
RewindTo {
77+
keep_value: u64,
78+
},
4879
MultipleSync,
80+
TryReadSync {
81+
pos: u64,
82+
},
83+
PruningBoundary,
84+
InitAtSize {
85+
size: u64,
86+
},
4987
}
5088

5189
#[derive(Debug)]
@@ -99,6 +137,33 @@ fn fuzz(input: FuzzInput) {
99137
}
100138
}
101139

140+
JournalOperation::ReadMany { positions } => {
141+
let reader = journal.reader().await;
142+
let bounds = reader.bounds();
143+
// Map fuzz positions into valid, sorted, deduplicated positions
144+
let mut mapped: Vec<u64> = positions
145+
.iter()
146+
.filter_map(|p| {
147+
if bounds.is_empty() {
148+
return None;
149+
}
150+
let len = bounds.end - bounds.start;
151+
Some(bounds.start + (*p % len))
152+
})
153+
.collect();
154+
mapped.sort_unstable();
155+
mapped.dedup();
156+
if !mapped.is_empty() {
157+
let batch = reader.read_many(&mapped).await.unwrap();
158+
assert_eq!(batch.len(), mapped.len());
159+
// Cross-check against individual reads
160+
for (i, &pos) in mapped.iter().enumerate() {
161+
let single = reader.read(pos).await.unwrap();
162+
assert_eq!(batch[i], single);
163+
}
164+
}
165+
}
166+
102167
JournalOperation::Size => {
103168
let size = journal.size();
104169
assert_eq!(journal_size, size, "unexpected size");
@@ -172,11 +237,20 @@ fn fuzz(input: FuzzInput) {
172237
}
173238

174239
JournalOperation::AppendMany { count } => {
175-
for _ in 0..*count {
176-
let digest = Sha256::hash(&next_value.to_be_bytes());
177-
journal.append(&digest).await.unwrap();
178-
next_value += 1;
179-
journal_size += 1;
240+
if *count == 0 {
241+
// Exercise the EmptyAppend error path
242+
let err = journal.append_many(Many::Flat(&[])).await;
243+
assert!(matches!(err, Err(Error::EmptyAppend)));
244+
} else {
245+
let items: Vec<_> = (0..*count)
246+
.map(|_| {
247+
let d = Sha256::hash(&next_value.to_be_bytes());
248+
next_value += 1;
249+
d
250+
})
251+
.collect();
252+
journal.append_many(Many::Flat(&items)).await.unwrap();
253+
journal_size += *count as u64;
180254
}
181255
}
182256

@@ -185,6 +259,76 @@ fn fuzz(input: FuzzInput) {
185259
journal.sync().await.unwrap();
186260
journal.sync().await.unwrap();
187261
}
262+
263+
JournalOperation::AppendNested { count_a, count_b } => {
264+
if *count_a == 0 && *count_b == 0 {
265+
let err = journal.append_many(Many::Nested(&[&[], &[]])).await;
266+
assert!(matches!(err, Err(Error::EmptyAppend)));
267+
} else {
268+
let items_a: Vec<_> = (0..*count_a)
269+
.map(|_| {
270+
let d = Sha256::hash(&next_value.to_be_bytes());
271+
next_value += 1;
272+
d
273+
})
274+
.collect();
275+
let items_b: Vec<_> = (0..*count_b)
276+
.map(|_| {
277+
let d = Sha256::hash(&next_value.to_be_bytes());
278+
next_value += 1;
279+
d
280+
})
281+
.collect();
282+
let slices: &[&[_]] = &[&items_a, &items_b];
283+
journal.append_many(Many::Nested(slices)).await.unwrap();
284+
journal_size += *count_a as u64 + *count_b as u64;
285+
}
286+
}
287+
288+
JournalOperation::RewindTo { keep_value } => {
289+
if journal_size > oldest_retained_pos {
290+
let target = Sha256::hash(&keep_value.to_be_bytes());
291+
let new_size = journal.rewind_to(|item| *item == target).await.unwrap();
292+
journal.sync().await.unwrap();
293+
journal_size = new_size;
294+
oldest_retained_pos = journal.reader().await.bounds().start;
295+
}
296+
}
297+
298+
JournalOperation::TryReadSync { pos } => {
299+
let reader = journal.reader().await;
300+
let bounds = reader.bounds();
301+
if bounds.contains(pos) {
302+
// Cross-check: sync result must match async result
303+
if let Some(sync_val) = reader.try_read_sync(*pos) {
304+
let async_val = reader.read(*pos).await.unwrap();
305+
assert_eq!(sync_val, async_val);
306+
}
307+
}
308+
}
309+
310+
JournalOperation::PruningBoundary => {
311+
let boundary = journal.pruning_boundary();
312+
assert_eq!(boundary, oldest_retained_pos);
313+
}
314+
315+
JournalOperation::InitAtSize { size } => {
316+
// Cap to avoid excessive memory use
317+
let target_size = *size % 256;
318+
drop(journal);
319+
journal = Journal::init_at_size(
320+
context
321+
.with_label("journal")
322+
.with_attribute("instance", restarts),
323+
cfg.clone(),
324+
target_size,
325+
)
326+
.await
327+
.unwrap();
328+
restarts += 1;
329+
journal_size = journal.size();
330+
oldest_retained_pos = journal.reader().await.bounds().start;
331+
}
188332
}
189333
}
190334
});

storage/src/qmdb/current/batch.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ use crate::{
2525
},
2626
Context,
2727
};
28+
use ahash::AHasher;
2829
use commonware_codec::Codec;
2930
use commonware_cryptography::{Digest, Hasher};
3031
use commonware_utils::bitmap::{Prunable as BitMap, Readable as BitmapReadable};
3132
use std::{
32-
collections::{BTreeMap, BTreeSet},
33+
collections::{BTreeSet, HashMap},
34+
hash::BuildHasherDefault,
3335
sync::Arc,
3436
};
3537

@@ -41,17 +43,20 @@ use std::{
4143
#[derive(Clone, Debug, Default)]
4244
pub(crate) struct ChunkOverlay<const N: usize> {
4345
/// Dirty chunks: chunk_idx -> materialized chunk bytes.
44-
pub(crate) chunks: BTreeMap<usize, [u8; N]>,
46+
///
47+
/// `ahash` (fast on integer keys) with `BuildHasherDefault` (no per-construction RNG
48+
/// sampling). Iteration order is not observed by any consumer.
49+
pub(crate) chunks: HashMap<usize, [u8; N], BuildHasherDefault<AHasher>>,
4550
/// Total number of bits (parent + new operations).
4651
pub(crate) len: u64,
4752
}
4853

4954
impl<const N: usize> ChunkOverlay<N> {
5055
const CHUNK_BITS: u64 = BitMap::<N>::CHUNK_SIZE_BITS;
5156

52-
const fn new(len: u64) -> Self {
57+
fn new(len: u64) -> Self {
5358
Self {
54-
chunks: BTreeMap::new(),
59+
chunks: HashMap::default(),
5560
len,
5661
}
5762
}
@@ -590,6 +595,7 @@ where
590595
// compute_db_root sees newly completed chunks. Using bitmap_parent alone would miss chunks
591596
// that transitioned from partial to complete in this batch.
592597
let bitmap_batch = BitmapBatch::Layer(Arc::new(BitmapBatchLayer {
598+
pruned_chunks: bitmap_parent.pruned_chunks(),
593599
parent: bitmap_parent.clone(),
594600
overlay: Arc::new(overlay),
595601
}));
@@ -648,6 +654,10 @@ pub(crate) struct BitmapBatchLayer<const N: usize> {
648654
pub(crate) parent: BitmapBatch<N>,
649655
/// Chunk-level overlay: materialized bytes for every chunk that differs from parent.
650656
pub(crate) overlay: Arc<ChunkOverlay<N>>,
657+
/// Pruned-chunk count, copied from `parent` at construction. Invariant across the whole
658+
/// layer chain (pruning only happens on the committed base), so caching here lets
659+
/// `BitmapBatch::pruned_chunks` return in O(1) instead of walking to the Base.
660+
pub(crate) pruned_chunks: usize,
651661
}
652662

653663
impl<const N: usize> BitmapBatch<N> {
@@ -660,14 +670,17 @@ impl<const N: usize> BitmapReadable<N> for BitmapBatch<N> {
660670
}
661671

662672
fn get_chunk(&self, idx: usize) -> [u8; N] {
663-
match self {
664-
Self::Base(bm) => *bm.get_chunk(idx),
665-
Self::Layer(layer) => {
666-
// Check overlay first; fall through to parent if unmodified.
667-
if let Some(&chunk) = layer.overlay.get(idx) {
668-
chunk
669-
} else {
670-
layer.parent.get_chunk(idx)
673+
// Walk the layer chain. Each layer's overlay either holds the chunk (return it) or
674+
// doesn't (descend).
675+
let mut current = self;
676+
loop {
677+
match current {
678+
Self::Base(bm) => return *bm.get_chunk(idx),
679+
Self::Layer(layer) => {
680+
if let Some(&chunk) = layer.overlay.get(idx) {
681+
return chunk;
682+
}
683+
current = &layer.parent;
671684
}
672685
}
673686
}
@@ -691,7 +704,7 @@ impl<const N: usize> BitmapReadable<N> for BitmapBatch<N> {
691704
fn pruned_chunks(&self) -> usize {
692705
match self {
693706
Self::Base(bm) => bm.pruned_chunks(),
694-
Self::Layer(layer) => layer.parent.pruned_chunks(),
707+
Self::Layer(layer) => layer.pruned_chunks,
695708
}
696709
}
697710

@@ -723,8 +736,13 @@ impl<const N: usize> BitmapBatch<N> {
723736
}
724737

725738
// Slow path: create a new layer.
739+
let pruned_chunks = self.pruned_chunks();
726740
let parent = self.clone();
727-
*self = Self::Layer(Arc::new(BitmapBatchLayer { parent, overlay }));
741+
*self = Self::Layer(Arc::new(BitmapBatchLayer {
742+
parent,
743+
overlay,
744+
pruned_chunks,
745+
}));
728746
}
729747

730748
/// Flatten all layers back to a single `Base(Arc<BitMap<N>>)`.

utils/src/bitmap/mod.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,13 @@ pub trait Readable<const N: usize> {
967967
where
968968
Self: Sized,
969969
{
970-
OnesIter { bitmap: self, pos }
970+
let len = self.len();
971+
let pruned_start = (self.pruned_chunks() as u64) * BitMap::<N>::CHUNK_SIZE_BITS;
972+
OnesIter {
973+
bitmap: self,
974+
pos: pos.max(pruned_start),
975+
len,
976+
}
971977
}
972978
}
973979

@@ -1002,19 +1008,19 @@ impl<const N: usize> Readable<N> for BitMap<N> {
10021008
pub struct OnesIter<'a, B, const N: usize> {
10031009
bitmap: &'a B,
10041010
pos: u64,
1011+
/// Cached `bitmap.len()` at iterator construction. The underlying bitmap is borrowed
1012+
/// immutably for the iterator's lifetime, so this can never change mid-iteration.
1013+
/// For layered bitmaps (e.g. `BitmapBatch`), `len()` walks the layer chain, so caching
1014+
/// this avoids that walk on every `next`.
1015+
len: u64,
10051016
}
10061017

10071018
impl<B: Readable<N>, const N: usize> iter::Iterator for OnesIter<'_, B, N> {
10081019
type Item = u64;
10091020

10101021
fn next(&mut self) -> Option<u64> {
1011-
let len = self.bitmap.len();
10121022
let chunk_bits = BitMap::<N>::CHUNK_SIZE_BITS;
1013-
let pruned_start = self.bitmap.pruned_chunks() as u64 * chunk_bits;
1014-
if self.pos < pruned_start {
1015-
self.pos = pruned_start;
1016-
}
1017-
while self.pos < len {
1023+
while self.pos < self.len {
10181024
let chunk_idx = BitMap::<N>::to_chunk_index(self.pos);
10191025
let chunk = self.bitmap.get_chunk(chunk_idx);
10201026
let chunk_start = chunk_idx as u64 * chunk_bits;
@@ -1027,7 +1033,7 @@ impl<B: Readable<N>, const N: usize> iter::Iterator for OnesIter<'_, B, N> {
10271033
let found = chunk_start
10281034
+ (byte_idx * 8 + bit_in_byte) as u64
10291035
+ masked.trailing_zeros() as u64;
1030-
if found >= len {
1036+
if found >= self.len {
10311037
return None;
10321038
}
10331039
self.pos = found + 1;

0 commit comments

Comments
 (0)