Skip to content

Commit 7f9bef2

Browse files
authored
feat: implement content-md5 for s3 (#6508)
* feat: implement content-md5 for s3

  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move to checksum algo

  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* avoid copying content

  Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
1 parent 1a7dd56 commit 7f9bef2

File tree

5 files changed

+33
-0
lines changed

5 files changed

+33
-0
lines changed

core/src/raw/http_util/header.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -221,6 +221,21 @@ pub fn format_content_md5(bs: &[u8]) -> String {
221221
general_purpose::STANDARD.encode(hasher.finalize())
222222
}
223223

224+
/// format content md5 header by given iter of bytes.
225+
pub fn format_content_md5_iter<I>(bs: I) -> String
226+
where
227+
I: IntoIterator,
228+
I::Item: AsRef<[u8]>,
229+
{
230+
let mut hasher = md5::Md5::new();
231+
232+
for b in bs {
233+
hasher.update(b.as_ref());
234+
}
235+
236+
general_purpose::STANDARD.encode(hasher.finalize())
237+
}
238+
224239
/// format authorization header by basic auth.
225240
///
226241
/// # Errors

core/src/raw/http_util/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ pub use header::build_header_value;
3838
pub use header::format_authorization_by_basic;
3939
pub use header::format_authorization_by_bearer;
4040
pub use header::format_content_md5;
41+
pub use header::format_content_md5_iter;
4142
pub use header::parse_content_disposition;
4243
pub use header::parse_content_encoding;
4344
pub use header::parse_content_length;

core/src/services/s3/backend.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -771,6 +771,7 @@ impl Builder for S3Builder {
771771

772772
let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
773773
Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
774+
Some("md5") => Some(ChecksumAlgorithm::Md5),
774775
None => None,
775776
v => {
776777
return Err(Error::new(

core/src/services/s3/core.rs

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -275,6 +275,7 @@ impl S3Core {
275275
.for_each(|b| crc = crc32c::crc32c_append(crc, &b));
276276
Some(BASE64_STANDARD.encode(crc.to_be_bytes()))
277277
}
278+
Some(ChecksumAlgorithm::Md5) => Some(format_content_md5_iter(body.clone())),
278279
}
279280
}
280281
pub fn insert_checksum_header(
@@ -588,6 +589,12 @@ impl S3Core {
588589
// Set SSE headers.
589590
req = self.insert_sse_headers(req, true);
590591

592+
// Calculate Checksum.
593+
if let Some(checksum) = self.calculate_checksum(&body) {
594+
// Set Checksum header.
595+
req = self.insert_checksum_header(req, &checksum);
596+
}
597+
591598
// Inject operation to the request.
592599
req = req.extension(Operation::Write);
593600

@@ -1251,11 +1258,14 @@ pub struct ListObjectVersionsOutputDeleteMarker {
12511258

12521259
pub enum ChecksumAlgorithm {
12531260
Crc32c,
1261+
/// Mapping to the `Content-MD5` header from S3.
1262+
Md5,
12541263
}
12551264
impl ChecksumAlgorithm {
12561265
pub fn to_header_name(&self) -> HeaderName {
12571266
match self {
12581267
Self::Crc32c => HeaderName::from_static("x-amz-checksum-crc32c"),
1268+
Self::Md5 => HeaderName::from_static("content-md5"),
12591269
}
12601270
}
12611271
}
@@ -1266,6 +1276,7 @@ impl Display for ChecksumAlgorithm {
12661276
"{}",
12671277
match self {
12681278
Self::Crc32c => "CRC32C",
1279+
Self::Md5 => "MD5",
12691280
}
12701281
)
12711282
}

core/src/services/s3/writer.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -172,6 +172,11 @@ impl oio::MultipartWrite for S3Writer {
172172
etag: p.etag.clone(),
173173
checksum_crc32c: p.checksum.clone(),
174174
},
175+
ChecksumAlgorithm::Md5 => CompleteMultipartUploadRequestPart {
176+
part_number: p.part_number,
177+
etag: p.etag.clone(),
178+
..Default::default()
179+
},
175180
},
176181
})
177182
.collect();

0 commit comments

Comments
 (0)