diff --git a/core/core/src/raw/oio/delete/batch_delete.rs b/core/core/src/raw/oio/delete/batch_delete.rs
index 63d262148199..6360fb335fd4 100644
--- a/core/core/src/raw/oio/delete/batch_delete.rs
+++ b/core/core/src/raw/oio/delete/batch_delete.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashSet;
 use std::future::Future;
 
 use crate::raw::*;
@@ -38,8 +37,8 @@ pub trait BatchDelete: Send + Sync + Unpin + 'static {
 
     /// delete_batch delete multiple paths at once.
     ///
-    /// - Implementations should make sure that the length of `batch` equals to the return result's length.
-    /// - Implementations should return error no path is deleted.
+    /// - Implementations should report success/failure using indices into the input `batch` Vec.
+    /// - Implementations should return error if no path is deleted.
     fn delete_batch(
         &self,
         batch: Vec<(String, OpDelete)>,
@@ -47,18 +46,22 @@ pub trait BatchDelete: Send + Sync + Unpin + 'static {
 }
 
 /// BatchDeleteResult is the result of batch delete operation.
+///
+/// Results are tracked by index into the input batch Vec, avoiding reliance on
+/// `OpDelete` equality which can fail when services reconstruct `OpDelete` from
+/// responses without preserving all fields.
 #[derive(Default)]
 pub struct BatchDeleteResult {
-    /// Collection of successful deletions, containing tuples of (path, args)
-    pub succeeded: Vec<(String, OpDelete)>,
-    /// Collection of failed deletions, containing tuples of (path, args, error)
-    pub failed: Vec<(String, OpDelete, Error)>,
+    /// Indices of successfully deleted items in the input batch.
+    pub succeeded: Vec<usize>,
+    /// Indices of failed deletions with their errors.
+    pub failed: Vec<(usize, Error)>,
 }
 
 /// BatchDeleter is used to implement [`oio::Delete`] based on batch delete.
 pub struct BatchDeleter<D: BatchDelete> {
     inner: D,
-    buffer: HashSet<(String, OpDelete)>,
+    buffer: Vec<(String, OpDelete)>,
     max_batch_size: usize,
 }
 
@@ -73,7 +76,7 @@ impl<D: BatchDelete> BatchDeleter<D> {
 
         Self {
             inner,
-            buffer: HashSet::default(),
+            buffer: Vec::new(),
             max_batch_size,
         }
     }
@@ -84,45 +87,50 @@ impl<D: BatchDelete> BatchDeleter<D> {
         }
 
         if self.buffer.len() == 1 {
-            let (path, args) = self
-                .buffer
-                .iter()
-                .next()
-                .expect("the delete buffer size must be 1")
-                .clone();
+            let (path, args) = self.buffer[0].clone();
             self.inner.delete_once(path, args).await?;
             self.buffer.clear();
             return Ok(1);
         }
 
-        let batch = self.buffer.iter().cloned().collect();
-        let result = self.inner.delete_batch(batch).await?;
+        let batch: Vec<_> = self.buffer.drain(..).collect();
+        let result = match self.inner.delete_batch(batch.clone()).await {
+            Ok(result) => result,
+            Err(err) => {
+                // Restore all items back to buffer since the entire call failed.
+                self.buffer = batch;
+                return Err(err);
+            }
+        };
 
         if result.succeeded.is_empty() {
+            // Restore all items back to buffer since nothing was deleted.
+            self.buffer = batch;
             return Err(Error::new(
                 ErrorKind::Unexpected,
                 "batch delete returned zero successes",
             ));
         }
 
-        if result.succeeded.len() + result.failed.len() != self.buffer.len() {
+        if result.succeeded.len() + result.failed.len() != batch.len() {
+            // Restore all items back to buffer since result is inconsistent.
+            self.buffer = batch;
             return Err(Error::new(
                 ErrorKind::Unexpected,
                 "batch delete result size mismatch",
             ));
         }
 
-        let mut deleted = 0;
-        for i in result.succeeded {
-            self.buffer.remove(&i);
-            deleted += 1;
-        }
+        let deleted = result.succeeded.len();
 
-        for (path, op, err) in result.failed {
+        // Put failed items back into the buffer for retry.
+        for (idx, err) in result.failed {
             if !err.is_temporary() {
+                let (path, op) = &batch[idx];
                 return Err(err
                     .with_context("path", path)
                     .with_context("version", op.version().unwrap_or("")));
             }
+            self.buffer.push(batch[idx].clone());
         }
 
         Ok(deleted)
@@ -131,7 +139,7 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
     async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
-        self.buffer.insert((path.to_string(), args));
+        self.buffer.push((path.to_string(), args));
         if self.buffer.len() >= self.max_batch_size {
             let _ = self.flush_buffer().await?;
             return Ok(());
         }
@@ -159,3 +167,193 @@ impl<D: BatchDelete> oio::Delete for BatchDeleter<D> {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::raw::oio::Delete;
+
+    /// A mock BatchDelete implementation that reports all items as successfully
+    /// deleted by index.
+    struct MockBatchDelete;
+
+    impl BatchDelete for MockBatchDelete {
+        async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
+            Ok(())
+        }
+
+        async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
+            Ok(BatchDeleteResult {
+                succeeded: (0..batch.len()).collect(),
+                failed: vec![],
+            })
+        }
+    }
+
+    /// Regression test: previously, BatchDeleter used a HashSet with
+    /// `(String, OpDelete)` equality to track progress. If a service
+    /// reconstructed `OpDelete` from responses without preserving all fields
+    /// (e.g., missing `recursive`), `buffer.remove()` would silently fail,
+    /// leaving items stuck in the buffer forever and triggering a "no progress"
+    /// error. With index-based tracking, this is no longer possible.
+    #[tokio::test]
+    async fn test_batch_deleter_index_based_progress() -> Result<()> {
+        let mut deleter = BatchDeleter::new(MockBatchDelete, Some(3));
+
+        deleter
+            .delete("a", OpDelete::new().with_recursive(true))
+            .await?;
+        deleter
+            .delete("b", OpDelete::new().with_recursive(true))
+            .await?;
+        deleter
+            .delete("c", OpDelete::new().with_recursive(true))
+            .await?;
+
+        deleter.close().await?;
+        Ok(())
+    }
+
+    /// A mock that fails one item with a temporary error on the first call,
+    /// then succeeds on retry.
+    struct MockPartialFailBatchDelete {
+        call_count: std::sync::atomic::AtomicUsize,
+    }
+
+    impl BatchDelete for MockPartialFailBatchDelete {
+        async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
+            Ok(())
+        }
+
+        async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
+            let count = self
+                .call_count
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            if count == 0 {
+                let last = batch.len() - 1;
+                Ok(BatchDeleteResult {
+                    succeeded: (0..last).collect(),
+                    failed: vec![(
+                        last,
+                        Error::new(ErrorKind::Unexpected, "temporary failure").set_temporary(),
+                    )],
+                })
+            } else {
+                Ok(BatchDeleteResult {
+                    succeeded: (0..batch.len()).collect(),
+                    failed: vec![],
+                })
+            }
+        }
+    }
+
+    /// Test that failed items are properly retained and retried.
+    #[tokio::test]
+    async fn test_batch_deleter_partial_failure_retry() -> Result<()> {
+        let mock = MockPartialFailBatchDelete {
+            call_count: std::sync::atomic::AtomicUsize::new(0),
+        };
+        let mut deleter = BatchDeleter::new(mock, Some(3));
+
+        deleter.delete("a", OpDelete::new()).await?;
+        deleter.delete("b", OpDelete::new()).await?;
+        deleter.delete("c", OpDelete::new()).await?;
+
+        deleter.close().await?;
+        Ok(())
+    }
+
+    /// A mock that fails the entire delete_batch call on the first attempt,
+    /// then succeeds on retry.
+    struct MockTransportFailBatchDelete {
+        call_count: std::sync::atomic::AtomicUsize,
+    }
+
+    impl BatchDelete for MockTransportFailBatchDelete {
+        async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
+            Ok(())
+        }
+
+        async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
+            let count = self
+                .call_count
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            if count == 0 {
+                Err(Error::new(ErrorKind::Unexpected, "transport error").set_temporary())
+            } else {
+                Ok(BatchDeleteResult {
+                    succeeded: (0..batch.len()).collect(),
+                    failed: vec![],
+                })
+            }
+        }
+    }
+
+    /// A mock that fails delete_once on the first call, then succeeds on retry.
+    struct MockFailOnceDeleteOnce {
+        call_count: std::sync::atomic::AtomicUsize,
+    }
+
+    impl BatchDelete for MockFailOnceDeleteOnce {
+        async fn delete_once(&self, _path: String, _args: OpDelete) -> Result<()> {
+            let count = self
+                .call_count
+                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            if count == 0 {
+                Err(
+                    Error::new(ErrorKind::Unexpected, "temporary delete_once failure")
+                        .set_temporary(),
+                )
+            } else {
+                Ok(())
+            }
+        }
+
+        async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
+            Ok(BatchDeleteResult {
+                succeeded: (0..batch.len()).collect(),
+                failed: vec![],
+            })
+        }
+    }
+
+    /// Regression test: when delete_once() fails for a single-item buffer,
+    /// the item must be retained so close() can retry successfully.
+    #[tokio::test]
+    async fn test_batch_deleter_single_item_delete_once_error_retains_buffer() -> Result<()> {
+        let mock = MockFailOnceDeleteOnce {
+            call_count: std::sync::atomic::AtomicUsize::new(0),
+        };
+        let mut deleter = BatchDeleter::new(mock, Some(3));
+
+        deleter.delete("a", OpDelete::new()).await?;
+
+        // First close() fails because delete_once returns error, but item stays in buffer.
+        let err = deleter.close().await;
+        assert!(err.is_err());
+
+        // Second close() succeeds because item was retained and delete_once now succeeds.
+ deleter.close().await?; + Ok(()) + } + + /// Regression test: when delete_batch() itself returns Err (e.g., transport + /// failure), the buffer must be restored so close() can retry successfully. + #[tokio::test] + async fn test_batch_deleter_transport_error_restores_buffer() -> Result<()> { + let mock = MockTransportFailBatchDelete { + call_count: std::sync::atomic::AtomicUsize::new(0), + }; + let mut deleter = BatchDeleter::new(mock, Some(3)); + + deleter.delete("a", OpDelete::new()).await?; + deleter.delete("b", OpDelete::new()).await?; + // Third delete triggers flush, which fails. Buffer should be restored. + let err = deleter.delete("c", OpDelete::new()).await; + assert!(err.is_err()); + + // close() retries: the batch is still in buffer, now succeeds. + deleter.close().await?; + Ok(()) + } +} diff --git a/core/services/azblob/src/deleter.rs b/core/services/azblob/src/deleter.rs index a2aba9fd9049..68ba2aeea849 100644 --- a/core/services/azblob/src/deleter.rs +++ b/core/services/azblob/src/deleter.rs @@ -85,21 +85,18 @@ impl oio::BatchDelete for AzblobDeleter { for (i, part) in parts.into_iter().enumerate() { let resp = part.into_response(); - let path = paths[i].clone(); // deleting not existing objects is ok if resp.status() == StatusCode::ACCEPTED || resp.status() == StatusCode::NOT_FOUND { - batched_result.succeeded.push((path, OpDelete::default())); + batched_result.succeeded.push(i); } else { - batched_result - .failed - .push((path, OpDelete::default(), parse_error(resp))); + batched_result.failed.push((i, parse_error(resp))); } } // If no object is deleted, return directly. 
if batched_result.succeeded.is_empty() { - let err = batched_result.failed.remove(0).2; + let err = batched_result.failed.remove(0).1; return Err(err); } diff --git a/core/services/cloudflare-kv/src/deleter.rs b/core/services/cloudflare-kv/src/deleter.rs index c63ac54e6447..0f194c7286db 100644 --- a/core/services/cloudflare-kv/src/deleter.rs +++ b/core/services/cloudflare-kv/src/deleter.rs @@ -99,18 +99,17 @@ impl oio::BatchDelete for CloudflareKvDeleter { failed: Vec::with_capacity(result.unsuccessful_keys.len()), }; - for item in batch { + for (idx, item) in batch.iter().enumerate() { if result.unsuccessful_keys.contains(&item.0) { batched_result.failed.push(( - item.0, - item.1, + idx, Error::new( ErrorKind::Unexpected, "cloudflare_kv delete this key failed for reason we don't know", ), )); } else { - batched_result.succeeded.push(item); + batched_result.succeeded.push(idx); } } diff --git a/core/services/gcs/src/deleter.rs b/core/services/gcs/src/deleter.rs index 745b19f9cf08..a4739cabca12 100644 --- a/core/services/gcs/src/deleter.rs +++ b/core/services/gcs/src/deleter.rs @@ -74,22 +74,18 @@ impl oio::BatchDelete for GcsDeleter { for (i, part) in parts.into_iter().enumerate() { let resp = part.into_response(); - // TODO: maybe we can take it directly? - let path = paths[i].clone(); // deleting not existing objects is ok if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND { - batched_result.succeeded.push((path, OpDelete::default())); + batched_result.succeeded.push(i); } else { - batched_result - .failed - .push((path, OpDelete::default(), parse_error(resp))); + batched_result.failed.push((i, parse_error(resp))); } } // If no object is deleted, return directly. 
         if batched_result.succeeded.is_empty() {
-            let err = batched_result.failed.remove(0).2;
+            let err = batched_result.failed.remove(0).1;
             return Err(err);
         }
diff --git a/core/services/hf/src/deleter.rs b/core/services/hf/src/deleter.rs
index d15022f9598f..034efc76630c 100644
--- a/core/services/hf/src/deleter.rs
+++ b/core/services/hf/src/deleter.rs
@@ -80,7 +80,7 @@ impl oio::BatchDelete for HfDeleter {
         self.delete_paths(paths).await?;
 
         Ok(BatchDeleteResult {
-            succeeded: batch,
+            succeeded: (0..batch.len()).collect(),
             failed: vec![],
         })
     }
diff --git a/core/services/oss/src/deleter.rs b/core/services/oss/src/deleter.rs
index da900148d230..e260986d3e5c 100644
--- a/core/services/oss/src/deleter.rs
+++ b/core/services/oss/src/deleter.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashSet;
 use std::sync::Arc;
 
 use bytes::Buf;
@@ -50,14 +49,7 @@ impl oio::BatchDelete for OssDeleter {
     }
 
     async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
-        // Sadly, OSS will not return failed keys, so we will build
-        // a set to calculate the failed keys.
-        let mut keys: HashSet<(String, OpDelete)> = batch
-            .iter()
-            .map(|path| (path.0.to_owned(), path.1.clone()))
-            .collect();
-
-        let resp = self.core.oss_delete_objects(batch).await?;
+        let resp = self.core.oss_delete_objects(batch.clone()).await?;
 
         let status = resp.status();
 
@@ -77,31 +69,48 @@ impl oio::BatchDelete for OssDeleter {
             ));
         }
 
+        // Build a lookup from (rel_path, version) to input indices.
+        // Use Vec to handle duplicate entries in the same batch.
+        let mut lookup: std::collections::HashMap<(String, Option<String>), Vec<usize>> =
+            std::collections::HashMap::with_capacity(batch.len());
+        for (idx, (path, op)) in batch.iter().enumerate() {
+            lookup
+                .entry((path.clone(), op.version().map(|v| v.to_string())))
+                .or_default()
+                .push(idx);
+        }
+
+        // Track which indices have been accounted for.
+        let mut accounted: std::collections::HashSet<usize> =
+            std::collections::HashSet::with_capacity(batch.len());
+
         let mut batched_result = BatchDeleteResult {
             succeeded: Vec::with_capacity(result.deleted.len()),
-            failed: Vec::with_capacity(keys.len() - result.deleted.len()),
+            failed: Vec::with_capacity(batch.len() - result.deleted.len()),
         };
 
         for i in result.deleted {
             let path = build_rel_path(&self.core.root, &i.key);
-            let mut op = OpDelete::default();
-            if let Some(version) = &i.version_id {
-                op = op.with_version(version);
+            let version = i.version_id;
+            if let Some(indices) = lookup.get_mut(&(path, version)) {
+                if let Some(idx) = indices.pop() {
+                    batched_result.succeeded.push(idx);
+                    accounted.insert(idx);
+                }
             }
-            let object = (path, op);
-            keys.remove(&object);
-            batched_result.succeeded.push(object);
         }
-        // TODO: we should handle those errors with code.
-        for (path, op) in keys {
-            batched_result.failed.push((
-                path,
-                op,
-                Error::new(
-                    ErrorKind::Unexpected,
-                    "oss delete this key failed for reason we don't know",
-                ),
-            ));
+
+        // Any items not accounted for are considered failed.
+        for idx in 0..batch.len() {
+            if !accounted.contains(&idx) {
+                batched_result.failed.push((
+                    idx,
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "oss delete this key failed for reason we don't know",
+                    ),
+                ));
+            }
         }
 
         Ok(batched_result)
diff --git a/core/services/s3/src/deleter.rs b/core/services/s3/src/deleter.rs
index feb9ff90bb71..e3295438a0ac 100644
--- a/core/services/s3/src/deleter.rs
+++ b/core/services/s3/src/deleter.rs
@@ -59,7 +59,7 @@ impl oio::BatchDelete for S3Deleter {
     }
 
     async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<BatchDeleteResult> {
-        let resp = self.core.s3_delete_objects(batch).await?;
+        let resp = self.core.s3_delete_objects(batch.clone()).await?;
 
         let status = resp.status();
         if status != StatusCode::OK {
@@ -71,27 +71,41 @@ impl oio::BatchDelete for S3Deleter {
         let result: DeleteObjectsResult =
             quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
 
+        // Build a lookup from (rel_path, version) to input indices for matching
+        // response entries back to input batch items. Use Vec to handle
+        // duplicate entries in the same batch.
+        let mut lookup: std::collections::HashMap<(String, Option<String>), Vec<usize>> =
+            std::collections::HashMap::with_capacity(batch.len());
+        for (idx, (path, op)) in batch.iter().enumerate() {
+            lookup
+                .entry((path.clone(), op.version().map(|v| v.to_string())))
+                .or_default()
+                .push(idx);
+        }
+
         let mut batched_result = BatchDeleteResult {
             succeeded: Vec::with_capacity(result.deleted.len()),
             failed: Vec::with_capacity(result.error.len()),
         };
 
         for i in result.deleted {
             let path = build_rel_path(&self.core.root, &i.key);
-            let mut op = OpDelete::new();
-            if let Some(version_id) = i.version_id {
-                op = op.with_version(version_id.as_str());
+            let version = i.version_id;
+            if let Some(indices) = lookup.get_mut(&(path, version)) {
+                if let Some(idx) = indices.pop() {
+                    batched_result.succeeded.push(idx);
+                }
             }
-            batched_result.succeeded.push((path, op));
         }
 
         for i in result.error {
             let path = build_rel_path(&self.core.root, &i.key);
-            let mut op = OpDelete::new();
-            if let Some(version_id) = &i.version_id {
-                op = op.with_version(version_id.as_str());
+            let version = i.version_id.clone();
+            if let Some(indices) = lookup.get_mut(&(path, version)) {
+                if let Some(idx) = indices.pop() {
+                    batched_result
+                        .failed
+                        .push((idx, parse_delete_objects_result_error(i)));
+                }
             }
-            batched_result
-                .failed
-                .push((path, op, parse_delete_objects_result_error(i)));
         }
 
         Ok(batched_result)
diff --git a/core/services/swift/src/deleter.rs b/core/services/swift/src/deleter.rs
index d429a0fa361a..d1c276d880bb 100644
--- a/core/services/swift/src/deleter.rs
+++ b/core/services/swift/src/deleter.rs
@@ -64,11 +64,11 @@ impl oio::BatchDelete for SwiftDeleter {
             failed: Vec::with_capacity(result.errors.len()),
         };
 
-        for (path, op) in batch {
+        for (idx, (path, _op)) in batch.iter().enumerate() {
             // Check if this path appears in the errors list.
             // The error paths from Swift include the container prefix, so we need
             // to reconstruct the full path for comparison.
- let abs = build_abs_path(&self.core.root, &path); + let abs = build_abs_path(&self.core.root, path); let full_path = format!("{}/{}", &self.core.container, abs); if let Some(error_entry) = result.errors.iter().find(|e| { @@ -78,8 +78,7 @@ impl oio::BatchDelete for SwiftDeleter { }) { let status_str = error_entry.get(1).cloned().unwrap_or_default(); batched_result.failed.push(( - path, - op, + idx, Error::new( ErrorKind::Unexpected, format!("bulk delete error: {status_str}"), @@ -87,7 +86,7 @@ impl oio::BatchDelete for SwiftDeleter { )); } else { // Either deleted successfully or not found (both are success for us). - batched_result.succeeded.push((path, op)); + batched_result.succeeded.push(idx); } } diff --git a/integrations/object_store/src/service/deleter.rs b/integrations/object_store/src/service/deleter.rs index 5740cb3a6f1c..2a01cdb4f2cf 100644 --- a/integrations/object_store/src/service/deleter.rs +++ b/integrations/object_store/src/service/deleter.rs @@ -54,14 +54,8 @@ impl oio::BatchDelete for ObjectStoreDeleter { let mut result_batch = BatchDeleteResult::default(); for (idx, result) in results.into_iter().enumerate() { match result { - Ok(_) => result_batch - .succeeded - .push((paths[idx].0.clone(), paths[idx].1.clone())), - Err(e) => result_batch.failed.push(( - paths[idx].0.clone(), - paths[idx].1.clone(), - parse_error(e), - )), + Ok(_) => result_batch.succeeded.push(idx), + Err(e) => result_batch.failed.push((idx, parse_error(e))), } }