Skip to content

Commit a00f75d

Browse files
committed
made client Arc
added append_object from express-zone
1 parent 300557c commit a00f75d

38 files changed

+917
-17
lines changed

.github/workflows/rust.yml

+13-3
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,22 @@ jobs:
2323
cargo clippy --all-targets --all-features
2424
cargo build --bins --examples --tests --benches --verbose
2525
26-
- name: Run tests
26+
- name: Run tests S3
2727
run: |
28-
./tests/start-server.sh
28+
./tests/start-server.sh s3
2929
export SERVER_ENDPOINT=localhost:9000
3030
export ACCESS_KEY=minioadmin
3131
export SECRET_KEY=minioadmin
3232
export ENABLE_HTTPS=1
3333
export SSL_CERT_FILE=./tests/public.crt
34-
cargo test --verbose -- --nocapture
34+
cargo test --verbose -- --nocapture :s3:
35+
36+
- name: Run tests S3Express
37+
run: |
38+
./tests/start-server.sh s3express
39+
export SERVER_ENDPOINT=localhost:9000
40+
export ACCESS_KEY=minioadmin
41+
export SECRET_KEY=minioadmin
42+
export ENABLE_HTTPS=1
43+
export SSL_CERT_FILE=./tests/public.crt
44+
cargo test --verbose -- --nocapture :s3express:

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ async-std = { version = "1.13.1", features = ["attributes", "tokio1"] }
6262
clap = { version = "4.5.35", features = ["derive"] }
6363
quickcheck = "1.0.3"
6464
criterion = "0.5.1"
65+
test-tag = "0.1.4"
6566

6667
[lib]
6768
name = "minio"

src/s3/builders.rs

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
//! Argument builders for [minio::s3::client::Client](crate::s3::client::Client) APIs
1717
18+
mod append_object;
1819
mod bucket_common;
1920
mod bucket_exists;
2021
mod copy_object;
@@ -65,6 +66,7 @@ mod set_object_tags;
6566
mod stat_object;
6667

6768
pub use crate::s3::object_content::*;
69+
pub use append_object::*;
6870
pub use bucket_common::*;
6971
pub use bucket_exists::*;
7072
pub use copy_object::*;

src/s3/builders/append_object.rs

+334
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
2+
// Copyright 2025 MinIO, Inc.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
use crate::s3::Client;
17+
use crate::s3::builders::{
18+
ContentStream, MAX_MULTIPART_COUNT, ObjectContent, Size, calc_part_info,
19+
};
20+
use crate::s3::error::Error;
21+
use crate::s3::response::{AppendObjectResponse, StatObjectResponse};
22+
use crate::s3::segmented_bytes::SegmentedBytes;
23+
use crate::s3::sse::Sse;
24+
use crate::s3::types::{S3Api, S3Request, ToS3Request};
25+
use crate::s3::utils::{Multimap, check_bucket_name, check_object_name};
26+
use http::Method;
27+
use std::sync::Arc;
28+
29+
// region: append-object
30+
#[derive(Debug, Clone, Default)]
31+
pub struct AppendObject {
32+
client: Arc<Client>,
33+
34+
extra_headers: Option<Multimap>,
35+
extra_query_params: Option<Multimap>,
36+
bucket: String,
37+
object: String,
38+
39+
region: Option<String>,
40+
sse: Option<Arc<dyn Sse>>,
41+
data: SegmentedBytes,
42+
43+
/// value of x-amz-write-offset-bytes
44+
offset_bytes: u64,
45+
}
46+
47+
impl AppendObject {
48+
pub fn new(client: &Arc<Client>, bucket: &str, object: &str, data: SegmentedBytes) -> Self {
49+
AppendObject {
50+
client: Arc::clone(client),
51+
bucket: bucket.to_owned(),
52+
object: object.to_owned(),
53+
data,
54+
..Default::default()
55+
}
56+
}
57+
58+
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
59+
self.extra_headers = extra_headers;
60+
self
61+
}
62+
63+
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
64+
self.extra_query_params = extra_query_params;
65+
self
66+
}
67+
68+
pub fn offset_bytes(mut self, offset_bytes: u64) -> Self {
69+
self.offset_bytes = offset_bytes;
70+
self
71+
}
72+
}
73+
74+
impl S3Api for AppendObject {
75+
type S3Response = AppendObjectResponse;
76+
}
77+
78+
impl ToS3Request for AppendObject {
79+
fn to_s3request(self) -> Result<S3Request, Error> {
80+
{
81+
check_bucket_name(&self.bucket, true)?;
82+
check_object_name(&self.object)?;
83+
84+
if let Some(v) = &self.sse {
85+
if v.tls_required() && !self.client.is_secure() {
86+
return Err(Error::SseTlsRequired(None));
87+
}
88+
}
89+
}
90+
91+
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
92+
headers.insert(
93+
"x-amz-write-offset-bytes".into(),
94+
self.offset_bytes.to_string(),
95+
);
96+
97+
Ok(S3Request::new(self.client, Method::PUT)
98+
.region(self.region)
99+
.bucket(Some(self.bucket))
100+
.query_params(self.extra_query_params.unwrap_or_default())
101+
.object(Some(self.object))
102+
.headers(headers)
103+
.body(Some(self.data)))
104+
}
105+
}
106+
// endregion: append-object
107+
108+
// region: append-object-content
109+
110+
/// AppendObjectContent takes a `ObjectContent` stream and appends it to MinIO/S3.
111+
///
112+
/// It is a higher level API and handles multipart appends transparently.
113+
pub struct AppendObjectContent {
114+
client: Arc<Client>,
115+
116+
extra_headers: Option<Multimap>,
117+
extra_query_params: Option<Multimap>,
118+
region: Option<String>,
119+
bucket: String,
120+
object: String,
121+
sse: Option<Arc<dyn Sse>>,
122+
part_size: Size,
123+
124+
// source data
125+
input_content: ObjectContent,
126+
127+
// Computed.
128+
content_stream: ContentStream,
129+
part_count: Option<u16>,
130+
131+
/// Value of x-amz-write-offset-bytes
132+
offset_bytes: u64,
133+
}
134+
135+
impl AppendObjectContent {
136+
pub fn new(
137+
client: &Arc<Client>,
138+
bucket: &str,
139+
object: &str,
140+
content: impl Into<ObjectContent>,
141+
) -> Self {
142+
AppendObjectContent {
143+
client: Arc::clone(client),
144+
bucket: bucket.to_owned(),
145+
object: object.to_owned(),
146+
input_content: content.into(),
147+
extra_headers: None,
148+
extra_query_params: None,
149+
region: None,
150+
sse: None,
151+
part_size: Size::Unknown,
152+
content_stream: ContentStream::empty(),
153+
part_count: None,
154+
offset_bytes: 0,
155+
}
156+
}
157+
158+
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
159+
self.extra_headers = extra_headers;
160+
self
161+
}
162+
163+
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
164+
self.extra_query_params = extra_query_params;
165+
self
166+
}
167+
168+
pub fn region(mut self, region: Option<String>) -> Self {
169+
self.region = region;
170+
self
171+
}
172+
173+
pub fn part_size(mut self, part_size: impl Into<Size>) -> Self {
174+
self.part_size = part_size.into();
175+
self
176+
}
177+
178+
pub fn offset_bytes(mut self, offset_bytes: u64) -> Self {
179+
self.offset_bytes = offset_bytes;
180+
self
181+
}
182+
183+
pub async fn send(mut self) -> Result<AppendObjectResponse, Error> {
184+
{
185+
check_bucket_name(&self.bucket, true)?;
186+
check_object_name(&self.object)?;
187+
if let Some(v) = &self.sse {
188+
if v.tls_required() && !self.client.is_secure() {
189+
return Err(Error::SseTlsRequired(None));
190+
}
191+
}
192+
}
193+
194+
{
195+
let mut headers: Multimap = match self.extra_headers {
196+
Some(ref headers) => headers.clone(),
197+
None => Multimap::new(),
198+
};
199+
headers.insert(
200+
String::from("x-amz-write-offset-bytes"),
201+
self.offset_bytes.to_string(),
202+
);
203+
self.extra_query_params = Some(headers);
204+
}
205+
206+
self.content_stream = std::mem::take(&mut self.input_content)
207+
.to_content_stream()
208+
.await
209+
.map_err(Error::IOError)?;
210+
211+
// object_size may be Size::Unknown.
212+
let object_size = self.content_stream.get_size();
213+
214+
let (part_size, n_expected_parts) = calc_part_info(object_size, self.part_size)?;
215+
// Set the chosen part size and part count.
216+
self.part_size = Size::Known(part_size);
217+
self.part_count = n_expected_parts;
218+
219+
// Read the first part.
220+
let seg_bytes = self.content_stream.read_upto(part_size as usize).await?;
221+
222+
// get the length (if any) of the current file
223+
let resp: StatObjectResponse = self
224+
.client
225+
.stat_object(&self.bucket, &self.object)
226+
.send()
227+
.await?;
228+
println!("statObjectResponse={:#?}", resp);
229+
230+
let current_file_size = resp.size;
231+
232+
// In the first part read, if:
233+
//
234+
// - object_size is unknown AND we got less than the part size, OR
235+
// - we are expecting only one part to be uploaded,
236+
//
237+
// we upload it as a simple put object.
238+
if (object_size.is_unknown() && (seg_bytes.len() as u64) < part_size)
239+
|| n_expected_parts == Some(1)
240+
{
241+
let ao = AppendObject {
242+
client: self.client,
243+
extra_headers: self.extra_headers,
244+
extra_query_params: self.extra_query_params,
245+
bucket: self.bucket,
246+
object: self.object,
247+
region: self.region,
248+
offset_bytes: current_file_size,
249+
sse: self.sse,
250+
data: seg_bytes,
251+
};
252+
ao.send().await
253+
} else if object_size.is_known() && (seg_bytes.len() as u64) < part_size {
254+
// Not enough data!
255+
let expected = object_size.as_u64().unwrap();
256+
let got = seg_bytes.len() as u64;
257+
Err(Error::InsufficientData(expected, got))
258+
} else {
259+
// Otherwise, we start a multipart append.
260+
self.send_mpa(part_size, current_file_size, seg_bytes).await
261+
}
262+
}
263+
264+
/// multipart append
265+
async fn send_mpa(
266+
&mut self,
267+
part_size: u64,
268+
object_size: u64,
269+
first_part: SegmentedBytes,
270+
) -> Result<AppendObjectResponse, Error> {
271+
let mut done = false;
272+
let mut part_number = 0;
273+
274+
let mut last_resp: Option<AppendObjectResponse> = None;
275+
let mut next_offset_bytes: u64 = object_size;
276+
println!("initial offset_bytes: {}", next_offset_bytes);
277+
278+
let mut first_part = Some(first_part);
279+
while !done {
280+
let part_content: SegmentedBytes = {
281+
if let Some(v) = first_part.take() {
282+
v
283+
} else {
284+
self.content_stream.read_upto(part_size as usize).await?
285+
}
286+
};
287+
part_number += 1;
288+
let buffer_size = part_content.len() as u64;
289+
290+
assert!(
291+
buffer_size <= part_size,
292+
"{:?} <= {:?}",
293+
buffer_size,
294+
part_size
295+
);
296+
297+
if buffer_size == 0 && part_number > 1 {
298+
// We are done as we appended at least 1 part and we have
299+
// reached the end of the stream.
300+
break;
301+
}
302+
303+
// Check if we have too many parts to upload.
304+
if self.part_count.is_none() && part_number > MAX_MULTIPART_COUNT {
305+
return Err(Error::TooManyParts);
306+
}
307+
308+
// Append the part now.
309+
let append_object = AppendObject {
310+
client: self.client.clone(),
311+
extra_headers: self.extra_headers.clone(),
312+
extra_query_params: self.extra_query_params.clone(),
313+
bucket: self.bucket.clone(),
314+
object: self.object.clone(),
315+
region: self.region.clone(),
316+
sse: self.sse.clone(),
317+
data: part_content,
318+
offset_bytes: next_offset_bytes,
319+
};
320+
let resp: AppendObjectResponse = append_object.send().await?;
321+
println!("AppendObjectResponse: object_size={:?}", resp.object_size);
322+
323+
next_offset_bytes = resp.object_size;
324+
325+
// Finally check if we are done.
326+
if buffer_size < part_size {
327+
done = true;
328+
last_resp = Some(resp);
329+
}
330+
}
331+
Ok(last_resp.unwrap())
332+
}
333+
}
334+
// endregion: append-object-content

0 commit comments

Comments
 (0)