Skip to content

Commit c0e3b70

Browse files
committed
make mutating contiguous journal methods &mut
1 parent 266ff72 commit c0e3b70

36 files changed

Lines changed: 1026 additions & 1221 deletions

storage/fuzz/fuzz_targets/fixed_journal_operations.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,14 @@ fn fuzz(input: FuzzInput) {
131131
}
132132

133133
JournalOperation::Read { pos } => {
134-
let reader = journal.reader().await;
135-
let bounds = reader.bounds();
134+
let bounds = journal.bounds();
136135
if bounds.contains(pos) {
137-
reader.read(*pos).await.unwrap();
136+
journal.read(*pos).await.unwrap();
138137
}
139138
}
140139

141140
JournalOperation::ReadMany { positions } => {
142-
let reader = journal.reader().await;
141+
let reader = journal.reader();
143142
let bounds = reader.bounds();
144143
// Map fuzz positions into valid, sorted, deduplicated positions
145144
let mut mapped: Vec<u64> = positions
@@ -166,7 +165,7 @@ fn fuzz(input: FuzzInput) {
166165
}
167166

168167
JournalOperation::Size => {
169-
let size = journal.size().await;
168+
let size = journal.size();
170169
assert_eq!(journal_size, size, "unexpected size");
171170
}
172171

@@ -179,26 +178,25 @@ fn fuzz(input: FuzzInput) {
179178
journal.rewind(*size).await.unwrap();
180179
journal.sync().await.unwrap();
181180
journal_size = *size;
182-
oldest_retained_pos = journal.reader().await.bounds().start;
181+
oldest_retained_pos = journal.bounds().start;
183182
}
184183
}
185184

186185
JournalOperation::Bounds => {
187-
let _bounds = journal.reader().await.bounds();
186+
let _bounds = journal.bounds();
188187
}
189188

190189
JournalOperation::Prune { min_pos } => {
191190
if *min_pos <= journal_size {
192191
journal.prune(*min_pos).await.unwrap();
193-
oldest_retained_pos = journal.reader().await.bounds().start;
192+
oldest_retained_pos = journal.bounds().start;
194193
}
195194
}
196195

197196
JournalOperation::Replay { buffer, start_pos } => {
198-
let reader = journal.reader().await;
199-
let bounds = reader.bounds();
197+
let bounds = journal.bounds();
200198
let start_pos = bounds.start + (*start_pos % (bounds.end - bounds.start + 1));
201-
let replay = reader.replay(NZUsize!(*buffer), start_pos).await;
199+
let replay = journal.replay(NZUsize!(*buffer), start_pos).await;
202200

203201
match replay {
204202
Ok(stream) => {
@@ -229,8 +227,8 @@ fn fuzz(input: FuzzInput) {
229227
.unwrap();
230228
restarts += 1;
231229
// Reset tracking variables to match recovered state
232-
journal_size = journal.size().await;
233-
oldest_retained_pos = journal.reader().await.bounds().start;
230+
journal_size = journal.size();
231+
oldest_retained_pos = journal.bounds().start;
234232
}
235233

236234
JournalOperation::Destroy => {
@@ -293,12 +291,12 @@ fn fuzz(input: FuzzInput) {
293291
let new_size = journal.rewind_to(|item| *item == target).await.unwrap();
294292
journal.sync().await.unwrap();
295293
journal_size = new_size;
296-
oldest_retained_pos = journal.reader().await.bounds().start;
294+
oldest_retained_pos = journal.reader().bounds().start;
297295
}
298296
}
299297

300298
JournalOperation::TryReadSync { pos } => {
301-
let reader = journal.reader().await;
299+
let reader = journal.reader();
302300
let bounds = reader.bounds();
303301
if bounds.contains(pos) {
304302
// Cross-check: sync result must match async result
@@ -310,7 +308,7 @@ fn fuzz(input: FuzzInput) {
310308
}
311309

312310
JournalOperation::PruningBoundary => {
313-
let boundary = journal.pruning_boundary().await;
311+
let boundary = journal.pruning_boundary();
314312
assert_eq!(boundary, oldest_retained_pos);
315313
}
316314

@@ -328,8 +326,8 @@ fn fuzz(input: FuzzInput) {
328326
.await
329327
.unwrap();
330328
restarts += 1;
331-
journal_size = journal.size().await;
332-
oldest_retained_pos = journal.reader().await.bounds().start;
329+
journal_size = journal.size();
330+
oldest_retained_pos = journal.reader().bounds().start;
333331
}
334332
}
335333
}

storage/fuzz/fuzz_targets/journal_crash_recovery.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -244,13 +244,13 @@ impl FuzzJournal for FixedJournal<deterministic::Context, Item> {
244244
}
245245

246246
async fn size(&self) -> u64 {
247-
FixedJournal::size(self).await
247+
FixedJournal::size(self)
248248
}
249249

250250
// Cannot use `async fn` here due to RPITIT Send auto-trait limitation.
251251
#[allow(clippy::manual_async_fn)]
252252
fn bounds(&self) -> impl Future<Output = Range<u64>> + Send {
253-
async { self.reader().await.bounds() }
253+
async { Reader::bounds(self) }
254254
}
255255

256256
async fn append(&mut self, item: Item) -> Result<u64, commonware_storage::journal::Error> {
@@ -263,7 +263,7 @@ impl FuzzJournal for FixedJournal<deterministic::Context, Item> {
263263
&self,
264264
pos: u64,
265265
) -> impl Future<Output = Result<Item, commonware_storage::journal::Error>> + Send {
266-
async move { self.reader().await.read(pos).await }
266+
async move { Reader::read(self, pos).await }
267267
}
268268

269269
async fn sync(&mut self) -> Result<(), commonware_storage::journal::Error> {
@@ -283,7 +283,7 @@ impl FuzzJournal for FixedJournal<deterministic::Context, Item> {
283283
buffer: NonZeroUsize,
284284
start_pos: u64,
285285
) -> Result<(), commonware_storage::journal::Error> {
286-
let _ = self.reader().await.replay(buffer, start_pos).await?;
286+
let _ = Reader::replay(self, buffer, start_pos).await?;
287287
Ok(())
288288
}
289289

@@ -326,13 +326,13 @@ impl FuzzJournal for VariableJournal<deterministic::Context, Item> {
326326
}
327327

328328
async fn size(&self) -> u64 {
329-
VariableJournal::size(self).await
329+
VariableJournal::size(self)
330330
}
331331

332332
// Cannot use `async fn` here due to RPITIT Send auto-trait limitation.
333333
#[allow(clippy::manual_async_fn)]
334334
fn bounds(&self) -> impl Future<Output = Range<u64>> + Send {
335-
async { self.reader().await.bounds() }
335+
async { self.reader().bounds() }
336336
}
337337

338338
async fn append(&mut self, item: Item) -> Result<u64, commonware_storage::journal::Error> {
@@ -345,7 +345,7 @@ impl FuzzJournal for VariableJournal<deterministic::Context, Item> {
345345
&self,
346346
pos: u64,
347347
) -> impl Future<Output = Result<Item, commonware_storage::journal::Error>> + Send {
348-
async move { self.reader().await.read(pos).await }
348+
async move { self.reader().read(pos).await }
349349
}
350350

351351
async fn sync(&mut self) -> Result<(), commonware_storage::journal::Error> {
@@ -365,7 +365,7 @@ impl FuzzJournal for VariableJournal<deterministic::Context, Item> {
365365
buffer: NonZeroUsize,
366366
start_pos: u64,
367367
) -> Result<(), commonware_storage::journal::Error> {
368-
let _ = self.reader().await.replay(buffer, start_pos).await?;
368+
let _ = self.reader().replay(buffer, start_pos).await?;
369369
Ok(())
370370
}
371371

storage/fuzz/fuzz_targets/ordinal_operations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ fn fuzz(input: FuzzInput) {
242242
}
243243

244244
OrdinalOperation::ReopenAfterOperations => {
245-
if let Some(o) = store.take() {
245+
if let Some(mut o) = store.take() {
246246
// Sync and drop the current ordinal
247247
o.sync().await.expect("failed to sync store before reopen failed");
248248
drop(o);

storage/fuzz/fuzz_targets/queue_crash_recovery.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ async fn run_operations(
264264

265265
QueueOperation::DequeueAndAck => {
266266
if let Ok(Some((pos, _item))) = queue.dequeue().await {
267-
if queue.ack(pos).await.is_ok() {
267+
if queue.ack(pos).is_ok() {
268268
state.update_ack_floor(queue.ack_floor());
269269
}
270270
}
@@ -276,12 +276,12 @@ async fn run_operations(
276276
}
277277

278278
QueueOperation::AckOffset { offset } => {
279-
let size = queue.size().await;
279+
let size = queue.size();
280280
let ack_floor = queue.ack_floor();
281281
if size > ack_floor {
282282
let range = size - ack_floor;
283283
let pos = ack_floor + (*offset as u64 % range);
284-
match queue.ack(pos).await {
284+
match queue.ack(pos) {
285285
Ok(()) => {
286286
state.update_ack_floor(queue.ack_floor());
287287
}
@@ -294,9 +294,9 @@ async fn run_operations(
294294
}
295295

296296
QueueOperation::AckUpToOffset { offset } => {
297-
let size = queue.size().await;
297+
let size = queue.size();
298298
let up_to = (*offset as u64) % (size + 1);
299-
match queue.ack_up_to(up_to).await {
299+
match queue.ack_up_to(up_to) {
300300
Ok(()) => {
301301
state.update_ack_floor(queue.ack_floor());
302302
}
@@ -338,7 +338,7 @@ async fn run_operations(
338338
/// that the queue can be re-initialized and used again for basic operations.
339339
async fn verify_recovery_after_mutable_error(queue: &mut Queue<deterministic::Context, Vec<u8>>) {
340340
// Basic read-path sanity should not fail.
341-
let size_before = queue.size().await;
341+
let size_before = queue.size();
342342
queue
343343
.dequeue()
344344
.await
@@ -371,7 +371,7 @@ async fn verify_recovery(
371371
return;
372372
}
373373

374-
let size = queue.size().await;
374+
let size = queue.size();
375375
let ack_floor = queue.ack_floor();
376376

377377
// Size should be within expected bounds

storage/fuzz/fuzz_targets/queue_operations.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -214,14 +214,14 @@ fn fuzz(input: FuzzInput) {
214214
}
215215

216216
QueueOperation::Ack { pos_offset } => {
217-
let size = queue.size().await;
217+
let size = queue.size();
218218
if size == 0 {
219219
continue;
220220
}
221221
// Map offset to a valid position range
222222
let pos = (*pos_offset as u64) % size;
223223

224-
let result = queue.ack(pos).await;
224+
let result = queue.ack(pos);
225225
let ref_result = reference.ack(pos);
226226

227227
assert_eq!(
@@ -232,11 +232,11 @@ fn fuzz(input: FuzzInput) {
232232
}
233233

234234
QueueOperation::AckUpTo { pos_offset } => {
235-
let size = queue.size().await;
235+
let size = queue.size();
236236
// Map offset to valid range [0, size]
237237
let up_to = (*pos_offset as u64) % (size + 1);
238238

239-
let result = queue.ack_up_to(up_to).await;
239+
let result = queue.ack_up_to(up_to);
240240
let ref_result = reference.ack_up_to(up_to);
241241

242242
assert_eq!(
@@ -257,11 +257,7 @@ fn fuzz(input: FuzzInput) {
257257
}
258258

259259
// Verify invariants after each operation
260-
assert_eq!(
261-
queue.size().await,
262-
reference.size(),
263-
"size mismatch after {op:?}"
264-
);
260+
assert_eq!(queue.size(), reference.size(), "size mismatch after {op:?}");
265261
assert_eq!(
266262
queue.ack_floor(),
267263
reference.ack_floor(),
@@ -273,13 +269,13 @@ fn fuzz(input: FuzzInput) {
273269
"read_position mismatch after {op:?}"
274270
);
275271
assert_eq!(
276-
queue.is_empty().await,
272+
queue.is_empty(),
277273
reference.is_empty(),
278274
"is_empty mismatch after {op:?}"
279275
);
280276

281277
// Verify is_acked consistency for a sample of positions
282-
for pos in 0..queue.size().await.min(20) {
278+
for pos in 0..queue.size().min(20) {
283279
assert_eq!(
284280
queue.is_acked(pos),
285281
reference.is_acked(pos),

storage/fuzz/fuzz_targets/store_operations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ fn fuzz(input: FuzzInput) {
167167
}
168168

169169
Operation::OpCount => {
170-
let _ = db.bounds().await.end;
170+
let _ = db.bounds().end;
171171
}
172172

173173
Operation::InactivityFloorLoc => {

0 commit comments

Comments
 (0)