11 changes: 10 additions & 1 deletion mountpoint-s3-client/src/s3_crt_client/get_object.rs
@@ -77,7 +77,16 @@ impl S3CrtClient {
.map_err(S3RequestError::construction_failure)?;

let mut options = message.into_options(S3Operation::GetObject);
options.part_size(self.inner.read_part_size as u64);

// Use the client read part size, or the length of the requested range if smaller.
**@mansi153 (Contributor), Aug 22, 2025:**

Can we please add a slightly more elaborate explanation of why we're doing this, for future context? The PR description is sufficient, but that sometimes gets lost when we refactor code for unrelated reasons :(

let part_size = (self.inner.read_part_size as u64).min(
    params
        .range
        .as_ref()
        .map(|range| range.end - range.start)
**Contributor:**

This means the `first_request_stream` will always request the part size `min(INITIAL_READ_WINDOW_SIZE, object_size)`, right?

A few questions:

- Is it the right understanding that the new logic also makes the splitting of meta requests more sensible, given that we always start the second meta request with range `[INITIAL_READ_WINDOW_SIZE, end of object]` (and not `[PART_SIZE, end of object]`), even though the first request has caused the CRT to return up to the part size?
- I believe this should also reduce the TTFB (time to first byte) for the first read?
- Do we have this risk of blocked memory in writes too, and do we intend to make any changes there to reduce the default part size, etc.?

**Contributor (Author):**

> This means the `first_request_stream` will always request the part size `min(INITIAL_READ_WINDOW_SIZE, object_size)`, right?

Right.

> Is it the right understanding that the new logic also makes the splitting of meta requests more sensible [...]?

That is not changed. We are still requesting `INITIAL_READ_WINDOW_SIZE` bytes as before; the only difference is that we also tell the CRT to use `INITIAL_READ_WINDOW_SIZE` as the part size.

> I believe this should also reduce the TTFB for the first read?

No change there either.

> Do we have this risk of blocked memory in writes too [...]?

That has always been the case, but for writes we do not know the size in advance, so there is no trivial fix there.

**Contributor:**

> That is not changed. We are still requesting `INITIAL_READ_WINDOW_SIZE` bytes as before; the only difference is that we also tell the CRT to use `INITIAL_READ_WINDOW_SIZE` as the part size.

Right! But won't this also change the S3 request range (and hence its size), now that we're requesting a 1.128M part instead of an 8M part? Previously we would receive data up to the 8M offset but then re-request the 1.128M-8M range in the second meta request. Now we only receive 1.128M in the first go, because it's the part size. So that is a change with this fix, no?

(My comment above about reduced TTFB was also based on this understanding, i.e. that a smaller part GET means smaller S3 latency.)

**Contributor (Author):**

We were already only requesting `INITIAL_READ_WINDOW_SIZE` (1.125 MiB) before the change. `read_part_size` only comes into play for the buffer reservation, not for the data that is downloaded.

        .unwrap_or(u64::MAX),
);
options.part_size(part_size);

let mut headers_sender = Some(event_sender.clone());
let part_sender = event_sender.clone();
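The effect of the clamping is easiest to see in isolation. Here is a minimal standalone sketch, not part of the diff; the helper name `effective_part_size` and the example sizes are hypothetical:

```rust
use std::ops::Range;

/// Pick the CRT part size: the client's read part size, clamped to the requested range length.
fn effective_part_size(read_part_size: u64, range: Option<&Range<u64>>) -> u64 {
    read_part_size.min(range.map(|r| r.end - r.start).unwrap_or(u64::MAX))
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    // A ranged 1 MiB GET on an 8 MiB part-size client reserves only a 1 MiB buffer...
    assert_eq!(effective_part_size(8 * MIB, Some(&(0..MIB))), MIB);
    // ...while an unbounded GET still uses the full read part size.
    assert_eq!(effective_part_size(8 * MIB, None), 8 * MIB);
}
```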
95 changes: 93 additions & 2 deletions mountpoint-s3-client/tests/client.rs
@@ -2,9 +2,15 @@

pub mod common;

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use aws_sdk_s3::primitives::ByteStream;
use common::*;
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_client::config::S3ClientConfig;
use futures::{StreamExt as _, pin_mut};
use mountpoint_s3_client::config::{MemoryPool, MetaRequestType, S3ClientConfig};
use mountpoint_s3_client::types::{GetBodyPart, GetObjectParams};
use mountpoint_s3_client::{ObjectClient, S3CrtClient};

#[tokio::test]
async fn test_create_client_twice() {
@@ -15,3 +21,88 @@ async fn test_create_client_twice() {
        let _client = S3CrtClient::new(config.clone()).expect("could not create test client");
    }
}

#[tokio::test]
async fn test_memory_pool_get_buffer_sizes() {
    const PART_SIZE: usize = 8 * 1024 * 1024;
    const OBJECT_SIZE: usize = 1024 * 1024;

    let sdk_client = get_test_sdk_client().await;
    let (bucket, prefix) = get_test_bucket_and_prefix("test_memory_pool_get_buffer_sizes");

    let key = format!("{prefix}/test");
    let body = vec![0x42u8; OBJECT_SIZE];
    sdk_client
        .put_object()
        .bucket(&bucket)
        .key(&key)
        .body(ByteStream::from(body.clone()))
        .send()
        .await
        .unwrap();

    let pool = TrackingPool::new();
    let config = S3ClientConfig::new()
        .endpoint_config(get_test_endpoint_config())
        .part_size(PART_SIZE)
        .memory_pool(pool.clone());
    let client = S3CrtClient::new(config.clone()).expect("could not create test client");

    let result = client
        .get_object(
            &bucket,
            &key,
            &GetObjectParams::new().range(Some(0..(OBJECT_SIZE as u64))),
        )
        .await
        .expect("get_object should succeed");

    pin_mut!(result);
    while let Some(r) = result.next().await {
        let GetBodyPart { .. } = r.expect("get_object body part failed");
    }

    // The ranged GET should reserve exactly one buffer of the object size, not the part size.
    let buffers_count = pool.requests();
    assert_eq!(&buffers_count, &[(MetaRequestType::GetObject, OBJECT_SIZE, 1)]);
}

/// Test-only memory pool that counts `get_buffer` calls, keyed by request type and buffer size.
#[derive(Debug, Clone)]
struct TrackingPool {
    requests: Arc<Mutex<HashMap<(MetaRequestType, usize), usize>>>,
}

impl TrackingPool {
    fn new() -> Self {
        TrackingPool {
            requests: Default::default(),
        }
    }

    fn requests(&self) -> Vec<(MetaRequestType, usize, usize)> {
        self.requests
            .lock()
            .unwrap()
            .iter()
            .map(|(&(typ, size), &count)| (typ, size, count))
            .collect()
    }
}

impl MemoryPool for TrackingPool {
    type Buffer = Box<[u8]>;

    fn get_buffer(&self, size: usize, meta_request_type: MetaRequestType) -> Self::Buffer {
        // Record the (request type, size) combination, then hand back a zeroed allocation.
        *self
            .requests
            .lock()
            .unwrap()
            .entry((meta_request_type, size))
            .or_default() += 1;
        vec![0u8; size].into_boxed_slice()
    }

    fn trim(&self) -> bool {
        false
    }
}
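A note on the stub: `get_buffer` records each request before returning a plain zeroed allocation, and `trim` (which, presumably, lets the client ask the pool to release cached memory) simply reports that nothing was freed. Keying the map by `(MetaRequestType, usize)` is also what motivates the new derives on `MetaRequestType` in the next file.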
2 changes: 1 addition & 1 deletion mountpoint-s3-crt/src/s3/client.rs
@@ -514,7 +514,7 @@ impl Default for MetaRequestOptions<'_> {

/// What transformation to apply to a single [MetaRequest] to transform it into a collection of
/// requests to S3.
#[derive(Debug)]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum MetaRequestType {
/// Send the request as-is (no transformation)
Default,
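For context on why the extra derives are needed: `Hash`, `PartialEq`, and `Eq` are what allow `MetaRequestType` to serve as part of a `HashMap` key in the new test, while `Clone`/`Copy` make the key cheap to copy in and out of the map. A minimal standalone sketch, with the enum variants abbreviated:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
enum MetaRequestType {
    Default,
    GetObject,
}

fn main() {
    // Mirrors the tracking map in the new test: counts keyed by (type, buffer size).
    let mut counts: HashMap<(MetaRequestType, usize), usize> = HashMap::new();
    *counts.entry((MetaRequestType::GetObject, 1024 * 1024)).or_default() += 1;
    assert_eq!(counts[&(MetaRequestType::GetObject, 1024 * 1024)], 1);
    let _ = MetaRequestType::Default;
}
```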
1 change: 1 addition & 0 deletions mountpoint-s3/src/run.rs
@@ -211,6 +211,7 @@ fn mount(args: CliArgs, client_builder: impl ClientBuilder) -> anyhow::Result<Fu
// Set up a paged memory pool
let pool = PagedPool::new_with_candidate_sizes([
    args.cache_block_size_in_bytes() as usize,
    mountpoint_s3_fs::s3::config::INITIAL_READ_WINDOW_SIZE,
**Contributor:**

For my understanding, this implies that for objects with `INITIAL_READ_WINDOW_SIZE < object_size < read_part_size`, we will still be using a peak memory of `INITIAL_READ_WINDOW_SIZE + part_size`, right? (Because we advance the CRT backpressure window halfway through the first meta request being read.)

**Contributor (Author):**

Only if `2 * INITIAL_READ_WINDOW_SIZE <= object_size < read_part_size`. What matters is the range on the second request, which will be `object_size - INITIAL_READ_WINDOW_SIZE`. If that is greater than `INITIAL_READ_WINDOW_SIZE`, it will require a full `read_part_size` buffer.

**Contributor:**

Oh right, yes, that makes sense.

    client_config.part_config.read_size_bytes,
    client_config.part_config.write_size_bytes,
]);
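To make the peak-memory arithmetic from the thread above concrete, here is a hypothetical worked example. The 1.125 MiB initial window matches the figure quoted in the review; the values and the round-up-to-candidate-size behaviour of the pool are assumptions for illustration only:

```rust
fn main() {
    const MIB: u64 = 1024 * 1024;
    let initial_read_window = MIB + 128 * 1024; // 1.125 MiB, per the review thread
    let read_part_size = 8 * MIB;

    // An object between 2 * initial window and the read part size, e.g. 4 MiB.
    let object_size = 4 * MIB;

    // First meta request: its buffer is clamped to the initial window's range.
    let first_buffer = read_part_size.min(initial_read_window);

    // Second meta request covers the remainder; if that no longer fits the
    // initial-window candidate size, a full read-part-size buffer is needed.
    let remainder = object_size - initial_read_window; // 2.875 MiB
    let second_buffer = if remainder <= initial_read_window {
        initial_read_window
    } else {
        read_part_size
    };

    // Peak is roughly 1.125 MiB + 8 MiB, the scenario discussed in the thread.
    println!("peak bytes: {}", first_buffer + second_buffer);
}
```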