33use arbitrary:: { Arbitrary , Result , Unstructured } ;
44use commonware_cryptography:: { Hasher as _, Sha256 } ;
55use commonware_runtime:: { buffer:: paged:: CacheRef , deterministic, Metrics , Runner } ;
6- use commonware_storage:: journal:: contiguous:: {
7- fixed:: { Config as JournalConfig , Journal } ,
8- Reader ,
6+ use commonware_storage:: journal:: {
7+ contiguous:: {
8+ fixed:: { Config as JournalConfig , Journal } ,
9+ Many , Mutable as _, Reader ,
10+ } ,
11+ Error ,
912} ;
1013use commonware_utils:: { NZUsize , NZU16 , NZU64 } ;
1114use futures:: { pin_mut, StreamExt } ;
@@ -15,12 +18,23 @@ use std::num::NonZeroU16;
1518const MAX_REPLAY_BUF : usize = 2048 ;
1619const MAX_WRITE_BUF : usize = 2048 ;
1720const MAX_OPERATIONS : usize = 50 ;
21+ const MAX_APPEND_MANY : u8 = 20 ;
22+ const MAX_READ_MANY : usize = 16 ;
1823
1924fn bounded_non_zero ( u : & mut Unstructured < ' _ > ) -> Result < usize > {
2025 let v = u. int_in_range ( 1 ..=MAX_REPLAY_BUF ) ?;
2126 Ok ( v)
2227}
2328
29+ fn bounded_append_count ( u : & mut Unstructured < ' _ > ) -> Result < u8 > {
30+ u. int_in_range ( 0 ..=MAX_APPEND_MANY )
31+ }
32+
33+ fn bounded_positions ( u : & mut Unstructured < ' _ > ) -> Result < Vec < u64 > > {
34+ let len = u. int_in_range ( 0 ..=MAX_READ_MANY ) ?;
35+ ( 0 ..len) . map ( |_| u64:: arbitrary ( u) ) . collect ( )
36+ }
37+
2438#[ derive( Arbitrary , Debug , Clone ) ]
2539enum JournalOperation {
2640 Append {
@@ -45,10 +59,31 @@ enum JournalOperation {
4559 } ,
4660 Restart ,
4761 Destroy ,
62+ ReadMany {
63+ #[ arbitrary( with = bounded_positions) ]
64+ positions : Vec < u64 > ,
65+ } ,
4866 AppendMany {
67+ #[ arbitrary( with = bounded_append_count) ]
4968 count : u8 ,
5069 } ,
70+ AppendNested {
71+ #[ arbitrary( with = bounded_append_count) ]
72+ count_a : u8 ,
73+ #[ arbitrary( with = bounded_append_count) ]
74+ count_b : u8 ,
75+ } ,
76+ RewindTo {
77+ keep_value : u64 ,
78+ } ,
5179 MultipleSync ,
80+ TryReadSync {
81+ pos : u64 ,
82+ } ,
83+ PruningBoundary ,
84+ InitAtSize {
85+ size : u64 ,
86+ } ,
5287}
5388
5489#[ derive( Debug ) ]
@@ -103,6 +138,33 @@ fn fuzz(input: FuzzInput) {
103138 }
104139 }
105140
141+ JournalOperation :: ReadMany { positions } => {
142+ let reader = journal. reader ( ) . await ;
143+ let bounds = reader. bounds ( ) ;
144+ // Map fuzz positions into valid, sorted, deduplicated positions
145+ let mut mapped: Vec < u64 > = positions
146+ . iter ( )
147+ . filter_map ( |p| {
148+ if bounds. is_empty ( ) {
149+ return None ;
150+ }
151+ let len = bounds. end - bounds. start ;
152+ Some ( bounds. start + ( * p % len) )
153+ } )
154+ . collect ( ) ;
155+ mapped. sort_unstable ( ) ;
156+ mapped. dedup ( ) ;
157+ if !mapped. is_empty ( ) {
158+ let batch = reader. read_many ( & mapped) . await . unwrap ( ) ;
159+ assert_eq ! ( batch. len( ) , mapped. len( ) ) ;
160+ // Cross-check against individual reads
161+ for ( i, & pos) in mapped. iter ( ) . enumerate ( ) {
162+ let single = reader. read ( pos) . await . unwrap ( ) ;
163+ assert_eq ! ( batch[ i] , single) ;
164+ }
165+ }
166+ }
167+
106168 JournalOperation :: Size => {
107169 let size = journal. size ( ) . await ;
108170 assert_eq ! ( journal_size, size, "unexpected size" ) ;
@@ -177,11 +239,20 @@ fn fuzz(input: FuzzInput) {
177239 }
178240
179241 JournalOperation :: AppendMany { count } => {
180- for _ in 0 ..* count {
181- let digest = Sha256 :: hash ( & next_value. to_be_bytes ( ) ) ;
182- journal. append ( & digest) . await . unwrap ( ) ;
183- next_value += 1 ;
184- journal_size += 1 ;
242+ if * count == 0 {
243+ // Exercise the EmptyAppend error path
244+ let err = journal. append_many ( Many :: Flat ( & [ ] ) ) . await ;
245+ assert ! ( matches!( err, Err ( Error :: EmptyAppend ) ) ) ;
246+ } else {
247+ let items: Vec < _ > = ( 0 ..* count)
248+ . map ( |_| {
249+ let d = Sha256 :: hash ( & next_value. to_be_bytes ( ) ) ;
250+ next_value += 1 ;
251+ d
252+ } )
253+ . collect ( ) ;
254+ journal. append_many ( Many :: Flat ( & items) ) . await . unwrap ( ) ;
255+ journal_size += * count as u64 ;
185256 }
186257 }
187258
@@ -190,6 +261,76 @@ fn fuzz(input: FuzzInput) {
190261 journal. sync ( ) . await . unwrap ( ) ;
191262 journal. sync ( ) . await . unwrap ( ) ;
192263 }
264+
265+ JournalOperation :: AppendNested { count_a, count_b } => {
266+ if * count_a == 0 && * count_b == 0 {
267+ let err = journal. append_many ( Many :: Nested ( & [ & [ ] , & [ ] ] ) ) . await ;
268+ assert ! ( matches!( err, Err ( Error :: EmptyAppend ) ) ) ;
269+ } else {
270+ let items_a: Vec < _ > = ( 0 ..* count_a)
271+ . map ( |_| {
272+ let d = Sha256 :: hash ( & next_value. to_be_bytes ( ) ) ;
273+ next_value += 1 ;
274+ d
275+ } )
276+ . collect ( ) ;
277+ let items_b: Vec < _ > = ( 0 ..* count_b)
278+ . map ( |_| {
279+ let d = Sha256 :: hash ( & next_value. to_be_bytes ( ) ) ;
280+ next_value += 1 ;
281+ d
282+ } )
283+ . collect ( ) ;
284+ let slices: & [ & [ _ ] ] = & [ & items_a, & items_b] ;
285+ journal. append_many ( Many :: Nested ( slices) ) . await . unwrap ( ) ;
286+ journal_size += * count_a as u64 + * count_b as u64 ;
287+ }
288+ }
289+
290+ JournalOperation :: RewindTo { keep_value } => {
291+ if journal_size > oldest_retained_pos {
292+ let target = Sha256 :: hash ( & keep_value. to_be_bytes ( ) ) ;
293+ let new_size = journal. rewind_to ( |item| * item == target) . await . unwrap ( ) ;
294+ journal. sync ( ) . await . unwrap ( ) ;
295+ journal_size = new_size;
296+ oldest_retained_pos = journal. reader ( ) . await . bounds ( ) . start ;
297+ }
298+ }
299+
300+ JournalOperation :: TryReadSync { pos } => {
301+ let reader = journal. reader ( ) . await ;
302+ let bounds = reader. bounds ( ) ;
303+ if bounds. contains ( pos) {
304+ // Cross-check: sync result must match async result
305+ if let Some ( sync_val) = reader. try_read_sync ( * pos) {
306+ let async_val = reader. read ( * pos) . await . unwrap ( ) ;
307+ assert_eq ! ( sync_val, async_val) ;
308+ }
309+ }
310+ }
311+
312+ JournalOperation :: PruningBoundary => {
313+ let boundary = journal. pruning_boundary ( ) . await ;
314+ assert_eq ! ( boundary, oldest_retained_pos) ;
315+ }
316+
317+ JournalOperation :: InitAtSize { size } => {
318+ // Cap to avoid excessive memory use
319+ let target_size = * size % 256 ;
320+ drop ( journal) ;
321+ journal = Journal :: init_at_size (
322+ context
323+ . with_label ( "journal" )
324+ . with_attribute ( "instance" , restarts) ,
325+ cfg. clone ( ) ,
326+ target_size,
327+ )
328+ . await
329+ . unwrap ( ) ;
330+ restarts += 1 ;
331+ journal_size = journal. size ( ) . await ;
332+ oldest_retained_pos = journal. reader ( ) . await . bounds ( ) . start ;
333+ }
193334 }
194335 }
195336 } ) ;
0 commit comments