diff --git a/core/Cargo.lock b/core/Cargo.lock index b4359c518160..2c47306c7f9a 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -7051,6 +7051,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "uuid", ] [[package]] diff --git a/core/core/src/raw/oio/write/multipart_write.rs b/core/core/src/raw/oio/write/multipart_write.rs index e8b174b18e3b..6ef1ca886bbd 100644 --- a/core/core/src/raw/oio/write/multipart_write.rs +++ b/core/core/src/raw/oio/write/multipart_write.rs @@ -118,6 +118,8 @@ pub struct MultipartPart { pub etag: String, /// The checksum of the part. pub checksum: Option, + /// The size of the part in bytes. + pub size: Option, } struct WriteInput { @@ -391,6 +393,7 @@ mod tests { part_number, etag: "etag".to_string(), checksum: None, + size: None, }) } diff --git a/core/services/b2/src/writer.rs b/core/services/b2/src/writer.rs index e924a12e1150..a3809d0ed536 100644 --- a/core/services/b2/src/writer.rs +++ b/core/services/b2/src/writer.rs @@ -133,6 +133,7 @@ impl oio::MultipartWrite for B2Writer { etag: result.content_sha1, part_number, checksum: None, + size: None, }) } _ => Err(parse_error(resp)), diff --git a/core/services/cos/src/writer.rs b/core/services/cos/src/writer.rs index 0e7348aee1e3..a94986f6ca88 100644 --- a/core/services/cos/src/writer.rs +++ b/core/services/cos/src/writer.rs @@ -141,6 +141,7 @@ impl oio::MultipartWrite for CosWriter { part_number, etag, checksum: None, + size: None, }) } _ => Err(parse_error(resp)), diff --git a/core/services/gcs/src/writer.rs b/core/services/gcs/src/writer.rs index 938235355568..b8705527eb41 100644 --- a/core/services/gcs/src/writer.rs +++ b/core/services/gcs/src/writer.rs @@ -119,6 +119,7 @@ impl oio::MultipartWrite for GcsWriter { part_number, etag, checksum: None, + size: None, }) } diff --git a/core/services/obs/src/writer.rs b/core/services/obs/src/writer.rs index 6deaf64623fc..22657d2a4a77 100644 --- a/core/services/obs/src/writer.rs +++ b/core/services/obs/src/writer.rs @@ -136,6 +136,7 
@@ impl oio::MultipartWrite for ObsWriter { part_number, etag, checksum: None, + size: None, }) } _ => Err(parse_error(resp)), diff --git a/core/services/oss/src/writer.rs b/core/services/oss/src/writer.rs index 7a71a9a080e6..c03e873dc730 100644 --- a/core/services/oss/src/writer.rs +++ b/core/services/oss/src/writer.rs @@ -138,6 +138,7 @@ impl oio::MultipartWrite for OssWriter { part_number, etag, checksum: None, + size: None, }) } _ => Err(parse_error(resp)), diff --git a/core/services/s3/src/writer.rs b/core/services/s3/src/writer.rs index 5a01e9237884..a377130e8159 100644 --- a/core/services/s3/src/writer.rs +++ b/core/services/s3/src/writer.rs @@ -143,6 +143,7 @@ impl oio::MultipartWrite for S3Writer { part_number, etag, checksum, + size: None, }) } _ => Err(parse_error(resp)), diff --git a/core/services/swift/Cargo.toml b/core/services/swift/Cargo.toml index eb2914e0b2cb..0d5b9b02cca4 100644 --- a/core/services/swift/Cargo.toml +++ b/core/services/swift/Cargo.toml @@ -38,6 +38,7 @@ opendal-core = { path = "../../core", version = "0.55.0", default-features = fal quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +uuid = { workspace = true, features = ["v4"] } [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/core/services/swift/src/backend.rs b/core/services/swift/src/backend.rs index 5d5169629cca..1a6ad309b4e5 100644 --- a/core/services/swift/src/backend.rs +++ b/core/services/swift/src/backend.rs @@ -157,6 +157,13 @@ impl Builder for SwiftBuilder { write: true, write_can_empty: true, + write_can_multi: true, + write_multi_min_size: Some(5 * 1024 * 1024), + write_multi_max_size: if cfg!(target_pointer_width = "64") { + Some(5 * 1024 * 1024 * 1024) + } else { + Some(usize::MAX) + }, write_with_content_type: true, write_with_content_disposition: true, write_with_content_encoding: true, @@ -194,7 
+201,7 @@ pub struct SwiftBackend { impl Access for SwiftBackend { type Reader = HttpBody; - type Writer = oio::OneShotWriter; + type Writer = oio::MultipartWriter; type Lister = oio::PageLister; type Deleter = oio::BatchDeleter; @@ -236,9 +243,9 @@ impl Access for SwiftBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let concurrent = args.concurrent(); let writer = SwiftWriter::new(self.core.clone(), args.clone(), path.to_string()); - - let w = oio::OneShotWriter::new(writer); + let w = oio::MultipartWriter::new(self.core.info.clone(), writer, concurrent); Ok((RpWrite::default(), w)) } diff --git a/core/services/swift/src/core.rs b/core/services/swift/src/core.rs index b5d702cebbbf..ccd5d34c558f 100644 --- a/core/services/swift/src/core.rs +++ b/core/services/swift/src/core.rs @@ -26,6 +26,7 @@ use http::header::IF_MODIFIED_SINCE; use http::header::IF_NONE_MATCH; use http::header::IF_UNMODIFIED_SINCE; use serde::Deserialize; +use serde::Serialize; use opendal_core::raw::*; use opendal_core::*; @@ -313,6 +314,153 @@ impl SwiftCore { self.info.http_client().send(req).await } + + /// Build the segment path for an SLO part. + /// + /// Segments are stored as: `.segments/{object_path}/{upload_id}/{part_number:08}` + pub fn slo_segment_path(&self, path: &str, upload_id: &str, part_number: usize) -> String { + let abs = build_abs_path(&self.root, path); + format!( + ".segments/{}/{}/{:08}", + abs.trim_end_matches('/'), + upload_id, + part_number + ) + } + + /// Upload a segment for an SLO multipart upload. 
+ /// + /// Reference: + pub async fn swift_put_segment( + &self, + path: &str, + upload_id: &str, + part_number: usize, + size: u64, + body: Buffer, + ) -> Result> { + let segment = self.slo_segment_path(path, upload_id, part_number); + let url = format!( + "{}/{}/{}", + &self.endpoint, + &self.container, + percent_encode_path(&segment) + ); + + let mut req = Request::put(&url); + req = req.header("X-Auth-Token", &self.token); + req = req.header(header::CONTENT_LENGTH, size); + + let req = req + .extension(Operation::Write) + .body(body) + .map_err(new_request_build_error)?; + + self.info.http_client().send(req).await + } + + /// Finalize an SLO by uploading the manifest. + /// + /// PUT {container}/{path}?multipart-manifest=put with a JSON body listing + /// each segment's path, etag, and size. + /// + /// Reference: + pub async fn swift_put_slo_manifest( + &self, + path: &str, + manifest: &[SloManifestEntry], + args: &OpWrite, + ) -> Result> { + let abs = build_abs_path(&self.root, path); + let url = format!( + "{}/{}/{}?multipart-manifest=put", + &self.endpoint, + &self.container, + percent_encode_path(&abs) + ); + + let body = serde_json::to_vec(manifest).map_err(new_json_serialize_error)?; + + let mut req = Request::put(&url); + req = req.header("X-Auth-Token", &self.token); + req = req.header(header::CONTENT_LENGTH, body.len()); + req = req.header(header::CONTENT_TYPE, "application/json"); + + // Forward user metadata to the manifest object. + if let Some(user_metadata) = args.user_metadata() { + for (k, v) in user_metadata { + req = req.header(format!("X-Object-Meta-{k}"), v); + } + } + + let req = req + .extension(Operation::Write) + .body(Buffer::from(bytes::Bytes::from(body))) + .map_err(new_request_build_error)?; + + self.info.http_client().send(req).await + } + + /// Delete the segments uploaded so far for an unfinished SLO upload. + /// + /// Lists the objects under this upload's segment prefix and deletes them one + /// by one; non-success list responses and failed deletes are ignored. 
+ /// + /// Reference: + pub async fn swift_delete_slo(&self, path: &str, upload_id: &str) -> Result<()> { + // List segments under the upload_id prefix and delete them individually. + // We can't use multipart-manifest=delete because we haven't created + // the manifest yet (abort happens before complete). + let abs = build_abs_path(&self.root, path); + let prefix = format!(".segments/{}/{}/", abs.trim_end_matches('/'), upload_id); + + // List all segments with this prefix. + let url = QueryPairsWriter::new(&format!("{}/{}/", &self.endpoint, &self.container)) + .push("prefix", &percent_encode_path(&prefix)) + .push("format", "json") + .finish(); + + let mut req = Request::get(&url); + req = req.header("X-Auth-Token", &self.token); + + let req = req + .extension(Operation::List) + .body(Buffer::new()) + .map_err(new_request_build_error)?; + + let resp = self.info.http_client().send(req).await?; + if !resp.status().is_success() { + return Ok(()); + } + + let bs = resp.into_body().to_bytes(); + let segments: Vec = serde_json::from_slice(&bs).unwrap_or_default(); + + // Delete each segment. + for seg in segments { + if let ListOpResponse::FileInfo { name, .. } = seg { + let seg_url = format!( + "{}/{}/{}", + &self.endpoint, + &self.container, + percent_encode_path(&name) + ); + + let mut req = Request::delete(&seg_url); + req = req.header("X-Auth-Token", &self.token); + + let req = req + .extension(Operation::Delete) + .body(Buffer::new()) + .map_err(new_request_build_error)?; + + // Best effort — ignore individual segment delete failures. + let _ = self.info.http_client().send(req).await; + } + } + + Ok(()) + } } #[derive(Debug, Eq, PartialEq, Deserialize)] @@ -354,6 +502,19 @@ pub struct BulkDeleteResponse { pub response_body: Option, } +/// Entry in an SLO manifest JSON array. 
+/// +/// Reference: +#[derive(Debug, Serialize)] +pub struct SloManifestEntry { + /// Path to the segment: `/{container}/{segment_name}` + pub path: String, + /// MD5 etag of the segment (without quotes). + pub etag: String, + /// Size of the segment in bytes. + pub size_bytes: u64, +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/services/swift/src/writer.rs b/core/services/swift/src/writer.rs index 3f8c63864cc6..5c89a6f0e913 100644 --- a/core/services/swift/src/writer.rs +++ b/core/services/swift/src/writer.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use http::StatusCode; +use super::core::SloManifestEntry; use super::core::SwiftCore; use super::error::parse_error; use opendal_core::raw::*; @@ -50,8 +51,8 @@ impl SwiftWriter { } } -impl oio::OneShotWrite for SwiftWriter { - async fn write_once(&self, bs: Buffer) -> Result { +impl oio::MultipartWrite for SwiftWriter { + async fn write_once(&self, _size: u64, bs: Buffer) -> Result { let resp = self .core .swift_create_object(&self.path, bs.len() as u64, &self.op, bs) @@ -67,4 +68,85 @@ impl oio::OneShotWrite for SwiftWriter { _ => Err(parse_error(resp)), } } + + async fn initiate_part(&self) -> Result { + // Swift SLO doesn't need a server-side initiate call. + // Generate a local UUID as the upload ID to namespace the segments. + Ok(uuid::Uuid::new_v4().to_string()) + } + + async fn write_part( + &self, + upload_id: &str, + part_number: usize, + size: u64, + body: Buffer, + ) -> Result { + let resp = self + .core + .swift_put_segment(&self.path, upload_id, part_number, size, body) + .await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::OK => { + let etag = parse_etag(resp.headers())? + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "ETag not present in segment upload response", + ) + })? 
+ .to_string(); + + Ok(oio::MultipartPart { + part_number, + etag, + checksum: None, + size: Some(size), + }) + } + _ => Err(parse_error(resp)), + } + } + + async fn complete_part( + &self, + upload_id: &str, + parts: &[oio::MultipartPart], + ) -> Result { + let manifest: Vec = parts + .iter() + .map(|part| { + let segment = self + .core + .slo_segment_path(&self.path, upload_id, part.part_number); + SloManifestEntry { + path: format!("/{}/{}", &self.core.container, segment), + etag: part.etag.trim_matches('"').to_string(), + size_bytes: part.size.unwrap_or(0), + } + }) + .collect(); + + let resp = self + .core + .swift_put_slo_manifest(&self.path, &manifest, &self.op) + .await?; + + let status = resp.status(); + + match status { + StatusCode::CREATED | StatusCode::OK => { + let metadata = SwiftWriter::parse_metadata(resp.headers())?; + Ok(metadata) + } + _ => Err(parse_error(resp)), + } + } + + async fn abort_part(&self, upload_id: &str) -> Result<()> { + self.core.swift_delete_slo(&self.path, upload_id).await + } } diff --git a/core/services/upyun/src/writer.rs b/core/services/upyun/src/writer.rs index 82a36bb20491..fb6ea77f1122 100644 --- a/core/services/upyun/src/writer.rs +++ b/core/services/upyun/src/writer.rs @@ -95,6 +95,7 @@ impl oio::MultipartWrite for UpyunWriter { part_number, etag: "".to_string(), checksum: None, + size: None, }), _ => Err(parse_error(resp)), } diff --git a/core/services/vercel-blob/src/writer.rs b/core/services/vercel-blob/src/writer.rs index b4bc8eefd260..ee602a457434 100644 --- a/core/services/vercel-blob/src/writer.rs +++ b/core/services/vercel-blob/src/writer.rs @@ -102,6 +102,7 @@ impl oio::MultipartWrite for VercelBlobWriter { part_number, etag: resp.etag, checksum: None, + size: None, }) } _ => Err(parse_error(resp)), diff --git a/integrations/object_store/src/service/writer.rs b/integrations/object_store/src/service/writer.rs index 9b6c07dc663f..4b3c3437a98d 100644 --- a/integrations/object_store/src/service/writer.rs +++ 
b/integrations/object_store/src/service/writer.rs @@ -156,6 +156,7 @@ impl oio::MultipartWrite for ObjectStoreWriter { part_number, etag, checksum: None, // No checksum for now + size: None, }; Ok(multipart_part) }