Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Write;
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, LazyLock, RwLock};
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
Expand Down Expand Up @@ -67,6 +68,7 @@ pub struct MockClientConfig {
enable_backpressure: bool,
initial_read_window_size: usize,
enable_rename: bool,
fail_on_non_aligned_read_window: bool,
}

impl MockClientConfig {
Expand Down Expand Up @@ -106,6 +108,12 @@ impl MockClientConfig {
self
}

/// Enable a check to ensure all read window increments are aligned with part boundaries.
///
/// The setting is stored on the config and copied into each `MockGetObjectResponse`.
/// When it is enabled and a misaligned increment has been recorded, polling a response
/// stream yields a `"read window increment failed"` client error instead of data.
///
/// Returns the updated config so calls can be chained in builder style.
pub fn fail_on_non_aligned_read_window(mut self, enable: bool) -> Self {
self.fail_on_non_aligned_read_window = enable;
self
}

/// Build the MockClient
pub fn build(self) -> MockClient {
MockClient::new(self)
Expand All @@ -120,6 +128,7 @@ pub struct MockClient {
objects: Arc<RwLock<BTreeMap<String, MockObject>>>,
in_progress_uploads: Arc<RwLock<BTreeSet<String>>>,
operation_counts: Arc<RwLock<HashMap<Operation, u64>>>,
read_window_increment_failed: Arc<AtomicBool>,
}

fn add_object(objects: &Arc<RwLock<BTreeMap<String, MockObject>>>, key: &str, value: MockObject) {
Expand All @@ -129,11 +138,13 @@ fn add_object(objects: &Arc<RwLock<BTreeMap<String, MockObject>>>, key: &str, va
impl MockClient {
/// Construct a [MockClient] from `config`.
///
/// The client starts with no objects, no in-progress uploads, zeroed operation
/// counters, and the "read window increment failed" flag cleared.
pub fn new(config: MockClientConfig) -> Self {
    Self {
        config,
        objects: Default::default(),
        in_progress_uploads: Default::default(),
        operation_counts: Default::default(),
        // Cleared initially; clones of this flag are given to backpressure handles
        // and responses created by this client.
        read_window_increment_failed: Arc::new(AtomicBool::new(false)),
    }
}

Expand Down Expand Up @@ -720,11 +731,25 @@ fn validate_checksum(
/// Backpressure handle created by the mock client together with a get-object response
/// when backpressure is enabled.
///
/// In addition to tracking the end of the read window, it carries what is needed to check
/// whether a window increment ends on a part boundary of the request.
#[derive(Clone, Debug)]
pub struct MockBackpressureHandle {
/// End offset of the current read window; advanced atomically by `increment_read_window`.
read_window_end_offset: Arc<AtomicU64>,
/// Byte range of the request. The alignment check measures the window end relative to
/// `request_range.start` and is skipped once the window reaches `request_range.end`.
request_range: Range<u64>,
/// Part size the window end is expected to be a multiple of (relative to the request start).
part_size: u64,
/// Flag shared with the owning client; set to `true` when a misaligned increment is seen.
read_window_increment_failed: Arc<AtomicBool>,
}

impl ClientBackpressureHandle for MockBackpressureHandle {
/// Advance the read window by `len` bytes and verify the new window end is part-aligned.
///
/// The window end is measured relative to the start of the request. If the new end falls
/// before the end of the request and is not a multiple of `part_size`, a warning is logged
/// and the shared `read_window_increment_failed` flag is set.
fn increment_read_window(&mut self, len: usize) {
    // Increment exactly once: `fetch_add` returns the value held *before* the addition,
    // so the new window end is derived from it rather than from a second atomic operation.
    let prev_read_window_end_offset = self.read_window_end_offset.fetch_add(len as u64, Ordering::SeqCst);
    let read_window_end_offset = prev_read_window_end_offset + len as u64;
    let relative_read_window_end = read_window_end_offset - self.request_range.start;
    // A window that reaches (or passes) the end of the request is exempt from the check.
    if read_window_end_offset < self.request_range.end && !relative_read_window_end.is_multiple_of(self.part_size) {
        tracing::warn!(
            relative_read_window_end,
            self.part_size,
            self.request_range.end,
            "read window is not aligned with part boundaries",
        );
        self.read_window_increment_failed.store(true, Ordering::SeqCst);
    }
}

fn ensure_read_window(&mut self, desired_end_offset: u64) {
Expand All @@ -744,6 +769,8 @@ pub struct MockGetObjectResponse {
length: usize,
part_size: usize,
backpressure_handle: Option<MockBackpressureHandle>,
read_window_increment_failed: Arc<AtomicBool>,
fail_on_non_aligned_read_window: bool,
}

impl MockGetObjectResponse {
Expand Down Expand Up @@ -783,6 +810,15 @@ impl Stream for MockGetObjectResponse {
type Item = ObjectClientResult<GetBodyPart, GetObjectError, MockClientError>;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.fail_on_non_aligned_read_window && self.read_window_increment_failed.load(Ordering::SeqCst) {
// Return an error for this and all future requests made by this client.
// This ensures the error is observed by the user, since errors that occur
// during readahead are not propagated to userspace and may otherwise be missed.
return Poll::Ready(Some(Err(ObjectClientError::ClientError(MockClientError(
"read window increment failed".into(),
)))));
}

if self.length == 0 {
return Poll::Ready(None);
}
Expand Down Expand Up @@ -930,7 +966,12 @@ impl ObjectClient for MockClient {
let read_window_end_offset = Arc::new(AtomicU64::new(
next_offset + self.config.initial_read_window_size as u64,
));
Some(MockBackpressureHandle { read_window_end_offset })
Some(MockBackpressureHandle {
read_window_end_offset,
request_range: next_offset..next_offset + length as u64,
part_size: self.read_part_size() as u64,
read_window_increment_failed: self.read_window_increment_failed.clone(),
})
} else {
None
};
Expand All @@ -940,6 +981,8 @@ impl ObjectClient for MockClient {
length,
part_size: self.config.part_size,
backpressure_handle,
read_window_increment_failed: self.read_window_increment_failed.clone(),
fail_on_non_aligned_read_window: self.config.fail_on_non_aligned_read_window,
})
} else {
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey(
Expand Down
64 changes: 62 additions & 2 deletions mountpoint-s3-fs/src/prefetch/backpressure_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct BackpressureConfig {
pub read_window_size_multiplier: usize,
/// Request range to apply backpressure
pub request_range: Range<u64>,
/// Enable alignment of read window end to part boundary
pub align_read_window: bool,
}

/// A [BackpressureController] should be given to consumers of a byte stream.
Expand All @@ -50,6 +52,9 @@ pub struct BackpressureController {
read_window_end_offset: u64,
/// Next offset of the data to be read, relative to the start of the S3 object.
next_read_offset: u64,
/// Start offset of the second "main" request to S3. This helps in aligning window ends of the
/// request with part boundaries.
second_request_start: u64,
/// End offset within the S3 object for the request.
///
/// The request can return data up to this offset *exclusively*.
Expand All @@ -58,6 +63,8 @@ pub struct BackpressureController {
///
/// For example, when memory is low we should scale down [Self::preferred_read_window_size].
mem_limiter: Arc<MemoryLimiter>,
/// Enable alignment of read window end to part boundary
align_read_window: bool,
}

/// The [BackpressureLimiter] is used on producer side of a stream, for example,
Expand Down Expand Up @@ -100,8 +107,10 @@ pub fn new_backpressure_controller(
read_window_size_multiplier: config.read_window_size_multiplier.max(MIN_WINDOW_SIZE_MULTIPLIER),
read_window_end_offset,
next_read_offset: config.request_range.start,
second_request_start: config.request_range.start + config.initial_read_window_size as u64,
request_end_offset: config.request_range.end,
mem_limiter,
align_read_window: config.align_read_window,
};

let limiter = BackpressureLimiter {
Expand Down Expand Up @@ -136,8 +145,17 @@ impl BackpressureController {
{
let new_read_window_end_offset = self
.next_read_offset
.saturating_add(self.preferred_read_window_size as u64)
.min(self.request_end_offset);
.saturating_add(self.preferred_read_window_size as u64);
let new_read_window_end_offset = Self::round_up_to_part_boundary(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right place? Or should we align in scale_up and scale_down, where we already modify preferred_read_window_size?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the existing incrementing logic, I think it is the right place. Above this line we are "blindly" adding preferred_read_window_size to new_read_window_end_offset. Having a fixed preferred_read_window_size won't work for an arbitrary new_read_window_end_offset, which is dictated by the reading side (e.g. we may need to increment the window at offset 4,194,305 or 4,194,306 depending on how much data was read).

new_read_window_end_offset,
self.second_request_start,
self.min_read_window_size as u64,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are assuming this is actually the read part size, right? We should make it explicit.

self.align_read_window,
);
// NOTE: in the end of the object we still have up to "part_size" of unaccounted memory.
// For more accurate memory limiting we could reserve in "full parts", but that would make
// release more complicated. So accept this inaccuracy, and reserve only till `request_end_offset`.
let new_read_window_end_offset = new_read_window_end_offset.min(self.request_end_offset);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to avoid shadowing here.

// We can skip if the new `read_window_end_offset` is less than or equal to the current one, this
// could happen after the read window is scaled down.
if new_read_window_end_offset <= self.read_window_end_offset {
Expand Down Expand Up @@ -231,6 +249,33 @@ impl BackpressureController {
.record((self.preferred_read_window_size / 1024 / 1024) as f64);
}
}

/// Round up `read_window_end` to the next part boundary (relative to `second_req_start`).
///
/// This function ensures that read window end offsets are aligned to part boundaries for the second request,
/// which helps optimize memory usage.
///
/// `read_window_end` is returned unchanged when any of the following holds:
/// - `align_read_window` is false,
/// - `read_window_end` is at or before `second_req_start`,
/// - `part_size` is zero (there is no boundary to align to),
/// - the offset is already on a part boundary.
///
/// If the aligned offset does not fit in a `u64`, the result saturates at `u64::MAX` instead of
/// overflowing; callers are expected to clamp it to the end of the request.
///
/// Note: window excludes the last byte, denoted by `read_window_end`.
fn round_up_to_part_boundary(
    read_window_end: u64,
    second_req_start: u64,
    part_size: u64,
    align_read_window: bool,
) -> u64 {
    if !align_read_window || part_size == 0 || read_window_end <= second_req_start {
        return read_window_end;
    }

    // Cannot underflow: the guard above ensures `read_window_end > second_req_start`.
    let relative_end_offset = read_window_end - second_req_start;
    relative_end_offset
        // Returns the value itself when already aligned, `None` on overflow.
        .checked_next_multiple_of(part_size)
        .and_then(|aligned_relative_offset| second_req_start.checked_add(aligned_relative_offset))
        .unwrap_or(u64::MAX)
}
}

impl Drop for BackpressureController {
Expand Down Expand Up @@ -346,6 +391,7 @@ mod tests {
max_read_window_size: 2 * 1024 * 1024 * 1024,
read_window_size_multiplier,
request_range,
align_read_window: true,
};

let (mut backpressure_controller, _backpressure_limiter) =
Expand Down Expand Up @@ -373,6 +419,7 @@ mod tests {
max_read_window_size: 2 * 1024 * 1024 * 1024,
read_window_size_multiplier,
request_range,
align_read_window: true,
};

let (mut backpressure_controller, _backpressure_limiter) =
Expand Down Expand Up @@ -402,6 +449,7 @@ mod tests {
max_read_window_size: 2 * GIB,
read_window_size_multiplier: 2,
request_range: 0..(5 * GIB as u64),
align_read_window: true,
};

let (mut backpressure_controller, mut backpressure_limiter) =
Expand Down Expand Up @@ -434,6 +482,18 @@ mod tests {
});
}

#[test_case(500, 1000, 100, 500; "offset before second request start")]
#[test_case(1000, 1000, 512, 1000; "offset at second request start")]
#[test_case(1500, 1000, 512, 1512; "offset after second request start, needs rounding up")]
#[test_case(2024, 1000, 512, 2024; "offset after second request start, already aligned")]
#[test_case(1001, 1000, 512, 1512; "offset just after second request start, needs rounding up")]
#[test_case(1512, 1000, 512, 1512; "offset exactly at part boundary")]
#[test_case(1513, 1000, 512, 2024; "offset just past part boundary")]
fn test_round_up_to_part_boundary(offset: u64, second_req_start: u64, part_size: u64, expected: u64) {
    // Every case runs with alignment enabled; boundaries are counted from `second_req_start`.
    assert_eq!(
        BackpressureController::round_up_to_part_boundary(offset, second_req_start, part_size, true),
        expected
    );
}

fn new_backpressure_controller_for_test(
backpressure_config: BackpressureConfig,
) -> (BackpressureController, BackpressureLimiter) {
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3-fs/src/prefetch/caching_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ where
max_read_window_size: config.max_read_window_size,
read_window_size_multiplier: config.read_window_size_multiplier,
request_range: range.into(),
align_read_window: false, // we don't know where S3 request starts, so can not align the read window
};
let (backpressure_controller, backpressure_limiter) =
new_backpressure_controller(backpressure_config, self.mem_limiter.clone());
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3-fs/src/prefetch/part_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl<Client: ObjectClient + Clone + Send + Sync + 'static> ObjectPartStream<Clie
max_read_window_size: config.max_read_window_size,
read_window_size_multiplier: config.read_window_size_multiplier,
request_range: range.into(),
align_read_window: true,
};
let (backpressure_controller, mut backpressure_limiter) =
new_backpressure_controller(backpressure_config, self.mem_limiter.clone());
Expand Down
12 changes: 11 additions & 1 deletion mountpoint-s3-fs/tests/common/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,17 @@ pub struct TestSessionConfig {
pub cache_block_size: usize,
#[cfg(feature = "manifest")]
pub manifest: Option<Manifest>,
pub fail_on_non_aligned_read_window: bool,
}

impl Default for TestSessionConfig {
fn default() -> Self {
let part_size = 8 * 1024 * 1024;
let initial_read_window_size = 1024 * 1024 + 128 * 1024;
let cache_block_size = 1024 * 1024;
Self {
part_size,
initial_read_window_size: part_size,
initial_read_window_size,
filesystem_config: Default::default(),
auth_config: Default::default(),
pass_fuse_fd: false,
Expand All @@ -91,6 +93,7 @@ impl Default for TestSessionConfig {
cache_block_size,
#[cfg(feature = "manifest")]
manifest: None,
fail_on_non_aligned_read_window: false,
}
}
}
Expand All @@ -105,6 +108,12 @@ impl TestSessionConfig {
self.pass_fuse_fd = pass_fuse_fd;
self
}

/// Enable a check to ensure all read window increments are aligned with part boundaries.
///
/// The value is stored on the test session config and forwarded to the mock client's
/// `fail_on_non_aligned_read_window` setting when a mock session is created.
/// Defaults to `false`. Returns the updated config so calls can be chained.
pub fn fail_on_non_aligned_read_window(mut self, enable: bool) -> Self {
self.fail_on_non_aligned_read_window = enable;
self
}
}

// Holds resources for the testing session and cleans them on drop.
Expand Down Expand Up @@ -307,6 +316,7 @@ pub mod mock_session {
.enable_backpressure(true)
.initial_read_window_size(test_config.initial_read_window_size)
.enable_rename(test_config.filesystem_config.allow_rename)
.fail_on_non_aligned_read_window(test_config.fail_on_non_aligned_read_window)
.build(),
);
let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
Expand Down
Loading
Loading