Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 223 additions & 25 deletions core/core/src/raw/oio/delete/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::future::Future;

use crate::raw::*;
Expand All @@ -38,27 +37,31 @@ pub trait BatchDelete: Send + Sync + Unpin + 'static {

/// delete_batch delete multiple paths at once.
///
/// - Implementations should make sure that the length of `batch` equals to the return result's length.
/// - Implementations should return error no path is deleted.
/// - Implementations should report success/failure using indices into the input `batch` Vec.
/// - Implementations should return error if no path is deleted.
fn delete_batch(
&self,
batch: Vec<(String, OpDelete)>,
) -> impl Future<Output = Result<BatchDeleteResult>> + MaybeSend;
}

/// BatchDeleteResult is the result of batch delete operation.
///
/// Results are tracked by index into the input batch Vec, avoiding reliance on
/// `OpDelete` equality which can fail when services reconstruct `OpDelete` from
/// responses without preserving all fields.
#[derive(Default)]
pub struct BatchDeleteResult {
/// Collection of successful deletions, containing tuples of (path, args)
pub succeeded: Vec<(String, OpDelete)>,
/// Collection of failed deletions, containing tuples of (path, args, error)
pub failed: Vec<(String, OpDelete, Error)>,
/// Indices of successfully deleted items in the input batch.
pub succeeded: Vec<usize>,
/// Indices of failed deletions with their errors.
pub failed: Vec<(usize, Error)>,
}

/// BatchDeleter is used to implement [`oio::Delete`] based on batch delete.
pub struct BatchDeleter<D: BatchDelete> {
inner: D,
buffer: HashSet<(String, OpDelete)>,
buffer: Vec<(String, OpDelete)>,
max_batch_size: usize,
}

Expand All @@ -73,7 +76,7 @@ impl<D: BatchDelete> BatchDeleter<D> {

Self {
inner,
buffer: HashSet::default(),
buffer: Vec::new(),
max_batch_size,
}
}
Expand All @@ -84,45 +87,50 @@ impl<D: BatchDelete> BatchDeleter<D> {
}

if self.buffer.len() == 1 {
let (path, args) = self
.buffer
.iter()
.next()
.expect("the delete buffer size must be 1")
.clone();
let (path, args) = self.buffer[0].clone();
self.inner.delete_once(path, args).await?;
self.buffer.clear();
return Ok(1);
}

let batch = self.buffer.iter().cloned().collect();
let result = self.inner.delete_batch(batch).await?;
let batch: Vec<_> = self.buffer.drain(..).collect();
let result = match self.inner.delete_batch(batch.clone()).await {
Ok(result) => result,
Err(err) => {
// Restore all items back to buffer since the entire call failed.
self.buffer = batch;
return Err(err);
}
};

if result.succeeded.is_empty() {
// Restore all items back to buffer since nothing was deleted.
self.buffer = batch;
return Err(Error::new(
ErrorKind::Unexpected,
"batch delete returned zero successes",
));
}
if result.succeeded.len() + result.failed.len() != self.buffer.len() {
if result.succeeded.len() + result.failed.len() != batch.len() {
// Restore all items back to buffer since result is inconsistent.
self.buffer = batch;
return Err(Error::new(
ErrorKind::Unexpected,
"batch delete result size mismatch",
));
}

let mut deleted = 0;
for i in result.succeeded {
self.buffer.remove(&i);
deleted += 1;
}
let deleted = result.succeeded.len();

for (path, op, err) in result.failed {
// Put failed items back into the buffer for retry.
for (idx, err) in result.failed {
if !err.is_temporary() {
let (path, op) = &batch[idx];
return Err(err
.with_context("path", path)
.with_context("version", op.version().unwrap_or("<latest>")));
}
self.buffer.push(batch[idx].clone());
}

Ok(deleted)
Expand All @@ -131,7 +139,7 @@ impl<D: BatchDelete> BatchDeleter<D> {

impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.buffer.insert((path.to_string(), args));
self.buffer.push((path.to_string(), args));
if self.buffer.len() >= self.max_batch_size {
let _ = self.flush_buffer().await?;
return Ok(());
Expand Down Expand Up @@ -159,3 +167,193 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
Ok(())
}
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;
    use crate::raw::oio::Delete;

    /// A mock service whose batch delete reports every input index as
    /// successfully deleted.
    struct MockBatchDelete;

    impl BatchDelete for MockBatchDelete {
        async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
            // Every index in the input batch is reported as a success.
            Ok(BatchDeleteResult {
                succeeded: (0..batch.len()).collect(),
                failed: Vec::new(),
            })
        }
    }

    /// Regression test: the old HashSet-based tracking relied on
    /// `(String, OpDelete)` equality, which broke whenever a service rebuilt
    /// `OpDelete` from a response without every original field (e.g. a lost
    /// `recursive` flag): `buffer.remove()` silently missed, items stayed in
    /// the buffer forever, and flushing reported "no progress". Index-based
    /// tracking makes that failure mode impossible.
    #[tokio::test]
    async fn test_batch_deleter_index_based_progress() -> Result<()> {
        let mut deleter = BatchDeleter::new(MockBatchDelete, Some(3));

        for path in ["a", "b", "c"] {
            deleter
                .delete(path, OpDelete::new().with_recursive(true))
                .await?;
        }

        deleter.close().await?;
        Ok(())
    }

    /// A mock whose first batch call fails its final item with a temporary
    /// error; every later call succeeds completely.
    struct MockPartialFailBatchDelete {
        call_count: AtomicUsize,
    }

    impl BatchDelete for MockPartialFailBatchDelete {
        async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
            let attempt = self.call_count.fetch_add(1, Ordering::SeqCst);
            if attempt > 0 {
                // Retries succeed for the whole batch.
                return Ok(BatchDeleteResult {
                    succeeded: (0..batch.len()).collect(),
                    failed: Vec::new(),
                });
            }
            // First attempt: everything but the last item succeeds; the last
            // one fails with a retryable error.
            let last = batch.len() - 1;
            let err = Error::new(ErrorKind::Unexpected, "temporary failure").set_temporary();
            Ok(BatchDeleteResult {
                succeeded: (0..last).collect(),
                failed: vec![(last, err)],
            })
        }
    }

    /// Items that fail with a temporary error must be kept for retry.
    #[tokio::test]
    async fn test_batch_deleter_partial_failure_retry() -> Result<()> {
        let mock = MockPartialFailBatchDelete {
            call_count: AtomicUsize::new(0),
        };
        let mut deleter = BatchDeleter::new(mock, Some(3));

        for path in ["a", "b", "c"] {
            deleter.delete(path, OpDelete::new()).await?;
        }

        deleter.close().await?;
        Ok(())
    }

    /// A mock whose first `delete_batch` call fails outright (as a transport
    /// would), succeeding on every later call.
    struct MockTransportFailBatchDelete {
        call_count: AtomicUsize,
    }

    impl BatchDelete for MockTransportFailBatchDelete {
        async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
            Ok(())
        }

        async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
            let attempt = self.call_count.fetch_add(1, Ordering::SeqCst);
            if attempt == 0 {
                return Err(Error::new(ErrorKind::Unexpected, "transport error").set_temporary());
            }
            Ok(BatchDeleteResult {
                succeeded: (0..batch.len()).collect(),
                failed: Vec::new(),
            })
        }
    }

    /// A mock whose `delete_once` fails once with a temporary error and then
    /// succeeds; its `delete_batch` always succeeds.
    struct MockFailOnceDeleteOnce {
        call_count: AtomicUsize,
    }

    impl BatchDelete for MockFailOnceDeleteOnce {
        async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
            match self.call_count.fetch_add(1, Ordering::SeqCst) {
                0 => Err(
                    Error::new(ErrorKind::Unexpected, "temporary delete_once failure")
                        .set_temporary(),
                ),
                _ => Ok(()),
            }
        }

        async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
            Ok(BatchDeleteResult {
                succeeded: (0..batch.len()).collect(),
                failed: Vec::new(),
            })
        }
    }

    /// Regression test: a failing `delete_once` on a single-item buffer must
    /// leave that item in the buffer so a later `close()` can retry it.
    #[tokio::test]
    async fn test_batch_deleter_single_item_delete_once_error_retains_buffer() -> Result<()> {
        let mock = MockFailOnceDeleteOnce {
            call_count: AtomicUsize::new(0),
        };
        let mut deleter = BatchDeleter::new(mock, Some(3));

        deleter.delete("a", OpDelete::new()).await?;

        // The first close() surfaces the delete_once error, but the item is retained.
        assert!(deleter.close().await.is_err());

        // The retained item is retried and delete_once now succeeds.
        deleter.close().await?;
        Ok(())
    }

    /// Regression test: when `delete_batch` itself returns `Err` (e.g. a
    /// transport failure), the buffer must be restored so `close()` can retry.
    #[tokio::test]
    async fn test_batch_deleter_transport_error_restores_buffer() -> Result<()> {
        let mock = MockTransportFailBatchDelete {
            call_count: AtomicUsize::new(0),
        };
        let mut deleter = BatchDeleter::new(mock, Some(3));

        deleter.delete("a", OpDelete::new()).await?;
        deleter.delete("b", OpDelete::new()).await?;
        // The third delete triggers a flush that fails; the batch goes back
        // into the buffer.
        assert!(deleter.delete("c", OpDelete::new()).await.is_err());

        // close() flushes the restored buffer; the mock now succeeds.
        deleter.close().await?;
        Ok(())
    }
}
9 changes: 3 additions & 6 deletions core/services/azblob/src/deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,18 @@ impl oio::BatchDelete for AzblobDeleter {

for (i, part) in parts.into_iter().enumerate() {
let resp = part.into_response();
let path = paths[i].clone();

// deleting not existing objects is ok
if resp.status() == StatusCode::ACCEPTED || resp.status() == StatusCode::NOT_FOUND {
batched_result.succeeded.push((path, OpDelete::default()));
batched_result.succeeded.push(i);
} else {
batched_result
.failed
.push((path, OpDelete::default(), parse_error(resp)));
batched_result.failed.push((i, parse_error(resp)));
}
}

// If no object is deleted, return directly.
if batched_result.succeeded.is_empty() {
let err = batched_result.failed.remove(0).2;
let err = batched_result.failed.remove(0).1;
return Err(err);
}

Expand Down
7 changes: 3 additions & 4 deletions core/services/cloudflare-kv/src/deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,17 @@ impl oio::BatchDelete for CloudflareKvDeleter {
failed: Vec::with_capacity(result.unsuccessful_keys.len()),
};

for item in batch {
for (idx, item) in batch.iter().enumerate() {
if result.unsuccessful_keys.contains(&item.0) {
batched_result.failed.push((
item.0,
item.1,
idx,
Error::new(
ErrorKind::Unexpected,
"cloudflare_kv delete this key failed for reason we don't know",
),
));
} else {
batched_result.succeeded.push(item);
batched_result.succeeded.push(idx);
}
}

Expand Down
10 changes: 3 additions & 7 deletions core/services/gcs/src/deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,18 @@ impl oio::BatchDelete for GcsDeleter {

for (i, part) in parts.into_iter().enumerate() {
let resp = part.into_response();
// TODO: maybe we can take it directly?
let path = paths[i].clone();

// deleting not existing objects is ok
if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
batched_result.succeeded.push((path, OpDelete::default()));
batched_result.succeeded.push(i);
} else {
batched_result
.failed
.push((path, OpDelete::default(), parse_error(resp)));
batched_result.failed.push((i, parse_error(resp)));
}
}

// If no object is deleted, return directly.
if batched_result.succeeded.is_empty() {
let err = batched_result.failed.remove(0).2;
let err = batched_result.failed.remove(0).1;
return Err(err);
}

Expand Down
Loading
Loading