Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mountpoint-s3-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod s3;
mod superblock;
mod sync;
pub mod upload;
mod wake_signal;

pub use async_util::Runtime;
pub use config::MountpointConfig;
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3-fs/src/memory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod buffers;
mod limiter;
mod maintenance;
mod pages;
mod pool;
mod stats;
Expand Down
68 changes: 68 additions & 0 deletions mountpoint-s3-fs/src/memory/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::{debug, trace};
use crate::prefetch::CursorId;
use crate::sync::atomic::{AtomicU64, Ordering};
use crate::sync::{Arc, Mutex, Weak};
use crate::wake_signal::WakeSignal;

use super::PagedPool;
use super::stats::PoolStats;
Expand Down Expand Up @@ -62,6 +63,10 @@ pub struct MemoryLimiter {
next_cursor_id: AtomicU64,
/// Additional reserved memory for other non-buffer usage like storing metadata
additional_mem_reserved: u64,
/// Wakes the background pruning loop's outer wait when memory pressure starts.
/// Once the inner tick is running it polls every [`PRUNING_TICK`](super::maintenance::PRUNING_TICK)
/// regardless.
pruning_signal: Arc<WakeSignal>,
}

impl MemoryLimiter {
Expand All @@ -80,6 +85,7 @@ impl MemoryLimiter {
cursors: Default::default(),
next_cursor_id: AtomicU64::new(1),
additional_mem_reserved,
pruning_signal: Arc::new(WakeSignal::new()),
}
}

Expand Down Expand Up @@ -194,6 +200,68 @@ impl MemoryLimiter {
// All pool buffer kinds are accounted for here.
stats.total_reserved_bytes() as u64
}

// -----------------------------------------------------------------------
// Pruning hooks — see `maintenance.rs` for the loop, signal, and round logic.
// -----------------------------------------------------------------------

/// Reports whether the limiter is under memory pressure, defined as the
/// allocation queue holding at least one waiting request.
///
/// The allocation queue is not implemented yet, so this is currently a stub
/// that always returns `false`. Callers are expected to use the result to
/// decide whether new prefetch reservations should scale down or wait.
#[allow(dead_code)]
pub(crate) fn is_memory_pressure(&self) -> bool {
// TODO: Change to something like `!self.allocation_queue.lock().is_empty()`.
false
}

/// Notifies the pruning signal, waking the maintenance loop if it is
/// currently parked in its idle wait between maintenance passes.
#[allow(dead_code)]
pub fn trigger_pruning(&self) {
self.pruning_signal.notify();
}

/// Returns the shared wake signal owned by this limiter.
///
/// The maintenance thread clones this `Arc` at spawn time so that it has its
/// own handle to park on.
pub(crate) fn pruning_signal(&self) -> &Arc<WakeSignal> {
&self.pruning_signal
}

/// Reports whether at least one live cursor currently has a read in progress.
///
/// A cursor with an active read and an allocated buffer is waiting on an
/// in-flight S3 GET; the response will release that buffer on its own, so
/// the pruner prefers to wait rather than drop a handle.
///
/// Cursors whose state has already been dropped (the weak reference no
/// longer upgrades) are ignored.
///
/// TODO: once per-handle in-flight state is tracked this can be made more
/// precise. Until then a single active read anywhere makes this return
/// `true`, which is deliberately conservative.
pub(crate) fn has_active_reads(&self) -> bool {
    for entry in self.cursors.iter() {
        // Skip registrations whose cursor state is already gone.
        let Some(cursor_state) = entry.value().upgrade() else {
            continue;
        };
        if cursor_state.active_read.lock().unwrap().is_some() {
            return true;
        }
    }
    false
}

/// Drop the least-recently-read idle prefetch handle to free its read
/// window reservation and buffered parts.
///
/// Returns `true` if a handle was dropped.
///
/// Currently a stub that drops nothing and always returns `false`: the real
/// implementation requires per-handle tracking of the current `RequestTask`
/// so it can be dropped synchronously.
pub(crate) fn drop_one_idle_prefetch_handle(&self) -> bool {
// TODO: Iterate the handle registry, filter to entries that have an
// allocated request task but no active read, sort by last-read tick
// ascending, drop the oldest entry's task and signal a reset.
false
}
}

impl Drop for MemoryLimiter {
/// Notifies the pruning signal one last time as the limiter goes away.
fn drop(&mut self) {
// Wake the pruning thread so it observes its `Weak` failing to upgrade
// and exits. Without this, a pruner parked in the outer wait at drop
// time would stay parked until its idle interval elapses instead of
// exiting promptly.
self.pruning_signal.notify();
}
}

/// Returns the effective total memory available to this process in bytes.
Expand Down
198 changes: 198 additions & 0 deletions mountpoint-s3-fs/src/memory/maintenance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
//! Background pool-maintenance engine for [`MemoryLimiter`].

use std::time::Duration;

use tracing::trace;

use crate::sync::{Arc, Weak, thread};
use crate::wake_signal::WakeSignal;

use super::pool::PagedPoolInner;

/// Outcome of a single pruning round. Traced after every round and used by
/// the maintenance loop to decide whether to keep ticking: the loop leaves
/// its drain phase on [`Idle`](Self::Idle) and runs another round otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruningOutcome {
/// Nothing to prune (queue empty). Pressure is defined as
/// "queue non-empty", so an empty queue means pressure has already cleared.
Idle,
/// No handle was dropped this round: either in-flight uploads or active
/// reads will release buffers naturally, or no idle prefetch handle was
/// eligible to drop. Wait for the next tick.
WaitingForRelease,
/// One idle prefetch handle was dropped this round.
Acted,
}

/// Period of the pruning loop's inner tick while under memory pressure, i.e.
/// the sleep between two consecutive pruning rounds of the drain phase.
pub(crate) const PRUNING_TICK: Duration = Duration::from_millis(1);
/// If the head of the allocation queue has been waiting at least this long, the
/// pruner will drop an idle prefetch handle even if uploads/active reads are in
/// flight. Acts as a starvation backstop.
pub(crate) const PRUNING_STARVATION_THRESHOLD: Duration = Duration::from_millis(5);

/// Starts the dedicated pool-maintenance thread and returns its join handle.
///
/// Intended to be called once, right after the pool has been constructed at
/// filesystem init. The thread only holds a [`Weak`] reference to the pool,
/// so it does not keep the pool alive and terminates once the pool has been
/// dropped.
///
/// `idle_interval` is how long the thread stays parked between maintenance
/// passes when nothing notifies it, which makes it the period of the routine
/// [`PagedPoolInner::trim`] when there is no memory pressure. Production uses
/// ~60s; tests use shorter values.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
pub(super) fn spawn_pool_maintenance_thread(
    pool_inner: &Arc<PagedPoolInner>,
    idle_interval: Duration,
) -> thread::JoinHandle<()> {
    // The thread parks on its own clone of the limiter's wake signal.
    let wake_signal = Arc::clone(pool_inner.limiter().pruning_signal());
    let pool_ref = Arc::downgrade(pool_inner);

    let builder = thread::Builder::new().name(String::from("mem-pool-maintenance"));
    builder
        .spawn(move || maintenance_loop(pool_ref, wake_signal, idle_interval))
        .expect("failed to spawn pool maintenance thread")
}

/// Body of the pool-maintenance thread.
///
/// Every outer iteration has two phases:
/// 1. **Idle wait**: park on `signal` for at most `idle_interval`. The wait
///    ends early when the signal is notified.
/// 2. **Drain rounds**: call [`run_pruning_round`] repeatedly, sleeping
///    [`PRUNING_TICK`] between rounds, until a round reports
///    [`PruningOutcome::Idle`] (no pressure).
///
/// Without pressure the drain phase ends after its first round, and since
/// every round starts with the cheap pool trim, the periodic trim falls out
/// of the same code path.
///
/// The thread returns as soon as the pool can no longer be upgraded from its
/// [`Weak`] reference, i.e. after the pool has been dropped.
///
/// The `thread::sleep` between drain rounds is deliberately not interruptible
/// by notifications: under pressure the loop polls every [`PRUNING_TICK`]
/// anyway, so an extra wake-up would have nothing useful to do.
fn maintenance_loop(pool_inner: Weak<PagedPoolInner>, signal: Arc<WakeSignal>, idle_interval: Duration) {
    loop {
        signal.wait_timeout(idle_interval);

        // Drain phase: one round per tick until pressure clears.
        loop {
            let outcome = {
                // The strong reference lives only for the duration of one
                // round, so the thread never keeps the pool alive while it
                // sleeps or parks.
                let Some(pool) = pool_inner.upgrade() else {
                    // Pool is gone; nothing left to maintain.
                    return;
                };
                let outcome = run_pruning_round(&pool);
                trace!(?outcome, "pruning round complete");
                outcome
            };

            if outcome == PruningOutcome::Idle {
                break;
            }
            thread::sleep(PRUNING_TICK);
        }
    }
}

/// Executes one pruning round and reports what happened.
///
/// Steps, in order:
/// 1. `pool.trim()`: always cheap; hands empty pages back to the allocator.
///    It does NOT directly make progress on the allocation queue.
/// 2. An empty queue yields [`PruningOutcome::Idle`], which makes the
///    maintenance loop leave its inner tick and go back to parking.
/// 3. While uploads are in flight or active reads hold buffers, the natural
///    release path is left to do the work, unless the head of the queue has
///    waited for at least [`PRUNING_STARVATION_THRESHOLD`].
/// 4. Otherwise one idle prefetch handle is dropped, if one is eligible.
fn run_pruning_round(pool_inner: &PagedPoolInner) -> PruningOutcome {
    // Step 1: trimming is idempotent and harmless. Empty pages may now be
    // reusable by a different SizePool after a future allocation.
    // TODO: Consider doing trim cooldown (i.e. invoke trim less often)
    // if it's contending too much with reserve read lock.
    let _ = pool_inner.trim();

    // Step 2: the allocation queue is not implemented yet, so there are never
    // any waiters and every wakeup is transient.
    let queue_empty = true; // TODO: inspect allocation_queue
    if queue_empty {
        return PruningOutcome::Idle;
    }

    let head_waited = Duration::ZERO; // TODO: queue.front().queued_at.elapsed()
    let starving = head_waited >= PRUNING_STARVATION_THRESHOLD;
    let limiter = pool_inner.limiter();

    // Step 3: natural release path. In-flight uploads complete, or active
    // reads receive their S3 response, freeing buffers without intervention.
    let release_pending = has_uploads_in_flight(pool_inner) || limiter.has_active_reads();
    if release_pending && !starving {
        return PruningOutcome::WaitingForRelease;
    }

    // Step 4: disruptive path, drop one idle prefetch handle.
    if limiter.drop_one_idle_prefetch_handle() {
        metrics::counter!("mem.pruning_resets").increment(1);
        PruningOutcome::Acted
    } else {
        // Nothing was eligible to drop. Wait for the next tick; a release
        // elsewhere may unstick the queue.
        PruningOutcome::WaitingForRelease
    }
}

/// Returns `true` if any in-flight `UploadPart`/`PutObject` is currently
/// holding a pool buffer that will be released when the request completes.
///
/// Currently a stub: the pool argument is ignored and the result is always
/// `false`.
///
/// TODO: Tighten to "in-flight UploadPart/PutObject exists":
/// `reserved_bytes(PutObject) + reserved_bytes(Append)
/// > upload_handles_holding_buffers * write_part_size`
/// Each write handle holds at most one "filling" buffer (FUSE write data
/// being staged) at any time without an in-flight request, so excess
/// reserved bytes above that baseline indicate at least one part actually
/// uploading. Requires tracking `upload_handles_holding_buffers` (likely
/// the active-write-handles counter).
fn has_uploads_in_flight(_pool_inner: &PagedPoolInner) -> bool {
false
}

// These tests use real OS threads and wall-clock timing, so they are compiled
// out when the `shuttle` feature is enabled.
#[cfg(all(test, not(feature = "shuttle")))]
mod tests {
use std::time::{Duration, Instant};

use super::{PruningOutcome, run_pruning_round, spawn_pool_maintenance_thread};
use crate::memory::PagedPool;

/// Upper bound on how long a test waits for the maintenance thread to exit.
const TEST_WAIT_TIMEOUT: Duration = Duration::from_secs(1);
/// Long idle interval used in tests where we want the loop to stay
/// parked unless explicitly notified or the pool is dropped.
const TEST_IDLE_INTERVAL: Duration = Duration::from_secs(60);

/// Dropping the pool while the maintenance thread is parked must wake it
/// (via `MemoryLimiter::drop` notify) so it can observe the dead `Weak`
/// and exit. Otherwise the thread leaks until the idle interval elapses.
#[test]
fn maintenance_thread_exits_on_pool_drop() {
let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]);
let handle = spawn_pool_maintenance_thread(pool.inner(), TEST_IDLE_INTERVAL);

drop(pool);

// Poll `is_finished` against a deadline instead of joining right away, so
// that a missing wake-up fails the test rather than blocking it for the
// whole idle interval.
let deadline = Instant::now() + TEST_WAIT_TIMEOUT;
while !handle.is_finished() {
assert!(
Instant::now() < deadline,
"maintenance thread did not exit within {TEST_WAIT_TIMEOUT:?} of pool drop \
— likely missing `Drop` notify on `MemoryLimiter`",
);
std::thread::sleep(Duration::from_millis(10));
}
// The thread has finished, so this join returns immediately; it only
// surfaces a panic from the maintenance loop.
handle.join().expect("maintenance thread panicked");
}

/// Contract for `run_pruning_round` while the allocation queue stub
/// reports empty: returns [`PruningOutcome::Idle`]. Pressure is defined
/// as "queue non-empty", so an empty queue means no pressure — the
/// pruner's job here is to observe and exit.
#[test]
fn run_pruning_round_returns_idle_on_empty_queue() {
let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]);

let outcome = run_pruning_round(pool.inner());
assert_eq!(outcome, PruningOutcome::Idle);
assert!(
!pool.inner().limiter().is_memory_pressure(),
"empty allocation queue must report no memory pressure",
);
}
}
41 changes: 26 additions & 15 deletions mountpoint-s3-fs/src/memory/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,16 @@ impl PagedPool {
self.inner.trim()
}

/// Schedule recurring calls to [trim](Self::trim).
pub fn schedule_trim(&self, recurring_time: Duration) {
let weak = Arc::downgrade(&self.inner);
std::thread::spawn(move || {
loop {
std::thread::sleep(recurring_time);
let Some(pool) = weak.upgrade() else {
return;
};
pool.trim();
}
});
/// Spawn the background pool maintenance thread. Must be called once after
/// construction, at filesystem init.
///
/// The thread serves two responsibilities on a single OS thread:
/// - **Periodic trim**: every `idle_interval`, run [`Self::trim`] to release
/// empty pages back to the system allocator.
/// - **Pressure pruning**: on [`MemoryLimiter::trigger_pruning`], wake
/// immediately and run pruning rounds until the allocation queue drains.
///
/// The thread's join handle is discarded here, so the thread runs detached.
pub fn spawn_pool_maintenance_thread(&self, idle_interval: Duration) {
super::maintenance::spawn_pool_maintenance_thread(&self.inner, idle_interval);
}

/// Return the reserved memory in bytes for the given kind of buffer.
Expand Down Expand Up @@ -183,6 +181,14 @@ impl PagedPool {
.sum()
}

/// Test-only accessor for the shared [`PagedPoolInner`], used by the tests of
/// the sibling maintenance module.
///
/// Those tests are gated `not(feature = "shuttle")`, so under the `shuttle`
/// feature this accessor is unused; the `dead_code` allowance below covers
/// that configuration.
#[cfg(test)]
#[cfg_attr(feature = "shuttle", allow(dead_code))]
pub(super) fn inner(&self) -> &Arc<PagedPoolInner> {
&self.inner
}

/// Create a new cursor and return the shared state handle.
pub fn create_cursor(&self) -> CursorHandle {
self.inner.limiter.create_cursor(self)
Expand Down Expand Up @@ -226,13 +232,18 @@ impl MemoryPool for PagedPool {
}

#[derive(Debug)]
struct PagedPoolInner {
pub(super) struct PagedPoolInner {
ordered_size_pools: Vec<SizePool>,
stats: Arc<PoolStats>,
limiter: MemoryLimiter,
}

impl PagedPoolInner {
/// Returns the pool's [`MemoryLimiter`], for use by sibling modules of the
/// memory module (e.g. the maintenance thread).
pub(super) fn limiter(&self) -> &MemoryLimiter {
&self.limiter
}

fn get_pool_for_size(&self, size: usize) -> Option<&SizePool> {
if size == 0 {
return None;
Expand All @@ -246,7 +257,7 @@ impl PagedPoolInner {
Some(&self.ordered_size_pools[index])
}

fn trim(&self) -> bool {
pub(super) fn trim(&self) -> bool {
let mut removed = false;
for pool in &self.ordered_size_pools {
if pool.stats.empty_pages() == 0 {
Expand Down Expand Up @@ -387,7 +398,7 @@ mod tests {
fn stress_test(original: &[u8], buffer_sizes: &[usize], schedule: Option<Duration>) {
let pool = PagedPool::new_with_candidate_sizes_unlimited(buffer_sizes);
if let Some(duration) = schedule {
pool.schedule_trim(duration);
pool.spawn_pool_maintenance_thread(duration);
}

let num_threads = 10000;
Expand Down
Loading
Loading