diff --git a/mountpoint-s3-fs/src/lib.rs b/mountpoint-s3-fs/src/lib.rs index d6eacbb3e..cdf411703 100644 --- a/mountpoint-s3-fs/src/lib.rs +++ b/mountpoint-s3-fs/src/lib.rs @@ -21,6 +21,7 @@ pub mod s3; mod superblock; mod sync; pub mod upload; +mod wake_signal; pub use async_util::Runtime; pub use config::MountpointConfig; diff --git a/mountpoint-s3-fs/src/memory.rs b/mountpoint-s3-fs/src/memory.rs index 9a1fea57a..5f6dad7e9 100644 --- a/mountpoint-s3-fs/src/memory.rs +++ b/mountpoint-s3-fs/src/memory.rs @@ -1,5 +1,6 @@ mod buffers; mod limiter; +mod maintenance; mod pages; mod pool; mod stats; diff --git a/mountpoint-s3-fs/src/memory/limiter.rs b/mountpoint-s3-fs/src/memory/limiter.rs index 874679c7a..574dc0694 100644 --- a/mountpoint-s3-fs/src/memory/limiter.rs +++ b/mountpoint-s3-fs/src/memory/limiter.rs @@ -8,6 +8,7 @@ use tracing::{debug, trace}; use crate::prefetch::CursorId; use crate::sync::atomic::{AtomicU64, Ordering}; use crate::sync::{Arc, Mutex, Weak}; +use crate::wake_signal::WakeSignal; use super::PagedPool; use super::stats::PoolStats; @@ -62,6 +63,10 @@ pub struct MemoryLimiter { next_cursor_id: AtomicU64, /// Additional reserved memory for other non-buffer usage like storing metadata additional_mem_reserved: u64, + /// Wakes the background pruning loop's outer wait when memory pressure starts. + /// Once the inner tick is running it polls every [`PRUNING_TICK`](super::maintenance::PRUNING_TICK) + /// regardless. + pruning_signal: Arc, } impl MemoryLimiter { @@ -80,6 +85,7 @@ impl MemoryLimiter { cursors: Default::default(), next_cursor_id: AtomicU64::new(1), additional_mem_reserved, + pruning_signal: Arc::new(WakeSignal::new()), } } @@ -194,6 +200,68 @@ impl MemoryLimiter { // All pool buffer kinds are accounted for here. stats.total_reserved_bytes() as u64 } + + // ----------------------------------------------------------------------- + // Pruning hooks — see `maintenance.rs` for the loop, signal, and round logic. + // ----------------------------------------------------------------------- + + /// Returns `true` while there is at least one queued allocation request. + /// Callers use this to decide whether new prefetch reservations should + /// scale down or wait. + #[allow(dead_code)] + pub(crate) fn is_memory_pressure(&self) -> bool { + // TODO: Change to something like `!self.allocation_queue.lock().is_empty()`. + false + } + + /// Wake the maintenance loop if it's currently parked between idle intervals. + #[allow(dead_code)] + pub fn trigger_pruning(&self) { + self.pruning_signal.notify(); + } + + /// Shared notify handle. The pruner needs its own clone to park on. + pub(crate) fn pruning_signal(&self) -> &Arc { + &self.pruning_signal + } + + /// Returns `true` if any cursor has an active read in progress. An active + /// read with an allocated buffer is waiting on an in-flight S3 GET, and + /// the response will free the buffer without our help. + /// + /// TODO: once we track per-handle in-flight state we can be more precise. + /// For now this returns `true` whenever any cursor has an active read in + /// progress, which over-conservatively prefers waiting over dropping a handle. + pub(crate) fn has_active_reads(&self) -> bool { + self.cursors.iter().any(|entry| { + entry + .value() + .upgrade() + .is_some_and(|state| state.active_read.lock().unwrap().is_some()) + }) + } + + /// Drop the least-recently-read idle prefetch handle to free its read + /// window reservation and buffered parts. + /// + /// Returns `true` if a handle was dropped. Stub for now: requires + /// per-handle tracking of the current `RequestTask` so we can drop it + /// synchronously. + pub(crate) fn drop_one_idle_prefetch_handle(&self) -> bool { + // TODO: Iterate the handle registry, filter to entries that have an + // allocated request task but no active read, sort by last-read tick + // ascending, drop the oldest entry's task and signal a reset. + false + } +} + +impl Drop for MemoryLimiter { + fn drop(&mut self) { + // Wake the pruning thread so it observes its `Weak` failing to upgrade + // and exits. Without this, a pruner parked in the outer wait at drop + // time would never wake. + self.pruning_signal.notify(); + } } /// Returns the effective total memory available to this process in bytes. diff --git a/mountpoint-s3-fs/src/memory/maintenance.rs b/mountpoint-s3-fs/src/memory/maintenance.rs new file mode 100644 index 000000000..3f3a14b35 --- /dev/null +++ b/mountpoint-s3-fs/src/memory/maintenance.rs @@ -0,0 +1,198 @@ +//! Background pool-maintenance engine for [`MemoryLimiter`]. + +use std::time::Duration; + +use tracing::trace; + +use crate::sync::{Arc, Weak, thread}; +use crate::wake_signal::WakeSignal; + +use super::pool::PagedPoolInner; + +/// Outcome of a single pruning round. Used for metrics and tracing. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PruningOutcome { + /// Nothing to prune (queue empty). Pressure is defined as + /// "queue non-empty", so an empty queue means pressure has already cleared. + Idle, + /// In-flight uploads or active reads will release buffers naturally; wait. + WaitingForRelease, + /// One idle prefetch handle was dropped this round. + Acted, +} + +/// Period of the pruning loop's inner tick while under memory pressure. +pub(crate) const PRUNING_TICK: Duration = Duration::from_millis(1); +/// If the head of the allocation queue has been waiting longer than this, the pruner +/// will drop an idle prefetch handle even if uploads/active reads are in flight. +/// Acts as a starvation backstop. +pub(crate) const PRUNING_STARVATION_THRESHOLD: Duration = Duration::from_millis(5); + +/// Spawn the background maintenance thread. Must be called once after constructing +/// the pool, at filesystem init. Holds a [`Weak`] to the pool so the thread +/// terminates when the pool is dropped. +/// +/// `idle_interval` controls how often a periodic [`PagedPoolInner::trim`] runs +/// when there is no memory pressure. Production uses ~60s; tests use shorter +/// values. +pub(super) fn spawn_pool_maintenance_thread( + pool_inner: &Arc, + idle_interval: Duration, +) -> thread::JoinHandle<()> { + let weak = Arc::downgrade(pool_inner); + let signal = pool_inner.limiter().pruning_signal().clone(); + thread::Builder::new() + .name("mem-pool-maintenance".to_string()) + .spawn(move || maintenance_loop(weak, signal, idle_interval)) + .expect("failed to spawn pool maintenance thread") +} + +/// Maintenance loop body. +/// +/// Two phases per outer iteration: +/// 1. **Idle wait**: `wait_timeout(idle_interval)`. Returns early on `notify`, +/// or after the idle interval elapses, whichever first. +/// 2. **Drain rounds**: run [`run_pruning_round`] in a loop. If the round +/// returns [`PruningOutcome::Idle`] (no pressure), break. Otherwise sleep +/// [`PRUNING_TICK`] and run another round. +/// +/// When there is no pressure, the drain loop exits immediately after one +/// round (which still does the cheap pool trim), so the periodic trim is +/// implicit in the same code path. +/// +/// `thread::sleep` inside the inner drain is deliberately uninterruptible by +/// notifies — under pressure the loop polls every [`PRUNING_TICK`] anyway, +/// so there is nothing useful for an extra wake to do. +fn maintenance_loop(pool_inner: Weak, signal: Arc, idle_interval: Duration) { + loop { + signal.wait_timeout(idle_interval); + + // Drain: keep running rounds until the queue is empty (or pool is gone). + loop { + let Some(strong) = pool_inner.upgrade() else { + return; // pool dropped — exit + }; + let outcome = run_pruning_round(&strong); + trace!(?outcome, "pruning round complete"); + drop(strong); + + if matches!(outcome, PruningOutcome::Idle) { + break; + } + thread::sleep(PRUNING_TICK); + } + } +} + +/// Run a single pruning round. +/// +/// Order of operations: +/// 1. `pool.trim()` — always cheap; releases empty pages back to the +/// allocator. Does NOT directly progress the allocation queue. +/// 2. If the queue is empty, return [`PruningOutcome::Idle`] so the loop +/// exits its inner tick and goes back to parking. +/// 3. If uploads are in flight or active reads hold buffers, let the +/// natural release path do the work — unless the head of the queue +/// has been waiting beyond [`PRUNING_STARVATION_THRESHOLD`]. +/// 4. Otherwise drop one idle prefetch handle. +fn run_pruning_round(pool_inner: &PagedPoolInner) -> PruningOutcome { + // 1. Pool trim — idempotent and harmless. Empty pages may now be reusable + // by a different SizePool after a future allocation. + // TODO: Consider doing trim cooldown (i.e. invoke trim less often) + // if it's contending too much with reserve read lock. + let _trimmed = pool_inner.trim(); + + // 2. Allocation queue not yet implemented. For now we have no waiters, + // so any wakeup we receive is transient. + let queue_empty = true; // TODO: inspect allocation_queue + let head_waited = Duration::ZERO; // TODO: queue.front().queued_at.elapsed() + if queue_empty { + return PruningOutcome::Idle; + } + + let starving = head_waited >= PRUNING_STARVATION_THRESHOLD; + + // 3. Natural release path: in-flight uploads complete, or active reads + // receive their S3 response, freeing buffers without our help. + let in_flight = has_uploads_in_flight(pool_inner) || pool_inner.limiter().has_active_reads(); + if in_flight && !starving { + return PruningOutcome::WaitingForRelease; + } + + // 4. Disruptive: drop one idle prefetch handle. + if pool_inner.limiter().drop_one_idle_prefetch_handle() { + metrics::counter!("mem.pruning_resets").increment(1); + return PruningOutcome::Acted; + } + + // We attempted to drop an idle prefetch handle but found nothing eligible. + // Wait for the next tick; a release elsewhere may unstick us. + PruningOutcome::WaitingForRelease +} + +/// Returns `true` if any in-flight `UploadPart`/`PutObject` is currently +/// holding a pool buffer that will be released when the request completes. +/// +/// TODO: Currently a stub returning `false`. +/// Tighten to "in-flight UploadPart/PutObject exists": +/// `reserved_bytes(PutObject) + reserved_bytes(Append) +/// > upload_handles_holding_buffers * write_part_size` +/// Each write handle holds at most one "filling" buffer (FUSE write data +/// being staged) at any time without an in-flight request, so excess +/// reserved bytes above that baseline indicate at least one part actually +/// uploading. Requires tracking `upload_handles_holding_buffers` (likely +/// the active-write-handles counter). +fn has_uploads_in_flight(_pool_inner: &PagedPoolInner) -> bool { + false +} + +#[cfg(all(test, not(feature = "shuttle")))] +mod tests { + use std::time::{Duration, Instant}; + + use super::{PruningOutcome, run_pruning_round, spawn_pool_maintenance_thread}; + use crate::memory::PagedPool; + + const TEST_WAIT_TIMEOUT: Duration = Duration::from_secs(1); + /// Long idle interval used in tests where we want the loop to stay + /// parked unless explicitly notified or the pool is dropped. + const TEST_IDLE_INTERVAL: Duration = Duration::from_secs(60); + + /// Dropping the pool while the maintenance thread is parked must wake it + /// (via `MemoryLimiter::drop` notify) so it can observe the dead `Weak` + /// and exit. Otherwise the thread leaks until the idle interval elapses. + #[test] + fn maintenance_thread_exits_on_pool_drop() { + let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]); + let handle = spawn_pool_maintenance_thread(pool.inner(), TEST_IDLE_INTERVAL); + + drop(pool); + + let deadline = Instant::now() + TEST_WAIT_TIMEOUT; + while !handle.is_finished() { + assert!( + Instant::now() < deadline, + "maintenance thread did not exit within {TEST_WAIT_TIMEOUT:?} of pool drop \ + — likely missing `Drop` notify on `MemoryLimiter`", + ); + std::thread::sleep(Duration::from_millis(10)); + } + handle.join().expect("maintenance thread panicked"); + } + + /// Contract for `run_pruning_round` while the allocation queue stub + /// reports empty: returns [`PruningOutcome::Idle`]. Pressure is defined + /// as "queue non-empty", so an empty queue means no pressure — the + /// pruner's job here is to observe and exit. + #[test] + fn run_pruning_round_returns_idle_on_empty_queue() { + let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]); + + let outcome = run_pruning_round(pool.inner()); + assert_eq!(outcome, PruningOutcome::Idle); + assert!( + !pool.inner().limiter().is_memory_pressure(), + "empty allocation queue must report no memory pressure", + ); + } +} diff --git a/mountpoint-s3-fs/src/memory/pool.rs b/mountpoint-s3-fs/src/memory/pool.rs index 36673ecdf..35015f44f 100644 --- a/mountpoint-s3-fs/src/memory/pool.rs +++ b/mountpoint-s3-fs/src/memory/pool.rs @@ -113,18 +113,16 @@ impl PagedPool { self.inner.trim() } - /// Schedule recurring calls to [trim](Self::trim). - pub fn schedule_trim(&self, recurring_time: Duration) { - let weak = Arc::downgrade(&self.inner); - std::thread::spawn(move || { - loop { - std::thread::sleep(recurring_time); - let Some(pool) = weak.upgrade() else { - return; - }; - pool.trim(); - } - }); + /// Spawn the background pool maintenance thread. Must be called once after + /// construction, at filesystem init. + /// + /// The thread serves two responsibilities on a single OS thread: + /// - **Periodic trim**: every `idle_interval`, run [`Self::trim`] to release + /// empty pages back to the system allocator. + /// - **Pressure pruning**: on [`MemoryLimiter::trigger_pruning`], wake + /// immediately and run pruning rounds until the allocation queue drains. + pub fn spawn_pool_maintenance_thread(&self, idle_interval: Duration) { + super::maintenance::spawn_pool_maintenance_thread(&self.inner, idle_interval); } /// Return the reserved memory in bytes for the given kind of buffer. @@ -183,6 +181,14 @@ impl PagedPool { .sum() } + /// Internal access for the maintenance module (sibling), used by maintenance tests + /// (which are gated `not(feature = "shuttle")`). + #[cfg(test)] + #[cfg_attr(feature = "shuttle", allow(dead_code))] + pub(super) fn inner(&self) -> &Arc { + &self.inner + } + /// Create a new cursor and return the shared state handle. pub fn create_cursor(&self) -> CursorHandle { self.inner.limiter.create_cursor(self) @@ -226,13 +232,18 @@ impl MemoryPool for PagedPool { } #[derive(Debug)] -struct PagedPoolInner { +pub(super) struct PagedPoolInner { ordered_size_pools: Vec, stats: Arc, limiter: MemoryLimiter, } impl PagedPoolInner { + /// Reference to the inner [`MemoryLimiter`] for sibling modules (e.g. the maintenance thread). + pub(super) fn limiter(&self) -> &MemoryLimiter { + &self.limiter + } + fn get_pool_for_size(&self, size: usize) -> Option<&SizePool> { if size == 0 { return None; @@ -246,7 +257,7 @@ impl PagedPoolInner { Some(&self.ordered_size_pools[index]) } - fn trim(&self) -> bool { + pub(super) fn trim(&self) -> bool { let mut removed = false; for pool in &self.ordered_size_pools { if pool.stats.empty_pages() == 0 { @@ -387,7 +398,7 @@ mod tests { fn stress_test(original: &[u8], buffer_sizes: &[usize], schedule: Option) { let pool = PagedPool::new_with_candidate_sizes_unlimited(buffer_sizes); if let Some(duration) = schedule { - pool.schedule_trim(duration); + pool.spawn_pool_maintenance_thread(duration); } let num_threads = 10000; diff --git a/mountpoint-s3-fs/src/wake_signal.rs b/mountpoint-s3-fs/src/wake_signal.rs new file mode 100644 index 000000000..d30f84c54 --- /dev/null +++ b/mountpoint-s3-fs/src/wake_signal.rs @@ -0,0 +1,148 @@ +//! A coalescing wake signal for parking a single consumer thread until an +//! event of interest occurs, with optional timeout. +//! +//! Built on `Mutex` + `Condvar`. Use it when one thread needs to +//! suspend until another thread signals "wake up", and where multiple +//! signals before the next wake should collapse to one. Notifies that +//! arrive while the consumer is busy are absorbed — the consumer observes +//! the pending flag on its next [`WakeSignal::wait_timeout`] call. +//! +//! # Example +//! +//! ```ignore +//! use std::sync::Arc; +//! use std::time::Duration; +//! use mountpoint_s3_fs::wake_signal::WakeSignal; +//! +//! let signal = Arc::new(WakeSignal::new()); +//! let consumer = std::thread::spawn({ +//! let signal = signal.clone(); +//! move || loop { +//! signal.wait_timeout(Duration::from_secs(60)); +//! // ... do work ... +//! } +//! }); +//! signal.notify(); // wakes the consumer immediately +//! # drop(consumer); +//! ``` + +use std::time::Duration; + +use crate::sync::{Condvar, Mutex}; + +/// A coalescing single-consumer wake signal. +#[derive(Debug, Default)] +pub struct WakeSignal { + pending: Mutex, + cvar: Condvar, +} + +impl WakeSignal { + pub fn new() -> Self { + Self::default() + } + + /// Wake the consumer if it is currently in [`Self::wait_timeout`]. + /// + /// If the consumer is busy or about to enter wait, the pending flag + /// is set and observed on the next [`Self::wait_timeout`] call — + /// notifies before wait are not lost. + pub fn notify(&self) { + let mut pending = self.pending.lock().unwrap(); + *pending = true; + self.cvar.notify_one(); + } + + /// Wait up to `timeout` for a [`Self::notify`]. Returns once the pending + /// flag is set (consuming it) or once the timeout elapses, whichever + /// comes first. Spurious wakeups are handled by looping on the flag. + pub fn wait_timeout(&self, timeout: Duration) { + let pending = self.pending.lock().unwrap(); + let (mut pending, _) = self.cvar.wait_timeout_while(pending, timeout, |p| !*p).unwrap(); + *pending = false; + } +} + +#[cfg(all(test, not(feature = "shuttle")))] +mod tests { + use std::sync::Arc; + use std::time::{Duration, Instant}; + + use super::WakeSignal; + + const TEST_WAIT_TIMEOUT: Duration = Duration::from_secs(1); + + /// `notify` must wake a waiter blocked in `wait_timeout` well before + /// the timeout elapses. This is the primitive's defining behavior. + #[test] + fn notify_wakes_waiter_before_timeout() { + let signal = Arc::new(WakeSignal::new()); + let signal_for_waiter = signal.clone(); + let started = Arc::new(std::sync::Mutex::new(false)); + let started_for_waiter = started.clone(); + let waiter_started = Arc::new(std::sync::Condvar::new()); + let waiter_started_clone = waiter_started.clone(); + + let waiter = std::thread::spawn(move || { + *started_for_waiter.lock().unwrap() = true; + waiter_started_clone.notify_one(); + let start = Instant::now(); + signal_for_waiter.wait_timeout(Duration::from_secs(60)); + start.elapsed() + }); + + // Make sure the waiter has reached its wait_timeout call so we + // exercise the cvar wake path (rather than the pre-notify path). + let mut g = started.lock().unwrap(); + while !*g { + g = waiter_started.wait(g).unwrap(); + } + drop(g); + std::thread::sleep(Duration::from_millis(10)); + + signal.notify(); + let elapsed = waiter.join().expect("waiter panicked"); + assert!( + elapsed < TEST_WAIT_TIMEOUT, + "wait_timeout returned in {elapsed:?}, expected < {TEST_WAIT_TIMEOUT:?}; \ + notify did not wake the cvar", + ); + } + + /// `notify` called *before* `wait_timeout` must not be lost — the + /// pending flag is observed and the wait returns immediately. + #[test] + fn notify_before_wait_is_not_lost() { + let signal = WakeSignal::new(); + signal.notify(); + let start = Instant::now(); + signal.wait_timeout(Duration::from_secs(60)); + let elapsed = start.elapsed(); + assert!( + elapsed < TEST_WAIT_TIMEOUT, + "wait_timeout did not return promptly after pre-notify (took {elapsed:?})", + ); + } + + /// Multiple `notify` calls before the next `wait_timeout` collapse + /// to a single wake (the flag is consumed once). The next + /// `wait_timeout` after that runs the full timeout. + /// + /// We don't run the second wait for its full timeout; we just verify + /// that the *first* wait returns promptly (consuming the flag), so a + /// follow-up notify is required to wake again. + #[test] + fn notify_coalesces() { + let signal = WakeSignal::new(); + for _ in 0..5 { + signal.notify(); + } + let start = Instant::now(); + signal.wait_timeout(Duration::from_secs(60)); + let elapsed = start.elapsed(); + assert!( + elapsed < TEST_WAIT_TIMEOUT, + "first wait_timeout did not consume any of 5 pre-notifies (took {elapsed:?})", + ); + } +} diff --git a/mountpoint-s3/src/run.rs b/mountpoint-s3/src/run.rs index 8ebfd2e06..cae4658b3 100644 --- a/mountpoint-s3/src/run.rs +++ b/mountpoint-s3/src/run.rs @@ -205,9 +205,10 @@ fn mount(args: CliArgs, client_builder: impl ClientBuilder) -> anyhow::Result