diff --git a/mountpoint-s3-client/src/s3_crt_client/get_object.rs b/mountpoint-s3-client/src/s3_crt_client/get_object.rs index d93a75b16..65a7ba5f7 100644 --- a/mountpoint-s3-client/src/s3_crt_client/get_object.rs +++ b/mountpoint-s3-client/src/s3_crt_client/get_object.rs @@ -77,7 +77,16 @@ impl S3CrtClient { .map_err(S3RequestError::construction_failure)?; let mut options = message.into_options(S3Operation::GetObject); - options.part_size(self.inner.read_part_size as u64); + + // Use the client read part size, or the requested range if smaller. + let part_size = (self.inner.read_part_size as u64).min( + params + .range + .as_ref() + .map(|range| range.end - range.start) + .unwrap_or(u64::MAX), + ); + options.part_size(part_size); let mut headers_sender = Some(event_sender.clone()); let part_sender = event_sender.clone(); diff --git a/mountpoint-s3-client/tests/client.rs b/mountpoint-s3-client/tests/client.rs index 8174535da..968ce136b 100644 --- a/mountpoint-s3-client/tests/client.rs +++ b/mountpoint-s3-client/tests/client.rs @@ -2,9 +2,15 @@ pub mod common; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use aws_sdk_s3::primitives::ByteStream; use common::*; -use mountpoint_s3_client::S3CrtClient; -use mountpoint_s3_client::config::S3ClientConfig; +use futures::{StreamExt as _, pin_mut}; +use mountpoint_s3_client::config::{MemoryPool, MetaRequestType, S3ClientConfig}; +use mountpoint_s3_client::types::{GetBodyPart, GetObjectParams}; +use mountpoint_s3_client::{ObjectClient, S3CrtClient}; #[tokio::test] async fn test_create_client_twice() { @@ -15,3 +21,88 @@ async fn test_create_client_twice() { let _client = S3CrtClient::new(config.clone()).expect("could not create test client"); } } + +#[tokio::test] +async fn test_memory_pool_get_buffer_sizes() { + const PART_SIZE: usize = 8 * 1024 * 1024; + const OBJECT_SIZE: usize = 1024 * 1024; + + let sdk_client = get_test_sdk_client().await; + let (bucket, prefix) = get_test_bucket_and_prefix("test_memory_pool_get_buffer_sizes"); + + let size = 1024 * 1024; + let key = format!("{prefix}/test"); + let body = vec![0x42u8; size]; + sdk_client + .put_object() + .bucket(&bucket) + .key(&key) + .body(ByteStream::from(body.clone())) + .send() + .await + .unwrap(); + + let pool = TrackingPool::new(); + let config = S3ClientConfig::new() + .endpoint_config(get_test_endpoint_config()) + .part_size(PART_SIZE) + .memory_pool(pool.clone()); + let client = S3CrtClient::new(config.clone()).expect("could not create test client"); + + let result = client + .get_object( + &bucket, + &key, + &GetObjectParams::new().range(Some(0..(OBJECT_SIZE as u64))), + ) + .await + .expect("get_object should succeed"); + + pin_mut!(result); + while let Some(r) = result.next().await { + let GetBodyPart { .. } = r.expect("get_object body part failed"); + } + + let buffers_count = pool.requests(); + assert_eq!(&buffers_count, &[(MetaRequestType::GetObject, OBJECT_SIZE, 1)]); +} + +#[derive(Debug, Clone)] +struct TrackingPool { + requests: Arc>>, +} + +impl TrackingPool { + fn new() -> Self { + TrackingPool { + requests: Default::default(), + } + } + + fn requests(&self) -> Vec<(MetaRequestType, usize, usize)> { + self.requests + .lock() + .unwrap() + .iter() + .map(|(&(typ, size), &count)| (typ, size, count)) + .collect() + } +} + +impl MemoryPool for TrackingPool { + type Buffer = Box<[u8]>; + + fn get_buffer(&self, size: usize, meta_request_type: MetaRequestType) -> Self::Buffer { + *self + .requests + .lock() + .unwrap() + .entry((meta_request_type, size)) + .or_default() += 1; + vec![0u8; size].into_boxed_slice() + } + + fn trim(&self) -> bool { + false + } +} diff --git a/mountpoint-s3-crt/src/s3/client.rs b/mountpoint-s3-crt/src/s3/client.rs index a36860fae..70645e5ef 100644 --- a/mountpoint-s3-crt/src/s3/client.rs +++ b/mountpoint-s3-crt/src/s3/client.rs @@ -514,7 +514,7 @@ impl Default for MetaRequestOptions<'_> { /// What transformation to apply to a single [MetaRequest] to transform it into a collection of /// requests to S3. -#[derive(Debug)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub enum MetaRequestType { /// Send the request as-is (no transformation) Default, diff --git a/mountpoint-s3/src/run.rs b/mountpoint-s3/src/run.rs index 7eb5467af..3e0215089 100644 --- a/mountpoint-s3/src/run.rs +++ b/mountpoint-s3/src/run.rs @@ -211,6 +211,7 @@ fn mount(args: CliArgs, client_builder: impl ClientBuilder) -> anyhow::Result