Skip to content

Commit e97ad59

Browse files
authored
Introduce CursorState to consolidate per-cursor memory tracking (#1832)
Consolidates per-cursor state in the memory limiter into a single `CursorState` struct, eliminating hot-path lookups from the backpressure controller and cursor read path. Previously, per-cursor data was split across two maps in MemoryLimiter: one for reservation balances and one for active FUSE read ranges. Every `reserve`, `try_reserve`, `set_active_read`, and `clear_active_read` call performed an independent lookup on the hot path. This change introduces `CursorState`, a shared struct holding both the reservation counter and active read state, and a `CursorHandle` wrapper for setting active reads. The `BackpressureController` and `Cursor` directly hold a reference to `CursorState`, so reservation and active-read operations no longer require any lookup. The map is now only accessed on cursor creation, release, and in the `on_pool_reserve` callback (same as before, but unified into a single map). ### Does this change impact existing behavior? No behavioral changes. Existing unit tests adapted to the new API. ### Does this change need a changelog entry? Does it require a version change? No. --- By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the [Developer Certificate of Origin (DCO)](https://developercertificate.org/). Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
1 parent e9fa52a commit e97ad59

9 files changed

Lines changed: 347 additions & 344 deletions

File tree

mountpoint-s3-fs/src/memory.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ mod pool;
55
mod stats;
66

77
pub use buffers::{PoolBuffer, PoolBufferMut};
8-
pub use limiter::{ActiveRead, ActiveReadGuard, BufferArea, MINIMUM_MEM_LIMIT, effective_total_memory};
8+
pub use limiter::{
9+
ActiveRead, ActiveReadGuard, BufferArea, CursorHandle, CursorState, MINIMUM_MEM_LIMIT, effective_total_memory,
10+
};
911
pub use pool::PagedPool;
1012
pub use stats::BufferKind;

mountpoint-s3-fs/src/memory/limiter.rs

Lines changed: 248 additions & 184 deletions
Large diffs are not rendered by default.

mountpoint-s3-fs/src/memory/pool.rs

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::prefetch::CursorId;
66
use crate::sync::{Arc, RwLock};
77

88
use super::buffers::{PoolBuffer, PoolBufferMut};
9-
use super::limiter::{ActiveReadGuard, BufferArea, MemoryLimiter};
9+
use super::limiter::{CursorHandle, MemoryLimiter};
1010
use super::pages::{Page, PagedBufferPtr};
1111
use super::stats::{BufferKind, PoolStats, SizePoolStats};
1212

@@ -183,57 +183,30 @@ impl PagedPool {
183183
.sum()
184184
}
185185

186-
// TODO: Refactor MemoryLimiter unit tests to remove inner_stats and inner_limiter. We should have clearly separated unit tests for the PagedPool and the MemoryLimiter.
187-
188-
/// Expose the internal stats for testing purposes (e.g., to wire a standalone limiter).
189-
#[cfg(test)]
190-
pub(crate) fn inner_stats(&self) -> &PoolStats {
191-
&self.inner.stats
192-
}
193-
194-
/// Expose the internal limiter for testing purposes.
195-
#[cfg(test)]
196-
pub(crate) fn inner_limiter(&self) -> &MemoryLimiter {
197-
&self.inner.limiter
198-
}
199-
200-
// ─── Delegation methods for MemoryLimiter ───────────────────────────────────
201-
202-
/// Reserve memory for future uses. Always succeeds (unconditional).
203-
pub fn reserve(&self, cursor_id: CursorId, area: BufferArea, size: u64) {
204-
self.inner.limiter.reserve(cursor_id, area, size);
205-
}
206-
207-
/// Reserve memory if available. Returns `false` if over budget.
208-
pub fn try_reserve(&self, cursor_id: CursorId, area: BufferArea, size: u64) -> bool {
209-
self.inner.limiter.try_reserve(cursor_id, area, size, &self.inner.stats)
210-
}
211-
212-
/// Release all remaining reservation for a cursor and remove it from tracking.
213-
pub fn release_cursor(&self, cursor_id: CursorId, area: BufferArea) {
214-
self.inner.limiter.release_cursor(cursor_id, area);
215-
}
216-
217-
/// Generate a new unique CursorId.
218-
pub fn next_cursor_id(&self) -> CursorId {
219-
self.inner.limiter.next_cursor_id()
186+
/// Create a new cursor and return the shared state handle.
187+
pub fn create_cursor(&self) -> CursorHandle {
188+
self.inner.limiter.create_cursor(self)
220189
}
221190

222191
/// Query available memory.
223192
pub fn available_mem(&self) -> u64 {
224193
self.inner.limiter.available_mem(&self.inner.stats)
225194
}
226195

227-
/// Record that a FUSE read is active for the given cursor.
228-
/// Returns a guard that clears the active read on drop.
229-
pub fn set_active_read(&self, cursor_id: CursorId, offset: u64, size: usize) -> ActiveReadGuard {
230-
self.inner.limiter.set_active_read(cursor_id, offset, size)
231-
}
232-
233196
/// Check if the given cursor has an active read overlapping the specified range.
234197
pub fn has_active_read_in_range(&self, cursor_id: CursorId, offset: u64, size: usize) -> bool {
235198
self.inner.limiter.has_active_read_in_range(cursor_id, offset, size)
236199
}
200+
201+
// ─── Internal components exposed to the rest of the `memory` module. ──────────
202+
203+
pub(super) fn stats(&self) -> &PoolStats {
204+
&self.inner.stats
205+
}
206+
207+
pub(super) fn limiter(&self) -> &MemoryLimiter {
208+
&self.inner.limiter
209+
}
237210
}
238211

239212
impl MemoryPool for PagedPool {

mountpoint-s3-fs/src/prefetch.rs

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ use tracing::trace;
4242
use crate::checksums::{ChecksummedBytes, IntegrityError};
4343
use crate::data_cache::DataCache;
4444
use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
45-
use crate::memory::PagedPool;
4645
use crate::metrics::defs::FUSE_CACHE_HIT;
4746
use crate::object::ObjectId;
4847

@@ -186,7 +185,6 @@ fn determine_max_read_size() -> usize {
186185
#[derive(Debug)]
187186
pub struct Prefetcher<Client> {
188187
part_stream: PartStream<Client>,
189-
pool: PagedPool,
190188
config: PrefetcherConfig,
191189
}
192190

@@ -208,27 +206,16 @@ where
208206
}
209207

210208
/// Create a new [Prefetcher] from the given [ObjectPartStream] instance.
211-
pub fn new(part_stream: PartStream<Client>, pool: PagedPool, config: PrefetcherConfig) -> Self {
212-
Self {
213-
part_stream,
214-
pool,
215-
config,
216-
}
209+
pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig) -> Self {
210+
Self { part_stream, config }
217211
}
218212

219213
/// Start a new prefetch request to the specified object.
220214
pub fn prefetch(&self, bucket: String, object_id: ObjectId, size: u64) -> PrefetchGetObject<Client>
221215
where
222216
Client: ObjectClient + Clone + Send + Sync + 'static,
223217
{
224-
PrefetchGetObject::new(
225-
self.part_stream.clone(),
226-
self.pool.clone(),
227-
self.config,
228-
bucket,
229-
object_id,
230-
size,
231-
)
218+
PrefetchGetObject::new(self.part_stream.clone(), self.config, bucket, object_id, size)
232219
}
233220
}
234221

@@ -239,7 +226,6 @@ where
239226
Client: ObjectClient + Clone + Send + Sync + 'static,
240227
{
241228
part_stream: PartStream<Client>,
242-
pool: PagedPool,
243229
config: PrefetcherConfig,
244230
bucket: String,
245231
object_id: ObjectId,
@@ -256,15 +242,13 @@ where
256242
/// Create and spawn a new prefetching request for an object
257243
fn new(
258244
part_stream: PartStream<Client>,
259-
pool: PagedPool,
260245
config: PrefetcherConfig,
261246
bucket: String,
262247
object_id: ObjectId,
263248
size: u64,
264249
) -> Self {
265250
PrefetchGetObject {
266251
part_stream,
267-
pool,
268252
config,
269253
bucket,
270254
object_id,
@@ -347,39 +331,33 @@ where
347331
/// Create a new Cursor and associated backpressure GetObject request which has a range from current offset
348332
/// to the end of the file.
349333
fn create_cursor(&self, offset: u64) -> Result<Cursor<Client>, PrefetchReadError<Client::ClientError>> {
350-
let start = offset;
351334
let object_size = self.size as usize;
352-
let read_part_size = self.part_stream.client().read_part_size();
353-
let range = RequestRange::new(object_size, start, object_size);
354-
355-
// The prefetcher now relies on backpressure mechanism so it must be enabled
356-
match self.part_stream.client().initial_read_window_size() {
357-
Some(value) => {
358-
// Also, make sure that we don't get blocked from the beginning
359-
if value == 0 {
360-
return Err(PrefetchReadError::BackpressurePreconditionFailed);
361-
}
362-
}
363-
None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
364-
};
335+
let client = self.part_stream.client();
336+
337+
// Validate backpressure preconditions: client must have backpressure enabled
338+
// with an initial read window size greater than 0.
339+
match client.initial_read_window_size() {
340+
Some(0) | None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
341+
Some(_) => {}
342+
}
365343

366-
let cursor_id = self.pool.next_cursor_id();
367-
let config = RequestTaskConfig {
368-
cursor_id,
344+
let pool = self.part_stream.pool();
345+
let cursor_handle = pool.create_cursor();
346+
let task_config = RequestTaskConfig {
347+
cursor_state: cursor_handle.state(),
369348
bucket: self.bucket.clone(),
370349
object_id: self.object_id.clone(),
371-
range,
372-
read_part_size,
350+
range: RequestRange::new(object_size, offset, object_size),
351+
read_part_size: client.read_part_size(),
373352
preferred_part_size: self.preferred_part_size,
374353
initial_request_size: self.config.initial_request_size,
375354
max_read_window_size: self.config.max_read_window_size,
376355
read_window_size_multiplier: self.config.sequential_prefetch_multiplier,
377356
};
378-
let request_task = self.part_stream.spawn_get_object_request(config);
357+
let request_task = self.part_stream.spawn_request_task(task_config);
379358
Ok(Cursor::new(
380-
cursor_id,
381359
request_task,
382-
self.pool.clone(),
360+
cursor_handle,
383361
&self.config,
384362
self.object_id.clone(),
385363
offset,

mountpoint-s3-fs/src/prefetch/backpressure_controller.rs

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use async_channel::{Receiver, RecvError, Sender, unbounded};
44
use humansize::make_format;
55
use tracing::trace;
66

7-
use crate::memory::BufferArea;
8-
use crate::memory::PagedPool;
7+
use crate::memory::CursorState;
8+
use crate::sync::Arc;
99

1010
use super::CursorId;
1111
use super::PrefetchReadError;
@@ -52,8 +52,6 @@ impl ReadWindowAlignmentConfig {
5252
}
5353

5454
pub struct BackpressureConfig {
55-
/// Id of the associated Cursor
56-
pub cursor_id: CursorId,
5755
/// Backpressure's initial read window size
5856
pub initial_read_window_size: usize,
5957
/// Minimum read window size that the backpressure controller is allowed to scale down to
@@ -71,7 +69,6 @@ pub struct BackpressureConfig {
7169
impl BackpressureConfig {
7270
pub fn new(config: &RequestTaskConfig, read_window_alignment_config: ReadWindowAlignmentConfig) -> Self {
7371
Self {
74-
cursor_id: config.cursor_id,
7572
initial_read_window_size: config.initial_read_window_size(),
7673
min_read_window_size: config.read_part_size,
7774
max_read_window_size: config.max_read_window_size,
@@ -106,13 +103,8 @@ pub struct BackpressureController {
106103
///
107104
/// The request can return data up to this offset *exclusively*.
108105
request_end_offset: u64,
109-
/// Memory limiter is used to guide decisions on how much data to prefetch and to track
110-
/// per-cursor memory reservations.
111-
///
112-
/// For example, when memory is low we should scale down [Self::preferred_read_window_size].
113-
pool: PagedPool,
114-
/// Unique cursor ID for per-cursor memory reservation tracking.
115-
cursor_id: CursorId,
106+
/// Cursor state handle for direct reservation operations.
107+
cursor_state: Arc<CursorState>,
116108
/// Enable alignment of read window end to part boundary
117109
read_window_alignment_config: ReadWindowAlignmentConfig,
118110
}
@@ -142,17 +134,14 @@ pub struct BackpressureLimiter {
142134
/// informing a producer (a holder of the [BackpressureLimiter]) when it should provide data more aggressively.
143135
pub fn new_backpressure_controller(
144136
config: BackpressureConfig,
145-
pool: PagedPool,
137+
cursor_state: Arc<CursorState>,
146138
) -> (BackpressureController, BackpressureLimiter) {
147139
// Minimum window size multiplier as the scaling up and down won't work if the multiplier is 1.
148140
const MIN_WINDOW_SIZE_MULTIPLIER: usize = 2;
149141
let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64;
150-
pool.reserve(
151-
config.cursor_id,
152-
BufferArea::Prefetch,
153-
config.initial_read_window_size as u64,
154-
);
142+
cursor_state.reserve(config.initial_read_window_size as u64);
155143

144+
let cursor_id = cursor_state.id();
156145
let (read_window_updater, read_window_increment_queue) = unbounded();
157146
let read_window_increment_queue = ReadWindowIncrementQueue::new(read_window_increment_queue);
158147

@@ -165,8 +154,7 @@ pub fn new_backpressure_controller(
165154
read_window_end_offset,
166155
next_read_offset: config.request_range.start,
167156
request_end_offset: config.request_range.end,
168-
cursor_id: config.cursor_id,
169-
pool,
157+
cursor_state,
170158
read_window_alignment_config: config.read_window_alignment_config,
171159
};
172160

@@ -176,7 +164,7 @@ pub fn new_backpressure_controller(
176164
read_window_increment_queue,
177165
read_window_end_offset,
178166
request_end_offset: config.request_range.end,
179-
cursor_id: config.cursor_id,
167+
cursor_id,
180168
};
181169

182170
(controller, limiter)
@@ -223,18 +211,14 @@ impl BackpressureController {
223211
// read window size.
224212
if self.preferred_read_window_size <= self.min_read_window_size {
225213
trace!(new_read_window_end_offset, "sending a read window increment");
226-
self.pool
227-
.reserve(self.cursor_id, BufferArea::Prefetch, to_increase as u64);
214+
self.cursor_state.reserve(to_increase as u64);
228215
self.increment_read_window(to_increase).await;
229216
break;
230217
}
231218

232219
// Try to reserve the memory for the length we want to increase before sending the request,
233220
// scale down the read window if it fails.
234-
if self
235-
.pool
236-
.try_reserve(self.cursor_id, BufferArea::Prefetch, to_increase as u64)
237-
{
221+
if self.cursor_state.try_reserve(to_increase as u64) {
238222
trace!(new_read_window_end_offset, "sending a read window increment");
239223
self.increment_read_window(to_increase).await;
240224
break;
@@ -278,7 +262,7 @@ impl BackpressureController {
278262
// because only `preferred_read_window_size` is increased but the actual read window will
279263
// be updated later on `DataRead` event (where we do reserve memory).
280264
let to_increase = (new_read_window_size - self.preferred_read_window_size) as u64;
281-
let available_mem = self.pool.available_mem();
265+
let available_mem = self.cursor_state.available_mem();
282266
if available_mem >= to_increase {
283267
let formatter = make_format(humansize::BINARY);
284268
trace!(
@@ -313,15 +297,6 @@ impl BackpressureController {
313297
}
314298
}
315299

316-
impl Drop for BackpressureController {
317-
fn drop(&mut self) {
318-
// Release whatever remains of this cursor's reservation. The per-cursor counter
319-
// tracks only the unallocated portion — pool allocations already decremented it
320-
// via the on_reserve callback.
321-
self.pool.release_cursor(self.cursor_id, BufferArea::Prefetch);
322-
}
323-
}
324-
325300
impl BackpressureLimiter {
326301
pub fn read_window_end_offset(&self) -> u64 {
327302
self.read_window_end_offset
@@ -420,7 +395,6 @@ mod tests {
420395
fn test_read_window_scale_up(initial_read_window_size: usize, read_window_size_multiplier: usize) {
421396
let request_range = 0..(5 * 1024 * 1024 * 1024);
422397
let backpressure_config = BackpressureConfig {
423-
cursor_id: CursorId::new_from_raw(0),
424398
initial_read_window_size,
425399
min_read_window_size: 8 * 1024 * 1024,
426400
max_read_window_size: 2 * 1024 * 1024 * 1024,
@@ -449,7 +423,6 @@ mod tests {
449423
fn test_read_window_scale_down(initial_read_window_size: usize, read_window_size_multiplier: usize) {
450424
let request_range = 0..(5 * 1024 * 1024 * 1024);
451425
let backpressure_config = BackpressureConfig {
452-
cursor_id: CursorId::new_from_raw(0),
453426
initial_read_window_size,
454427
min_read_window_size: 8 * 1024 * 1024,
455428
max_read_window_size: 2 * 1024 * 1024 * 1024,
@@ -480,7 +453,6 @@ mod tests {
480453
// OK, back to basics. Just reproduce what happened, verify it passes after the fix.
481454
#[allow(clippy::identity_op)]
482455
let backpressure_config = BackpressureConfig {
483-
cursor_id: CursorId::new_from_raw(0),
484456
initial_read_window_size: 1 * MIB,
485457
min_read_window_size: 8 * MIB,
486458
max_read_window_size: 2 * GIB,
@@ -536,6 +508,7 @@ mod tests {
536508
) -> (BackpressureController, BackpressureLimiter) {
537509
let pool =
538510
PagedPool::new_with_candidate_sizes([8 * 1024 * 1024], backpressure_config.max_read_window_size as u64);
539-
new_backpressure_controller(backpressure_config, pool)
511+
let cursor_state = pool.create_cursor().state();
512+
new_backpressure_controller(backpressure_config, cursor_state)
540513
}
541514
}

mountpoint-s3-fs/src/prefetch/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ where
7373
prefetcher_config: PrefetcherConfig,
7474
) -> Prefetcher<Client> {
7575
let part_stream = ClientPartStream::new(runtime, self.client, pool.clone());
76-
Prefetcher::new(PartStream::new(part_stream), pool, prefetcher_config)
76+
Prefetcher::new(PartStream::new(part_stream), prefetcher_config)
7777
}
7878
}
7979

@@ -94,6 +94,6 @@ where
9494
prefetcher_config: PrefetcherConfig,
9595
) -> Prefetcher<Client> {
9696
let part_stream = CachingPartStream::new(runtime, self.client, pool.clone(), self.cache);
97-
Prefetcher::new(PartStream::new(part_stream), pool, prefetcher_config)
97+
Prefetcher::new(PartStream::new(part_stream), prefetcher_config)
9898
}
9999
}

0 commit comments

Comments
 (0)