Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions storage/fuzz/fuzz_targets/fixed_journal_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,14 @@ fn fuzz(input: FuzzInput) {
}

JournalOperation::Read { pos } => {
let reader = journal.reader().await;
let bounds = reader.bounds();
let bounds = journal.bounds();
if bounds.contains(pos) {
reader.read(*pos).await.unwrap();
journal.read(*pos).await.unwrap();
}
}

JournalOperation::ReadMany { positions } => {
let reader = journal.reader().await;
let reader = journal.reader();
let bounds = reader.bounds();
// Map fuzz positions into valid, sorted, deduplicated positions
let mut mapped: Vec<u64> = positions
Expand All @@ -168,7 +167,7 @@ fn fuzz(input: FuzzInput) {
}

JournalOperation::Size => {
let size = journal.size().await;
let size = journal.size();
assert_eq!(journal_size, size, "unexpected size");
}

Expand All @@ -181,26 +180,25 @@ fn fuzz(input: FuzzInput) {
journal.rewind(*size).await.unwrap();
journal.sync().await.unwrap();
journal_size = *size;
oldest_retained_pos = journal.reader().await.bounds().start;
oldest_retained_pos = journal.bounds().start;
}
}

JournalOperation::Bounds => {
let _bounds = journal.reader().await.bounds();
let _bounds = journal.bounds();
}

JournalOperation::Prune { min_pos } => {
if *min_pos <= journal_size {
journal.prune(*min_pos).await.unwrap();
oldest_retained_pos = journal.reader().await.bounds().start;
oldest_retained_pos = journal.bounds().start;
}
}

JournalOperation::Replay { buffer, start_pos } => {
let reader = journal.reader().await;
let bounds = reader.bounds();
let bounds = journal.bounds();
let start_pos = bounds.start + (*start_pos % (bounds.end - bounds.start + 1));
let replay = reader.replay(NZUsize!(*buffer), start_pos).await;
let replay = journal.replay(NZUsize!(*buffer), start_pos).await;

match replay {
Ok(stream) => {
Expand Down Expand Up @@ -231,8 +229,8 @@ fn fuzz(input: FuzzInput) {
.unwrap();
restarts += 1;
// Reset tracking variables to match recovered state
journal_size = journal.size().await;
oldest_retained_pos = journal.reader().await.bounds().start;
journal_size = journal.size();
oldest_retained_pos = journal.bounds().start;
}

JournalOperation::Destroy => {
Expand Down Expand Up @@ -295,12 +293,12 @@ fn fuzz(input: FuzzInput) {
let new_size = journal.rewind_to(|item| *item == target).await.unwrap();
journal.sync().await.unwrap();
journal_size = new_size;
oldest_retained_pos = journal.reader().await.bounds().start;
oldest_retained_pos = journal.reader().bounds().start;
}
}

JournalOperation::TryReadSync { pos } => {
let reader = journal.reader().await;
let reader = journal.reader();
let bounds = reader.bounds();
if bounds.contains(pos) {
// Cross-check: sync result must match async result
Expand All @@ -312,7 +310,7 @@ fn fuzz(input: FuzzInput) {
}

JournalOperation::PruningBoundary => {
let boundary = journal.pruning_boundary().await;
let boundary = journal.pruning_boundary();
assert_eq!(boundary, oldest_retained_pos);
}

Expand All @@ -330,8 +328,8 @@ fn fuzz(input: FuzzInput) {
.await
.unwrap();
restarts += 1;
journal_size = journal.size().await;
oldest_retained_pos = journal.reader().await.bounds().start;
journal_size = journal.size();
oldest_retained_pos = journal.reader().bounds().start;
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions storage/fuzz/fuzz_targets/journal_crash_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,13 @@ impl FuzzJournal for FixedJournal<deterministic::Context, Item> {
}

async fn size(&self) -> u64 {
FixedJournal::size(self).await
FixedJournal::size(self)
}

// Cannot use `async fn` here due to RPITIT Send auto-trait limitation.
#[allow(clippy::manual_async_fn)]
fn bounds(&self) -> impl Future<Output = Range<u64>> + Send {
async { self.reader().await.bounds() }
async { Reader::bounds(self) }
}

async fn append(&mut self, item: Item) -> Result<u64, commonware_storage::journal::Error> {
Expand All @@ -263,7 +263,7 @@ impl FuzzJournal for FixedJournal<deterministic::Context, Item> {
&self,
pos: u64,
) -> impl Future<Output = Result<Item, commonware_storage::journal::Error>> + Send {
async move { self.reader().await.read(pos).await }
async move { Reader::read(self, pos).await }
}

async fn sync(&mut self) -> Result<(), commonware_storage::journal::Error> {
Expand All @@ -283,7 +283,7 @@ impl FuzzJournal for FixedJournal<deterministic::Context, Item> {
buffer: NonZeroUsize,
start_pos: u64,
) -> Result<(), commonware_storage::journal::Error> {
let _ = self.reader().await.replay(buffer, start_pos).await?;
let _ = Reader::replay(self, buffer, start_pos).await?;
Ok(())
}

Expand Down Expand Up @@ -326,13 +326,13 @@ impl FuzzJournal for VariableJournal<deterministic::Context, Item> {
}

async fn size(&self) -> u64 {
VariableJournal::size(self).await
VariableJournal::size(self)
}

// Cannot use `async fn` here due to RPITIT Send auto-trait limitation.
#[allow(clippy::manual_async_fn)]
fn bounds(&self) -> impl Future<Output = Range<u64>> + Send {
async { self.reader().await.bounds() }
async { self.reader().bounds() }
}

async fn append(&mut self, item: Item) -> Result<u64, commonware_storage::journal::Error> {
Expand All @@ -345,7 +345,7 @@ impl FuzzJournal for VariableJournal<deterministic::Context, Item> {
&self,
pos: u64,
) -> impl Future<Output = Result<Item, commonware_storage::journal::Error>> + Send {
async move { self.reader().await.read(pos).await }
async move { self.reader().read(pos).await }
}

async fn sync(&mut self) -> Result<(), commonware_storage::journal::Error> {
Expand All @@ -365,7 +365,7 @@ impl FuzzJournal for VariableJournal<deterministic::Context, Item> {
buffer: NonZeroUsize,
start_pos: u64,
) -> Result<(), commonware_storage::journal::Error> {
let _ = self.reader().await.replay(buffer, start_pos).await?;
let _ = self.reader().replay(buffer, start_pos).await?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion storage/fuzz/fuzz_targets/ordinal_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ fn fuzz(input: FuzzInput) {
}

OrdinalOperation::ReopenAfterOperations => {
if let Some(o) = store.take() {
if let Some(mut o) = store.take() {
// Sync and drop the current ordinal
o.sync().await.expect("failed to sync store before reopen failed");
drop(o);
Expand Down
14 changes: 7 additions & 7 deletions storage/fuzz/fuzz_targets/queue_crash_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ async fn run_operations(

QueueOperation::DequeueAndAck => {
if let Ok(Some((pos, _item))) = queue.dequeue().await {
if queue.ack(pos).await.is_ok() {
if queue.ack(pos).is_ok() {
state.update_ack_floor(queue.ack_floor());
}
}
Expand All @@ -276,12 +276,12 @@ async fn run_operations(
}

QueueOperation::AckOffset { offset } => {
let size = queue.size().await;
let size = queue.size();
let ack_floor = queue.ack_floor();
if size > ack_floor {
let range = size - ack_floor;
let pos = ack_floor + (*offset as u64 % range);
match queue.ack(pos).await {
match queue.ack(pos) {
Ok(()) => {
state.update_ack_floor(queue.ack_floor());
}
Expand All @@ -294,9 +294,9 @@ async fn run_operations(
}

QueueOperation::AckUpToOffset { offset } => {
let size = queue.size().await;
let size = queue.size();
let up_to = (*offset as u64) % (size + 1);
match queue.ack_up_to(up_to).await {
match queue.ack_up_to(up_to) {
Ok(()) => {
state.update_ack_floor(queue.ack_floor());
}
Expand Down Expand Up @@ -338,7 +338,7 @@ async fn run_operations(
/// that the queue can be re-initialized and used again for basic operations.
async fn verify_recovery_after_mutable_error(queue: &mut Queue<deterministic::Context, Vec<u8>>) {
// Basic read-path sanity should not fail.
let size_before = queue.size().await;
let size_before = queue.size();
queue
.dequeue()
.await
Expand Down Expand Up @@ -371,7 +371,7 @@ async fn verify_recovery(
return;
}

let size = queue.size().await;
let size = queue.size();
let ack_floor = queue.ack_floor();

// Size should be within expected bounds
Expand Down
18 changes: 7 additions & 11 deletions storage/fuzz/fuzz_targets/queue_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,14 @@ fn fuzz(input: FuzzInput) {
}

QueueOperation::Ack { pos_offset } => {
let size = queue.size().await;
let size = queue.size();
if size == 0 {
continue;
}
// Map offset to a valid position range
let pos = (*pos_offset as u64) % size;

let result = queue.ack(pos).await;
let result = queue.ack(pos);
let ref_result = reference.ack(pos);

assert_eq!(
Expand All @@ -232,11 +232,11 @@ fn fuzz(input: FuzzInput) {
}

QueueOperation::AckUpTo { pos_offset } => {
let size = queue.size().await;
let size = queue.size();
// Map offset to valid range [0, size]
let up_to = (*pos_offset as u64) % (size + 1);

let result = queue.ack_up_to(up_to).await;
let result = queue.ack_up_to(up_to);
let ref_result = reference.ack_up_to(up_to);

assert_eq!(
Expand All @@ -257,11 +257,7 @@ fn fuzz(input: FuzzInput) {
}

// Verify invariants after each operation
assert_eq!(
queue.size().await,
reference.size(),
"size mismatch after {op:?}"
);
assert_eq!(queue.size(), reference.size(), "size mismatch after {op:?}");
assert_eq!(
queue.ack_floor(),
reference.ack_floor(),
Expand All @@ -273,13 +269,13 @@ fn fuzz(input: FuzzInput) {
"read_position mismatch after {op:?}"
);
assert_eq!(
queue.is_empty().await,
queue.is_empty(),
reference.is_empty(),
"is_empty mismatch after {op:?}"
);

// Verify is_acked consistency for a sample of positions
for pos in 0..queue.size().await.min(20) {
for pos in 0..queue.size().min(20) {
assert_eq!(
queue.is_acked(pos),
reference.is_acked(pos),
Expand Down
2 changes: 1 addition & 1 deletion storage/fuzz/fuzz_targets/store_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ fn fuzz(input: FuzzInput) {
}

Operation::OpCount => {
let _ = db.bounds().await.end;
let _ = db.bounds().end;
}

Operation::InactivityFloorLoc => {
Expand Down
Loading
Loading