Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/services/swift/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl Builder for SwiftBuilder {
write_with_user_metadata: true,

delete: true,
delete_max_size: Some(10000),

list: true,
list_with_recursive: true,
Expand Down Expand Up @@ -180,7 +181,7 @@ impl Access for SwiftBackend {
type Reader = HttpBody;
type Writer = oio::OneShotWriter<SwiftWriter>;
type Lister = oio::PageLister<SwiftLister>;
type Deleter = oio::OneShotDeleter<SwiftDeleter>;
type Deleter = oio::BatchDeleter<SwiftDeleter>;

fn info(&self) -> Arc<AccessorInfo> {
self.core.info.clone()
Expand Down Expand Up @@ -230,7 +231,10 @@ impl Access for SwiftBackend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
oio::OneShotDeleter::new(SwiftDeleter::new(self.core.clone())),
oio::BatchDeleter::new(
SwiftDeleter::new(self.core.clone()),
self.core.info.full_capability().delete_max_size,
),
))
}

Expand Down
109 changes: 109 additions & 0 deletions core/services/swift/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,43 @@ impl SwiftCore {
self.info.http_client().send(req).await
}

/// Bulk delete multiple objects in a single request.
///
/// Reference: <https://docs.openstack.org/api-ref/object-store/#bulk-delete>
pub async fn swift_bulk_delete(
&self,
paths: &[(String, OpDelete)],
) -> Result<Response<Buffer>> {
// The bulk-delete endpoint is on the account URL (the endpoint itself).
let url = format!("{}?bulk-delete", &self.endpoint);

let mut req = Request::post(&url);

req = req.header("X-Auth-Token", &self.token);
req = req.header(header::CONTENT_TYPE, "text/plain");
req = req.header(header::ACCEPT, "application/json");

// Body is newline-separated list of URL-encoded paths:
// /{container}/{object_path}
let body_str: String = paths
.iter()
.map(|(path, _)| {
let abs = build_abs_path(&self.root, path);
format!("{}/{}", &self.container, percent_encode_path(&abs))
})
.collect::<Vec<_>>()
.join("\n");

req = req.header(header::CONTENT_LENGTH, body_str.len());

let req = req
.extension(Operation::Delete)
.body(Buffer::from(bytes::Bytes::from(body_str)))
.map_err(new_request_build_error)?;

self.info.http_client().send(req).await
}

pub async fn swift_list(
&self,
path: &str,
Expand Down Expand Up @@ -250,6 +287,30 @@ pub enum ListOpResponse {
},
}

/// Response from Swift bulk-delete API.
///
/// Reference: <https://docs.openstack.org/api-ref/object-store/#bulk-delete>
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
#[allow(dead_code)]
pub struct BulkDeleteResponse {
/// Number of objects successfully deleted.
#[serde(rename = "Number Deleted")]
pub number_deleted: i64,
/// Number of objects not found (treated as success).
#[serde(rename = "Number Not Found")]
pub number_not_found: i64,
/// Response status string, e.g. "200 OK".
#[serde(rename = "Response Status")]
pub response_status: String,
/// Per-object errors as [path, status_string] pairs.
#[serde(rename = "Errors", default)]
pub errors: Vec<Vec<String>>,
/// Response body (usually empty on success).
#[serde(rename = "Response Body")]
pub response_body: Option<String>,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -307,4 +368,52 @@ mod tests {

Ok(())
}

#[test]
fn parse_bulk_delete_response_test() -> Result<()> {
let resp = bytes::Bytes::from(
r#"{
"Number Deleted": 2,
"Number Not Found": 1,
"Response Status": "200 OK",
"Errors": [],
"Response Body": ""
}"#,
);

let result: BulkDeleteResponse =
serde_json::from_slice(&resp).map_err(new_json_deserialize_error)?;

assert_eq!(result.number_deleted, 2);
assert_eq!(result.number_not_found, 1);
assert_eq!(result.response_status, "200 OK");
assert!(result.errors.is_empty());

Ok(())
}

#[test]
fn parse_bulk_delete_response_with_errors_test() -> Result<()> {
let resp = bytes::Bytes::from(
r#"{
"Number Deleted": 1,
"Number Not Found": 0,
"Response Status": "400 Bad Request",
"Errors": [
["/container/path/to/file", "403 Forbidden"]
],
"Response Body": ""
}"#,
);

let result: BulkDeleteResponse =
serde_json::from_slice(&resp).map_err(new_json_deserialize_error)?;

assert_eq!(result.number_deleted, 1);
assert_eq!(result.errors.len(), 1);
assert_eq!(result.errors[0][0], "/container/path/to/file");
assert_eq!(result.errors[0][1], "403 Forbidden");

Ok(())
}
}
49 changes: 48 additions & 1 deletion core/services/swift/src/deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl SwiftDeleter {
}
}

impl oio::OneShotDelete for SwiftDeleter {
impl oio::BatchDelete for SwiftDeleter {
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
let resp = self.core.swift_delete(&path).await?;

Expand All @@ -46,4 +46,51 @@ impl oio::OneShotDelete for SwiftDeleter {
_ => Err(parse_error(resp)),
}
}

async fn delete_batch(&self, batch: Vec<(String, OpDelete)>) -> Result<oio::BatchDeleteResult> {
let resp = self.core.swift_bulk_delete(&batch).await?;

let status = resp.status();
if status != StatusCode::OK {
return Err(parse_error(resp));
}

let bs = resp.into_body().to_bytes();
let result: BulkDeleteResponse =
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;

let mut batched_result = oio::BatchDeleteResult {
succeeded: Vec::with_capacity(batch.len() - result.errors.len()),
failed: Vec::with_capacity(result.errors.len()),
};

for (path, op) in batch {
// Check if this path appears in the errors list.
// The error paths from Swift include the container prefix, so we need
// to reconstruct the full path for comparison.
let abs = build_abs_path(&self.core.root, &path);
let full_path = format!("{}/{}", &self.core.container, abs);

if let Some(error_entry) = result.errors.iter().find(|e| {
e.first()
.map(|p| percent_decode_path(p) == full_path)
.unwrap_or(false)
}) {
let status_str = error_entry.get(1).cloned().unwrap_or_default();
batched_result.failed.push((
path,
op,
Error::new(
ErrorKind::Unexpected,
format!("bulk delete error: {status_str}"),
),
));
} else {
// Either deleted successfully or not found (both are success for us).
batched_result.succeeded.push((path, op));
}
}

Ok(batched_result)
}
}
Loading