Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/core/src/raw/oio/write/multipart_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ pub struct MultipartPart {
pub etag: String,
/// The checksum of the part.
pub checksum: Option<String>,
/// The size of the part in bytes.
pub size: Option<u64>,
}

struct WriteInput<W: MultipartWrite> {
Expand Down Expand Up @@ -391,6 +393,7 @@ mod tests {
part_number,
etag: "etag".to_string(),
checksum: None,
size: None,
})
}

Expand Down
1 change: 1 addition & 0 deletions core/services/b2/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl oio::MultipartWrite for B2Writer {
etag: result.content_sha1,
part_number,
checksum: None,
size: None,
})
}
_ => Err(parse_error(resp)),
Expand Down
1 change: 1 addition & 0 deletions core/services/cos/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl oio::MultipartWrite for CosWriter {
part_number,
etag,
checksum: None,
size: None,
})
}
_ => Err(parse_error(resp)),
Expand Down
1 change: 1 addition & 0 deletions core/services/gcs/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl oio::MultipartWrite for GcsWriter {
part_number,
etag,
checksum: None,
size: None,
})
}

Expand Down
1 change: 1 addition & 0 deletions core/services/obs/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl oio::MultipartWrite for ObsWriter {
part_number,
etag,
checksum: None,
size: None,
})
}
_ => Err(parse_error(resp)),
Expand Down
1 change: 1 addition & 0 deletions core/services/oss/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl oio::MultipartWrite for OssWriter {
part_number,
etag,
checksum: None,
size: None,
})
}
_ => Err(parse_error(resp)),
Expand Down
1 change: 1 addition & 0 deletions core/services/s3/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl oio::MultipartWrite for S3Writer {
part_number,
etag,
checksum,
size: None,
})
}
_ => Err(parse_error(resp)),
Expand Down
1 change: 1 addition & 0 deletions core/services/swift/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ opendal-core = { path = "../../core", version = "0.55.0", default-features = fal
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
uuid = { workspace = true, features = ["v4"] }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
13 changes: 10 additions & 3 deletions core/services/swift/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ impl Builder for SwiftBuilder {

write: true,
write_can_empty: true,
write_can_multi: true,
write_multi_min_size: Some(5 * 1024 * 1024),
write_multi_max_size: if cfg!(target_pointer_width = "64") {
Some(5 * 1024 * 1024 * 1024)
} else {
Some(usize::MAX)
},
write_with_content_type: true,
write_with_content_disposition: true,
write_with_content_encoding: true,
Expand Down Expand Up @@ -194,7 +201,7 @@ pub struct SwiftBackend {

impl Access for SwiftBackend {
type Reader = HttpBody;
type Writer = oio::OneShotWriter<SwiftWriter>;
type Writer = oio::MultipartWriter<SwiftWriter>;
type Lister = oio::PageLister<SwiftLister>;
type Deleter = oio::BatchDeleter<SwiftDeleter>;

Expand Down Expand Up @@ -236,9 +243,9 @@ impl Access for SwiftBackend {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let concurrent = args.concurrent();
let writer = SwiftWriter::new(self.core.clone(), args.clone(), path.to_string());

let w = oio::OneShotWriter::new(writer);
let w = oio::MultipartWriter::new(self.core.info.clone(), writer, concurrent);

Ok((RpWrite::default(), w))
}
Expand Down
161 changes: 161 additions & 0 deletions core/services/swift/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use http::header::IF_MODIFIED_SINCE;
use http::header::IF_NONE_MATCH;
use http::header::IF_UNMODIFIED_SINCE;
use serde::Deserialize;
use serde::Serialize;

use opendal_core::raw::*;
use opendal_core::*;
Expand Down Expand Up @@ -313,6 +314,153 @@ impl SwiftCore {

self.info.http_client().send(req).await
}

/// Build the segment path for an SLO part.
///
/// Segments are stored as: `.segments/{object_path}/{upload_id}/{part_number:08}`
pub fn slo_segment_path(&self, path: &str, upload_id: &str, part_number: usize) -> String {
let abs = build_abs_path(&self.root, path);
format!(
".segments/{}{}/{:08}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the path defined by Swift, or did we choose it ourselves? Will this path be listed out by users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The .segments/ prefix is a convention, not defined by Swift — SLO just needs segments to be reachable objects anywhere in the container. This convention is widely used by Swift clients (e.g. python-swiftclient uses the same pattern).

Segments won't appear in OpenDAL listings because they're outside the user's root prefix — swift_list uses build_abs_path(&self.root, path) as the prefix filter, and .segments/ sits at the container root. They would be visible if someone lists the container directly via the Swift API, but that's the same behavior as python-swiftclient.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks for the explanation. I would greatly appreciate it if you could add those explanations directly in the code as comments.

abs.trim_end_matches('/'),
upload_id,
part_number
)
}

/// Upload a segment for an SLO multipart upload.
///
/// Reference: <https://docs.openstack.org/swift/latest/overview_large_objects.html>
pub async fn swift_put_segment(
&self,
path: &str,
upload_id: &str,
part_number: usize,
size: u64,
body: Buffer,
) -> Result<Response<Buffer>> {
let segment = self.slo_segment_path(path, upload_id, part_number);
let url = format!(
"{}/{}/{}",
&self.endpoint,
&self.container,
percent_encode_path(&segment)
);

let mut req = Request::put(&url);
req = req.header("X-Auth-Token", &self.token);
req = req.header(header::CONTENT_LENGTH, size);

let req = req
.extension(Operation::Write)
.body(body)
.map_err(new_request_build_error)?;

self.info.http_client().send(req).await
}

/// Finalize an SLO by uploading the manifest.
///
/// PUT {container}/{path}?multipart-manifest=put with a JSON body listing
/// each segment's path, etag, and size.
///
/// Reference: <https://docs.openstack.org/swift/latest/overview_large_objects.html>
pub async fn swift_put_slo_manifest(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing we need to carry the user metadata here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — the manifest PUT should carry user metadata and content headers (Content-Type, Content-Disposition, etc.) from OpWrite. Currently only segments get the raw bytes and the manifest gets none of the user's metadata. I'll fix this.

&self,
path: &str,
manifest: &[SloManifestEntry],
args: &OpWrite,
) -> Result<Response<Buffer>> {
let abs = build_abs_path(&self.root, path);
let url = format!(
"{}/{}/{}?multipart-manifest=put",
&self.endpoint,
&self.container,
percent_encode_path(&abs)
);

let body = serde_json::to_vec(manifest).map_err(new_json_serialize_error)?;

let mut req = Request::put(&url);
req = req.header("X-Auth-Token", &self.token);
req = req.header(header::CONTENT_LENGTH, body.len());
req = req.header(header::CONTENT_TYPE, "application/json");

// Forward user metadata to the manifest object.
if let Some(user_metadata) = args.user_metadata() {
for (k, v) in user_metadata {
req = req.header(format!("X-Object-Meta-{k}"), v);
}
}

let req = req
.extension(Operation::Write)
.body(Buffer::from(bytes::Bytes::from(body)))
.map_err(new_request_build_error)?;

self.info.http_client().send(req).await
}

/// Delete an SLO manifest and all its segments.
///
/// DELETE {container}/{path}?multipart-manifest=delete removes the manifest
/// and all referenced segments in one call.
///
/// Reference: <https://docs.openstack.org/swift/latest/overview_large_objects.html>
pub async fn swift_delete_slo(&self, path: &str, upload_id: &str) -> Result<()> {
// List segments under the upload_id prefix and delete them individually.
// We can't use multipart-manifest=delete because we haven't created
// the manifest yet (abort happens before complete).
let abs = build_abs_path(&self.root, path);
let prefix = format!(".segments/{}{}/", abs.trim_end_matches('/'), upload_id);

// List all segments with this prefix.
let url = QueryPairsWriter::new(&format!("{}/{}/", &self.endpoint, &self.container))
.push("prefix", &percent_encode_path(&prefix))
.push("format", "json")
.finish();

let mut req = Request::get(&url);
req = req.header("X-Auth-Token", &self.token);

let req = req
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;

let resp = self.info.http_client().send(req).await?;
if !resp.status().is_success() {
return Ok(());
}

let bs = resp.into_body().to_bytes();
let segments: Vec<ListOpResponse> = serde_json::from_slice(&bs).unwrap_or_default();

// Delete each segment.
for seg in segments {
if let ListOpResponse::FileInfo { name, .. } = seg {
let seg_url = format!(
"{}/{}/{}",
&self.endpoint,
&self.container,
percent_encode_path(&name)
);

let mut req = Request::delete(&seg_url);
req = req.header("X-Auth-Token", &self.token);

let req = req
.extension(Operation::Delete)
.body(Buffer::new())
.map_err(new_request_build_error)?;

// Best effort — ignore individual segment delete failures.
let _ = self.info.http_client().send(req).await;
}
}

Ok(())
}
}

#[derive(Debug, Eq, PartialEq, Deserialize)]
Expand Down Expand Up @@ -354,6 +502,19 @@ pub struct BulkDeleteResponse {
pub response_body: Option<String>,
}

/// Entry in an SLO manifest JSON array.
///
/// Reference: <https://docs.openstack.org/swift/latest/overview_large_objects.html>
#[derive(Debug, Serialize)]
pub struct SloManifestEntry {
/// Path to the segment: `/{container}/{segment_name}`
pub path: String,
/// MD5 etag of the segment (without quotes).
pub etag: String,
/// Size of the segment in bytes.
pub size_bytes: u64,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading