Skip to content

Commit c6a805b

Browse files
Update fixed journal operations (#3602)
Co-authored-by: Dan Laine <dan@commonware.xyz>
1 parent 5cce12d commit c6a805b

1 file changed

Lines changed: 149 additions & 8 deletions

File tree

storage/fuzz/fuzz_targets/fixed_journal_operations.rs

Lines changed: 149 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
use arbitrary::{Arbitrary, Result, Unstructured};
44
use commonware_cryptography::{Hasher as _, Sha256};
55
use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
6-
use commonware_storage::journal::contiguous::{
7-
fixed::{Config as JournalConfig, Journal},
8-
Reader,
6+
use commonware_storage::journal::{
7+
contiguous::{
8+
fixed::{Config as JournalConfig, Journal},
9+
Many, Mutable as _, Reader,
10+
},
11+
Error,
912
};
1013
use commonware_utils::{NZUsize, NZU16, NZU64};
1114
use futures::{pin_mut, StreamExt};
@@ -15,12 +18,23 @@ use std::num::NonZeroU16;
1518
const MAX_REPLAY_BUF: usize = 2048;
1619
const MAX_WRITE_BUF: usize = 2048;
1720
const MAX_OPERATIONS: usize = 50;
21+
const MAX_APPEND_MANY: u8 = 20;
22+
const MAX_READ_MANY: usize = 16;
1823

1924
fn bounded_non_zero(u: &mut Unstructured<'_>) -> Result<usize> {
2025
let v = u.int_in_range(1..=MAX_REPLAY_BUF)?;
2126
Ok(v)
2227
}
2328

29+
fn bounded_append_count(u: &mut Unstructured<'_>) -> Result<u8> {
30+
u.int_in_range(0..=MAX_APPEND_MANY)
31+
}
32+
33+
fn bounded_positions(u: &mut Unstructured<'_>) -> Result<Vec<u64>> {
34+
let len = u.int_in_range(0..=MAX_READ_MANY)?;
35+
(0..len).map(|_| u64::arbitrary(u)).collect()
36+
}
37+
2438
#[derive(Arbitrary, Debug, Clone)]
2539
enum JournalOperation {
2640
Append {
@@ -45,10 +59,31 @@ enum JournalOperation {
4559
},
4660
Restart,
4761
Destroy,
62+
ReadMany {
63+
#[arbitrary(with = bounded_positions)]
64+
positions: Vec<u64>,
65+
},
4866
AppendMany {
67+
#[arbitrary(with = bounded_append_count)]
4968
count: u8,
5069
},
70+
AppendNested {
71+
#[arbitrary(with = bounded_append_count)]
72+
count_a: u8,
73+
#[arbitrary(with = bounded_append_count)]
74+
count_b: u8,
75+
},
76+
RewindTo {
77+
keep_value: u64,
78+
},
5179
MultipleSync,
80+
TryReadSync {
81+
pos: u64,
82+
},
83+
PruningBoundary,
84+
InitAtSize {
85+
size: u64,
86+
},
5287
}
5388

5489
#[derive(Debug)]
@@ -103,6 +138,33 @@ fn fuzz(input: FuzzInput) {
103138
}
104139
}
105140

141+
JournalOperation::ReadMany { positions } => {
142+
let reader = journal.reader().await;
143+
let bounds = reader.bounds();
144+
// Map fuzz positions into valid, sorted, deduplicated positions
145+
let mut mapped: Vec<u64> = positions
146+
.iter()
147+
.filter_map(|p| {
148+
if bounds.is_empty() {
149+
return None;
150+
}
151+
let len = bounds.end - bounds.start;
152+
Some(bounds.start + (*p % len))
153+
})
154+
.collect();
155+
mapped.sort_unstable();
156+
mapped.dedup();
157+
if !mapped.is_empty() {
158+
let batch = reader.read_many(&mapped).await.unwrap();
159+
assert_eq!(batch.len(), mapped.len());
160+
// Cross-check against individual reads
161+
for (i, &pos) in mapped.iter().enumerate() {
162+
let single = reader.read(pos).await.unwrap();
163+
assert_eq!(batch[i], single);
164+
}
165+
}
166+
}
167+
106168
JournalOperation::Size => {
107169
let size = journal.size().await;
108170
assert_eq!(journal_size, size, "unexpected size");
@@ -177,11 +239,20 @@ fn fuzz(input: FuzzInput) {
177239
}
178240

179241
JournalOperation::AppendMany { count } => {
180-
for _ in 0..*count {
181-
let digest = Sha256::hash(&next_value.to_be_bytes());
182-
journal.append(&digest).await.unwrap();
183-
next_value += 1;
184-
journal_size += 1;
242+
if *count == 0 {
243+
// Exercise the EmptyAppend error path
244+
let err = journal.append_many(Many::Flat(&[])).await;
245+
assert!(matches!(err, Err(Error::EmptyAppend)));
246+
} else {
247+
let items: Vec<_> = (0..*count)
248+
.map(|_| {
249+
let d = Sha256::hash(&next_value.to_be_bytes());
250+
next_value += 1;
251+
d
252+
})
253+
.collect();
254+
journal.append_many(Many::Flat(&items)).await.unwrap();
255+
journal_size += *count as u64;
185256
}
186257
}
187258

@@ -190,6 +261,76 @@ fn fuzz(input: FuzzInput) {
190261
journal.sync().await.unwrap();
191262
journal.sync().await.unwrap();
192263
}
264+
265+
JournalOperation::AppendNested { count_a, count_b } => {
266+
if *count_a == 0 && *count_b == 0 {
267+
let err = journal.append_many(Many::Nested(&[&[], &[]])).await;
268+
assert!(matches!(err, Err(Error::EmptyAppend)));
269+
} else {
270+
let items_a: Vec<_> = (0..*count_a)
271+
.map(|_| {
272+
let d = Sha256::hash(&next_value.to_be_bytes());
273+
next_value += 1;
274+
d
275+
})
276+
.collect();
277+
let items_b: Vec<_> = (0..*count_b)
278+
.map(|_| {
279+
let d = Sha256::hash(&next_value.to_be_bytes());
280+
next_value += 1;
281+
d
282+
})
283+
.collect();
284+
let slices: &[&[_]] = &[&items_a, &items_b];
285+
journal.append_many(Many::Nested(slices)).await.unwrap();
286+
journal_size += *count_a as u64 + *count_b as u64;
287+
}
288+
}
289+
290+
JournalOperation::RewindTo { keep_value } => {
291+
if journal_size > oldest_retained_pos {
292+
let target = Sha256::hash(&keep_value.to_be_bytes());
293+
let new_size = journal.rewind_to(|item| *item == target).await.unwrap();
294+
journal.sync().await.unwrap();
295+
journal_size = new_size;
296+
oldest_retained_pos = journal.reader().await.bounds().start;
297+
}
298+
}
299+
300+
JournalOperation::TryReadSync { pos } => {
301+
let reader = journal.reader().await;
302+
let bounds = reader.bounds();
303+
if bounds.contains(pos) {
304+
// Cross-check: sync result must match async result
305+
if let Some(sync_val) = reader.try_read_sync(*pos) {
306+
let async_val = reader.read(*pos).await.unwrap();
307+
assert_eq!(sync_val, async_val);
308+
}
309+
}
310+
}
311+
312+
JournalOperation::PruningBoundary => {
313+
let boundary = journal.pruning_boundary().await;
314+
assert_eq!(boundary, oldest_retained_pos);
315+
}
316+
317+
JournalOperation::InitAtSize { size } => {
318+
// Cap to avoid excessive memory use
319+
let target_size = *size % 256;
320+
drop(journal);
321+
journal = Journal::init_at_size(
322+
context
323+
.with_label("journal")
324+
.with_attribute("instance", restarts),
325+
cfg.clone(),
326+
target_size,
327+
)
328+
.await
329+
.unwrap();
330+
restarts += 1;
331+
journal_size = journal.size().await;
332+
oldest_retained_pos = journal.reader().await.bounds().start;
333+
}
193334
}
194335
}
195336
});

0 commit comments

Comments
 (0)