Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion mountpoint-s3-fs/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ mod pool;
mod stats;

pub use buffers::{PoolBuffer, PoolBufferMut};
pub use limiter::{ActiveRead, ActiveReadGuard, BufferArea, MINIMUM_MEM_LIMIT, effective_total_memory};
pub use limiter::{
ActiveRead, ActiveReadGuard, BufferArea, CursorHandle, CursorState, MINIMUM_MEM_LIMIT, effective_total_memory,
};
pub use pool::PagedPool;
pub use stats::BufferKind;
432 changes: 248 additions & 184 deletions mountpoint-s3-fs/src/memory/limiter.rs

Large diffs are not rendered by default.

55 changes: 14 additions & 41 deletions mountpoint-s3-fs/src/memory/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::prefetch::CursorId;
use crate::sync::{Arc, RwLock};

use super::buffers::{PoolBuffer, PoolBufferMut};
use super::limiter::{ActiveReadGuard, BufferArea, MemoryLimiter};
use super::limiter::{CursorHandle, MemoryLimiter};
use super::pages::{Page, PagedBufferPtr};
use super::stats::{BufferKind, PoolStats, SizePoolStats};

Expand Down Expand Up @@ -183,57 +183,30 @@ impl PagedPool {
.sum()
}

// TODO: Refactor MemoryLimiter unit tests to remove inner_stats and inner_limiter. We should have clearly separated unit tests for the PagedPool and the MemoryLimiter.

/// Expose the internal stats for testing purposes (e.g., to wire a standalone limiter).
///
/// Only compiled under `cfg(test)`. Returns a shared borrow of the pool's own
/// [`PoolStats`] (`self.inner.stats`); nothing is copied.
#[cfg(test)]
pub(crate) fn inner_stats(&self) -> &PoolStats {
&self.inner.stats
}

/// Expose the internal limiter for testing purposes.
///
/// Only compiled under `cfg(test)`. Returns a shared borrow of the pool's own
/// [`MemoryLimiter`] (`self.inner.limiter`); nothing is copied.
#[cfg(test)]
pub(crate) fn inner_limiter(&self) -> &MemoryLimiter {
&self.inner.limiter
}

// ─── Delegation methods for MemoryLimiter ───────────────────────────────────

/// Reserve memory for future use. Always succeeds (unconditional).
///
/// Thin delegation: `cursor_id`, `area` and `size` are forwarded unchanged to the
/// pool's `MemoryLimiter`. Nothing is returned, so callers cannot observe a failure;
/// use [`Self::try_reserve`] when the reservation should respect the budget.
// NOTE(review): `size` is presumably a byte count — confirm against the limiter.
pub fn reserve(&self, cursor_id: CursorId, area: BufferArea, size: u64) {
self.inner.limiter.reserve(cursor_id, area, size);
}

/// Reserve memory if available. Returns `false` if over budget.
///
/// Delegates to the pool's `MemoryLimiter`, forwarding `cursor_id`, `area` and `size`
/// and additionally handing it the pool's stats (`self.inner.stats`).
// NOTE(review): the stats are presumably what the limiter uses to account for memory
// the pool already holds when checking the budget — confirm in the `limiter` module.
pub fn try_reserve(&self, cursor_id: CursorId, area: BufferArea, size: u64) -> bool {
self.inner.limiter.try_reserve(cursor_id, area, size, &self.inner.stats)
}

/// Release all remaining reservation for a cursor and remove it from tracking.
///
/// Thin delegation: forwards `cursor_id` and `area` unchanged to the pool's
/// `MemoryLimiter`, which owns the per-cursor bookkeeping.
pub fn release_cursor(&self, cursor_id: CursorId, area: BufferArea) {
self.inner.limiter.release_cursor(cursor_id, area);
}

/// Generate a new unique CursorId.
pub fn next_cursor_id(&self) -> CursorId {
self.inner.limiter.next_cursor_id()
/// Create a new cursor and return the shared state handle.
pub fn create_cursor(&self) -> CursorHandle {
self.inner.limiter.create_cursor(self)
}

/// Query available memory.
///
/// Delegates to the pool's `MemoryLimiter`, passing the pool's stats
/// (`self.inner.stats`) as input to the computation.
// NOTE(review): the returned value is presumably in bytes — confirm in the `limiter` module.
pub fn available_mem(&self) -> u64 {
self.inner.limiter.available_mem(&self.inner.stats)
}

/// Record that a FUSE read is active for the given cursor.
/// Returns a guard that clears the active read on drop.
///
/// Thin delegation: `cursor_id`, `offset` and `size` are forwarded unchanged to the
/// pool's `MemoryLimiter`. Because the record is cleared when the guard is dropped,
/// callers should keep the returned [`ActiveReadGuard`] alive for as long as the read
/// is in progress.
pub fn set_active_read(&self, cursor_id: CursorId, offset: u64, size: usize) -> ActiveReadGuard {
self.inner.limiter.set_active_read(cursor_id, offset, size)
}

/// Check if the given cursor has an active read overlapping the specified range.
///
/// Thin delegation to the `MemoryLimiter` method of the same name; the range is given
/// as a starting `offset` plus a `size`.
// NOTE(review): the overlap test itself lives in the `limiter` module; confirm there
// whether the end of the range is treated as inclusive or exclusive.
pub fn has_active_read_in_range(&self, cursor_id: CursorId, offset: u64, size: usize) -> bool {
self.inner.limiter.has_active_read_in_range(cursor_id, offset, size)
}

// ─── Internal components exposed to the rest of the `memory` module. ──────────

/// Borrow the pool's [`PoolStats`].
///
/// Visible only to the parent module (`pub(super)`), so sibling modules can read the
/// pool's stats without the pool re-exporting a delegation method for each use.
pub(super) fn stats(&self) -> &PoolStats {
&self.inner.stats
}

/// Borrow the pool's [`MemoryLimiter`].
///
/// Visible only to the parent module (`pub(super)`); code outside that module has to
/// go through the pool's public methods instead.
pub(super) fn limiter(&self) -> &MemoryLimiter {
&self.inner.limiter
}
}

impl MemoryPool for PagedPool {
Expand Down
60 changes: 19 additions & 41 deletions mountpoint-s3-fs/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use tracing::trace;
use crate::checksums::{ChecksummedBytes, IntegrityError};
use crate::data_cache::DataCache;
use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
use crate::memory::PagedPool;
use crate::metrics::defs::FUSE_CACHE_HIT;
use crate::object::ObjectId;

Expand Down Expand Up @@ -186,7 +185,6 @@ fn determine_max_read_size() -> usize {
#[derive(Debug)]
pub struct Prefetcher<Client> {
part_stream: PartStream<Client>,
pool: PagedPool,
config: PrefetcherConfig,
}

Expand All @@ -208,27 +206,16 @@ where
}

/// Create a new [Prefetcher] from the given [ObjectPartStream] instance.
pub fn new(part_stream: PartStream<Client>, pool: PagedPool, config: PrefetcherConfig) -> Self {
Self {
part_stream,
pool,
config,
}
pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig) -> Self {
Self { part_stream, config }
}

/// Start a new prefetch request to the specified object.
pub fn prefetch(&self, bucket: String, object_id: ObjectId, size: u64) -> PrefetchGetObject<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
PrefetchGetObject::new(
self.part_stream.clone(),
self.pool.clone(),
self.config,
bucket,
object_id,
size,
)
PrefetchGetObject::new(self.part_stream.clone(), self.config, bucket, object_id, size)
}
}

Expand All @@ -239,7 +226,6 @@ where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
part_stream: PartStream<Client>,
pool: PagedPool,
config: PrefetcherConfig,
bucket: String,
object_id: ObjectId,
Expand All @@ -256,15 +242,13 @@ where
/// Create and spawn a new prefetching request for an object
fn new(
part_stream: PartStream<Client>,
pool: PagedPool,
config: PrefetcherConfig,
bucket: String,
object_id: ObjectId,
size: u64,
) -> Self {
PrefetchGetObject {
part_stream,
pool,
config,
bucket,
object_id,
Expand Down Expand Up @@ -347,39 +331,33 @@ where
/// Create a new Cursor and associated backpressure GetObject request which has a range from current offset
/// to the end of the file.
fn create_cursor(&self, offset: u64) -> Result<Cursor<Client>, PrefetchReadError<Client::ClientError>> {
let start = offset;
let object_size = self.size as usize;
let read_part_size = self.part_stream.client().read_part_size();
let range = RequestRange::new(object_size, start, object_size);

// The prefetcher now relies on the backpressure mechanism, so it must be enabled
match self.part_stream.client().initial_read_window_size() {
Some(value) => {
// Also, make sure that we don't get blocked from the beginning
if value == 0 {
return Err(PrefetchReadError::BackpressurePreconditionFailed);
}
}
None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
};
let client = self.part_stream.client();

// Validate backpressure preconditions: client must have backpressure enabled
// with an initial read window size greater than 0.
match client.initial_read_window_size() {
Some(0) | None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
Some(_) => {}
}

let cursor_id = self.pool.next_cursor_id();
let config = RequestTaskConfig {
cursor_id,
let pool = self.part_stream.pool();
let cursor_handle = pool.create_cursor();
let task_config = RequestTaskConfig {
cursor_state: cursor_handle.state(),
bucket: self.bucket.clone(),
object_id: self.object_id.clone(),
range,
read_part_size,
range: RequestRange::new(object_size, offset, object_size),
read_part_size: client.read_part_size(),
preferred_part_size: self.preferred_part_size,
initial_request_size: self.config.initial_request_size,
max_read_window_size: self.config.max_read_window_size,
read_window_size_multiplier: self.config.sequential_prefetch_multiplier,
};
let request_task = self.part_stream.spawn_get_object_request(config);
let request_task = self.part_stream.spawn_request_task(task_config);
Ok(Cursor::new(
cursor_id,
request_task,
self.pool.clone(),
cursor_handle,
&self.config,
self.object_id.clone(),
offset,
Expand Down
55 changes: 14 additions & 41 deletions mountpoint-s3-fs/src/prefetch/backpressure_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use async_channel::{Receiver, RecvError, Sender, unbounded};
use humansize::make_format;
use tracing::trace;

use crate::memory::BufferArea;
use crate::memory::PagedPool;
use crate::memory::CursorState;
use crate::sync::Arc;

use super::CursorId;
use super::PrefetchReadError;
Expand Down Expand Up @@ -52,8 +52,6 @@ impl ReadWindowAlignmentConfig {
}

pub struct BackpressureConfig {
/// Id of the associated Cursor
pub cursor_id: CursorId,
/// Backpressure's initial read window size
pub initial_read_window_size: usize,
/// Minimum read window size that the backpressure controller is allowed to scale down to
Expand All @@ -71,7 +69,6 @@ pub struct BackpressureConfig {
impl BackpressureConfig {
pub fn new(config: &RequestTaskConfig, read_window_alignment_config: ReadWindowAlignmentConfig) -> Self {
Self {
cursor_id: config.cursor_id,
initial_read_window_size: config.initial_read_window_size(),
min_read_window_size: config.read_part_size,
max_read_window_size: config.max_read_window_size,
Expand Down Expand Up @@ -106,13 +103,8 @@ pub struct BackpressureController {
///
/// The request can return data up to this offset *exclusively*.
request_end_offset: u64,
/// Memory limiter is used to guide decisions on how much data to prefetch and to track
/// per-cursor memory reservations.
///
/// For example, when memory is low we should scale down [Self::preferred_read_window_size].
pool: PagedPool,
/// Unique cursor ID for per-cursor memory reservation tracking.
cursor_id: CursorId,
/// Cursor state handle for direct reservation operations.
cursor_state: Arc<CursorState>,
/// Enable alignment of read window end to part boundary
read_window_alignment_config: ReadWindowAlignmentConfig,
}
Expand Down Expand Up @@ -142,17 +134,14 @@ pub struct BackpressureLimiter {
/// informing a producer (a holder of the [BackpressureLimiter]) when it should provide data more aggressively.
pub fn new_backpressure_controller(
config: BackpressureConfig,
pool: PagedPool,
cursor_state: Arc<CursorState>,
) -> (BackpressureController, BackpressureLimiter) {
// The window size multiplier must be at least 2: scaling up and down won't work if the multiplier is 1.
const MIN_WINDOW_SIZE_MULTIPLIER: usize = 2;
let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64;
pool.reserve(
config.cursor_id,
BufferArea::Prefetch,
config.initial_read_window_size as u64,
);
cursor_state.reserve(config.initial_read_window_size as u64);

let cursor_id = cursor_state.id();
let (read_window_updater, read_window_increment_queue) = unbounded();
let read_window_increment_queue = ReadWindowIncrementQueue::new(read_window_increment_queue);

Expand All @@ -165,8 +154,7 @@ pub fn new_backpressure_controller(
read_window_end_offset,
next_read_offset: config.request_range.start,
request_end_offset: config.request_range.end,
cursor_id: config.cursor_id,
pool,
cursor_state,
read_window_alignment_config: config.read_window_alignment_config,
};

Expand All @@ -176,7 +164,7 @@ pub fn new_backpressure_controller(
read_window_increment_queue,
read_window_end_offset,
request_end_offset: config.request_range.end,
cursor_id: config.cursor_id,
cursor_id,
};

(controller, limiter)
Expand Down Expand Up @@ -223,18 +211,14 @@ impl BackpressureController {
// read window size.
if self.preferred_read_window_size <= self.min_read_window_size {
trace!(new_read_window_end_offset, "sending a read window increment");
self.pool
.reserve(self.cursor_id, BufferArea::Prefetch, to_increase as u64);
self.cursor_state.reserve(to_increase as u64);
self.increment_read_window(to_increase).await;
break;
}

// Try to reserve the memory for the length we want to increase before sending the request,
// scale down the read window if it fails.
if self
.pool
.try_reserve(self.cursor_id, BufferArea::Prefetch, to_increase as u64)
{
if self.cursor_state.try_reserve(to_increase as u64) {
trace!(new_read_window_end_offset, "sending a read window increment");
self.increment_read_window(to_increase).await;
break;
Expand Down Expand Up @@ -278,7 +262,7 @@ impl BackpressureController {
// because only `preferred_read_window_size` is increased but the actual read window will
// be updated later on `DataRead` event (where we do reserve memory).
let to_increase = (new_read_window_size - self.preferred_read_window_size) as u64;
let available_mem = self.pool.available_mem();
let available_mem = self.cursor_state.available_mem();
if available_mem >= to_increase {
let formatter = make_format(humansize::BINARY);
trace!(
Expand Down Expand Up @@ -313,15 +297,6 @@ impl BackpressureController {
}
}

/// Releases this controller's cursor reservation when the controller is dropped.
impl Drop for BackpressureController {
/// Hands the cursor's `BufferArea::Prefetch` reservation back to the pool via
/// `release_cursor`, keyed by this controller's `cursor_id`.
fn drop(&mut self) {
// Release whatever remains of this cursor's reservation. The per-cursor counter
// tracks only the unallocated portion — pool allocations already decremented it
// via the on_reserve callback.
self.pool.release_cursor(self.cursor_id, BufferArea::Prefetch);
}
}

impl BackpressureLimiter {
pub fn read_window_end_offset(&self) -> u64 {
self.read_window_end_offset
Expand Down Expand Up @@ -420,7 +395,6 @@ mod tests {
fn test_read_window_scale_up(initial_read_window_size: usize, read_window_size_multiplier: usize) {
let request_range = 0..(5 * 1024 * 1024 * 1024);
let backpressure_config = BackpressureConfig {
cursor_id: CursorId::new_from_raw(0),
initial_read_window_size,
min_read_window_size: 8 * 1024 * 1024,
max_read_window_size: 2 * 1024 * 1024 * 1024,
Expand Down Expand Up @@ -449,7 +423,6 @@ mod tests {
fn test_read_window_scale_down(initial_read_window_size: usize, read_window_size_multiplier: usize) {
let request_range = 0..(5 * 1024 * 1024 * 1024);
let backpressure_config = BackpressureConfig {
cursor_id: CursorId::new_from_raw(0),
initial_read_window_size,
min_read_window_size: 8 * 1024 * 1024,
max_read_window_size: 2 * 1024 * 1024 * 1024,
Expand Down Expand Up @@ -480,7 +453,6 @@ mod tests {
// OK, back to basics. Just reproduce what happened, verify it passes after the fix.
#[allow(clippy::identity_op)]
let backpressure_config = BackpressureConfig {
cursor_id: CursorId::new_from_raw(0),
initial_read_window_size: 1 * MIB,
min_read_window_size: 8 * MIB,
max_read_window_size: 2 * GIB,
Expand Down Expand Up @@ -536,6 +508,7 @@ mod tests {
) -> (BackpressureController, BackpressureLimiter) {
let pool =
PagedPool::new_with_candidate_sizes([8 * 1024 * 1024], backpressure_config.max_read_window_size as u64);
new_backpressure_controller(backpressure_config, pool)
let cursor_state = pool.create_cursor().state();
new_backpressure_controller(backpressure_config, cursor_state)
}
}
4 changes: 2 additions & 2 deletions mountpoint-s3-fs/src/prefetch/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ where
prefetcher_config: PrefetcherConfig,
) -> Prefetcher<Client> {
let part_stream = ClientPartStream::new(runtime, self.client, pool.clone());
Prefetcher::new(PartStream::new(part_stream), pool, prefetcher_config)
Prefetcher::new(PartStream::new(part_stream), prefetcher_config)
}
}

Expand All @@ -94,6 +94,6 @@ where
prefetcher_config: PrefetcherConfig,
) -> Prefetcher<Client> {
let part_stream = CachingPartStream::new(runtime, self.client, pool.clone(), self.cache);
Prefetcher::new(PartStream::new(part_stream), pool, prefetcher_config)
Prefetcher::new(PartStream::new(part_stream), prefetcher_config)
}
}
Loading
Loading