Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 74 additions & 2 deletions core/core/src/raw/oio/delete/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,16 @@ impl<D: BatchDelete> BatchDeleter<D> {
if result.succeeded.is_empty() {
// Restore all items back to buffer since nothing was deleted.
self.buffer = batch;
return Err(Error::new(
let all_temporary = !result.failed.is_empty()
&& result.failed.iter().all(|(_, err)| err.is_temporary());
let mut err = Error::new(
ErrorKind::Unexpected,
"batch delete returned zero successes",
));
);
if all_temporary {
err = err.set_temporary();
}
return Err(err);
}
if result.succeeded.len() + result.failed.len() != batch.len() {
// Restore all items back to buffer since result is inconsistent.
Expand Down Expand Up @@ -356,4 +362,70 @@ mod tests {
deleter.close().await?;
Ok(())
}

/// A mock that returns zero successes with all temporary failures on the
/// first call, then succeeds on retry.
struct MockAllTempFailBatchDelete {
call_count: std::sync::atomic::AtomicUsize,
}

impl BatchDelete for MockAllTempFailBatchDelete {
async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
Ok(())
}

async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
let count = self
.call_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if count == 0 {
Ok(BatchDeleteResult {
succeeded: vec![],
failed: (0..batch.len())
.map(|idx| {
(
idx,
Error::new(ErrorKind::Unexpected, "temporary failure")
.set_temporary(),
)
})
.collect(),
})
} else {
Ok(BatchDeleteResult {
succeeded: (0..batch.len()).collect(),
failed: vec![],
})
}
}
}

/// Regression test: when batch delete returns zero successes but all
/// failures are temporary, the error must be marked temporary so retry
/// layers can retry the operation.
#[tokio::test]
async fn test_batch_deleter_zero_success_all_temporary_returns_temporary_error() -> Result<()> {
let mock = MockAllTempFailBatchDelete {
call_count: std::sync::atomic::AtomicUsize::new(0),
};
// Use max_batch_size > 3 so delete() doesn't trigger flush.
let mut deleter = BatchDeleter::new(mock, Some(10));

deleter.delete("a", OpDelete::new()).await?;
deleter.delete("b", OpDelete::new()).await?;
deleter.delete("c", OpDelete::new()).await?;

// First close() fails because all items have temporary failures.
let err = deleter.close().await;
assert!(err.is_err());
let err = err.unwrap_err();
assert!(
err.is_temporary(),
"error should be temporary when all failures are temporary, got: {err}"
);

// Second close() succeeds because retry now works.
deleter.close().await?;
Ok(())
}
}
Loading