Skip to content

Commit 770fe63

Browse files
committed
Add basic buffer pruner
Signed-off-by: Yerzhan Mazhkenov <20302932+yerzhan7@users.noreply.github.com>
1 parent eacff7f commit 770fe63

3 files changed

Lines changed: 332 additions & 1 deletion

File tree

mountpoint-s3-fs/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod metrics;
1818
pub mod metrics_otel;
1919
pub mod object;
2020
pub mod prefetch;
21+
mod pruner;
2122
pub mod s3;
2223
mod superblock;
2324
mod sync;

mountpoint-s3-fs/src/mem_limiter.rs

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use humansize::make_format;
55
use sysinfo::System;
66
use tracing::{debug, trace};
77

8-
use crate::memory::PagedPool;
8+
use crate::memory::{BufferKind, PagedPool};
99
use crate::prefetch::CursorId;
10+
use crate::pruner::PruningSignal;
1011
use crate::sync::Arc;
1112
use crate::sync::atomic::{AtomicU64, Ordering};
1213

@@ -101,6 +102,9 @@ pub struct MemoryLimiter {
101102
/// Per-cursor active read tracking. When a FUSE read is in progress for a cursor,
102103
/// the requested range is stored here. Absence means the cursor is speculative.
103104
active_reads: DashMap<CursorId, ActiveRead>,
105+
/// Wakes the background pruning loop's outer wait when memory pressure starts.
106+
/// Once the inner tick is running it polls every [`PRUNING_TICK`](crate::pruner::PRUNING_TICK) regardless.
107+
pruning_signal: Arc<PruningSignal>,
104108
}
105109

106110
impl MemoryLimiter {
@@ -128,6 +132,7 @@ impl MemoryLimiter {
128132
next_cursor_id: AtomicU64::new(1),
129133
additional_mem_reserved,
130134
active_reads: DashMap::new(),
135+
pruning_signal: Arc::new(PruningSignal::new()),
131136
}
132137
}
133138

@@ -267,6 +272,90 @@ impl MemoryLimiter {
267272
// All pool buffer kinds are accounted for here.
268273
self.pool.total_reserved_bytes() as u64
269274
}
275+
276+
// -----------------------------------------------------------------------
277+
// Pruning hooks — see `pruner.rs` for the loop, signal, and round logic.
278+
// -----------------------------------------------------------------------
279+
280+
/// Returns `true` while there is at least one queued allocation request.
281+
/// Callers use this to decide whether new prefetch reservations should
282+
/// scale down or wait. Currently a stub that always returns `false` until the allocation queue exists.
283+
#[allow(dead_code)]
284+
pub(crate) fn is_memory_pressure(&self) -> bool {
285+
// TODO: Change to something like `!self.allocation_queue.lock().is_empty()`.
286+
false
287+
}
288+
289+
/// Wake the pruning loop. Cheap; safe to call from any path that may have
290+
/// changed pressure state.
291+
pub fn trigger_pruning(&self) {
292+
self.pruning_signal.notify();
293+
}
294+
295+
/// Spawn the background pruning loop. Must be called once after construction,
296+
/// at filesystem init. Call it exactly once: a second call panics when the pruning thread is registered again.
297+
pub fn spawn_pruning_loop(self: &Arc<Self>) {
298+
crate::pruner::spawn_pruning_loop(self);
299+
}
300+
301+
/// Shared notify handle. The pruner needs its own clone to park on.
302+
pub(crate) fn pruning_signal(&self) -> &Arc<PruningSignal> {
303+
&self.pruning_signal
304+
}
305+
306+
/// Pool used by the pruner for `trim()` and `reserved_bytes()` queries.
307+
pub(crate) fn pool(&self) -> &PagedPool {
308+
&self.pool
309+
}
310+
311+
/// Returns `true` if any upload buffer is currently held in the pool. An
312+
/// in-flight `UploadPart`/`PutObject` will eventually release memory
313+
/// without our help.
314+
///
315+
/// TODO: tighten to "in-flight UploadPart/PutObject exists":
316+
/// `reserved_bytes(PutObject) + reserved_bytes(Append)
317+
/// > upload_handles_holding_buffers * write_part_size`
318+
/// Each write handle holds at most one "filling" buffer (FUSE write data
319+
/// being staged) at any time without an in-flight request, so excess
320+
/// reserved bytes above that baseline indicate at least one part actually
321+
/// uploading. Requires tracking `upload_handles_holding_buffers` (likely
322+
/// the active-write-handles counter).
323+
pub(crate) fn has_uploads_in_flight(&self) -> bool {
324+
self.pool.reserved_bytes(BufferKind::PutObject) > 0 || self.pool.reserved_bytes(BufferKind::Append) > 0
325+
}
326+
327+
/// Returns `true` if any cursor has an active read in progress. An active
328+
/// read with an allocated buffer is waiting on an in-flight S3 GET, and
329+
/// the response will free the buffer without our help.
330+
///
331+
/// TODO: once we track per-handle in-flight state we can be more precise.
332+
/// For now this returns `true` whenever any cursor has an active read in
333+
/// progress, which over-conservatively prefers waiting over dropping a handle.
334+
pub(crate) fn has_active_reads_with_buffers(&self) -> bool {
335+
!self.active_reads.is_empty()
336+
}
337+
338+
/// Drop the least-recently-read idle prefetch handle to free its read
339+
/// window reservation and buffered parts.
340+
///
341+
/// Returns `true` if a handle was dropped. Stub for now: requires
342+
/// per-handle tracking of the current `RequestTask` so we can drop it
343+
/// synchronously.
344+
pub(crate) fn drop_one_idle_prefetch_handle(&self) -> bool {
345+
// TODO: Iterate the handle registry, filter to entries that have an
346+
// allocated request task but no active read, sort by last-read tick
347+
// ascending, drop the oldest entry's task and signal a reset.
348+
false
349+
}
350+
}
351+
352+
impl Drop for MemoryLimiter {
353+
fn drop(&mut self) {
354+
// Wake the pruning thread so it observes its `Weak<MemoryLimiter>`
355+
// failing to upgrade and exits. Without this, a pruner parked in the
356+
// outer wait at drop time would never wake.
357+
self.pruning_signal.notify();
358+
}
270359
}
271360

272361
/// Returns the effective total memory available to this process in bytes.

mountpoint-s3-fs/src/pruner.rs

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
//! Background buffer-pruning engine for [`MemoryLimiter`].
2+
//!
3+
//! The pruner owns the loop, the wake signal, the round orchestration, and the
4+
//! coalescing tick. It calls back into the limiter for the actual decisions
5+
//! (which handles to inspect, which buffers to release).
6+
7+
use std::sync::OnceLock;
8+
use std::time::Duration;
9+
10+
use tracing::trace;
11+
12+
use crate::mem_limiter::MemoryLimiter;
13+
use crate::sync::thread::{self, JoinHandle, Thread};
14+
use crate::sync::{Arc, Weak};
15+
16+
/// Outcome of a single pruning round. Used for metrics and tracing.
17+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18+
enum PruningOutcome {
19+
/// Nothing to prune (queue empty). Pressure is defined as
20+
/// "queue non-empty", so an empty queue means pressure has already cleared.
21+
Idle,
22+
/// In-flight uploads or active reads will release buffers naturally; wait.
23+
WaitingForRelease,
24+
/// One idle prefetch handle was dropped this round.
25+
Acted,
26+
}
27+
28+
/// Period of the pruning loop's inner tick while under memory pressure.
29+
/// Each tick re-evaluates the queue, in-flight state, and cooldowns.
30+
pub(crate) const PRUNING_TICK: Duration = Duration::from_millis(1);
31+
/// If the head of the allocation queue has been waiting longer than this, the pruner
32+
/// will drop an idle prefetch handle even if uploads/active reads are in flight.
33+
/// Acts as a starvation backstop.
34+
pub(crate) const PRUNING_STARVATION_THRESHOLD: Duration = Duration::from_millis(5);
35+
36+
/// Spawn the background pruning loop. Must be called once after constructing the
37+
/// limiter, at filesystem init. Holds a [`Weak`] to the limiter so the thread
38+
/// terminates when the limiter is dropped.
39+
///
40+
/// Runs on a dedicated OS thread rather than the CRT event-loop-group.
41+
pub(crate) fn spawn_pruning_loop(limiter: &Arc<MemoryLimiter>) -> JoinHandle<()> {
42+
let weak = Arc::downgrade(limiter);
43+
let signal = limiter.pruning_signal().clone();
44+
let handle = thread::Builder::new()
45+
.name("mem-limiter-pruner".to_string())
46+
.spawn(move || pruning_loop(weak, signal))
47+
.expect("failed to spawn pruning thread");
48+
limiter.pruning_signal().register_thread(handle.thread().clone());
49+
handle
50+
}
51+
52+
/// "Pressure starting" signal for the pruning loop's outer wait.
53+
///
54+
/// Backed by the pruning thread's unpark token: any number of `notify()` calls
55+
/// before the next `wait()` collapse to one wake.
56+
///
57+
/// Once the inner tick is running, notifies do not cut a tick short; at most they leave the unpark token set, so the next `wait()` returns immediately once. That's
58+
/// intentional: under pressure we already know there's pressure, and the inner
59+
/// loop polls every [`PRUNING_TICK`] regardless.
60+
#[derive(Debug, Default)]
61+
pub(crate) struct PruningSignal {
62+
thread: OnceLock<Thread>,
63+
}
64+
65+
impl PruningSignal {
66+
pub(crate) fn new() -> Self {
67+
Self::default()
68+
}
69+
70+
pub(crate) fn notify(&self) {
71+
if let Some(t) = self.thread.get() {
72+
t.unpark();
73+
}
74+
}
75+
76+
/// Register the pruning thread that consumes `notify()` wakes. Called by
77+
/// [`spawn_pruning_loop`] on the spawning thread, using the freshly
78+
/// spawned pruning thread's [`Thread`] handle. Must be called exactly once. Until then `notify()` is a no-op, so a wake sent before registration is lost.
79+
fn register_thread(&self, thread: Thread) {
80+
self.thread
81+
.set(thread)
82+
.expect("PruningSignal::register_thread called twice");
83+
}
84+
85+
/// Park until `notify()` is called; `thread::park` may also return spuriously, so callers must tolerate a wake without a notify. Must only be called from the pruning
86+
/// thread previously registered via [`Self::register_thread`].
87+
fn wait(&self) {
88+
thread::park();
89+
}
90+
}
91+
92+
/// Pruning loop body.
93+
///
94+
/// Two-level structure:
95+
/// - **Outer wait**: park until [`MemoryLimiter::trigger_pruning`] wakes us.
96+
/// - **Inner tick**: run a round every [`PRUNING_TICK`] until the round
97+
/// reports [`PruningOutcome::Idle`] (queue drained). The round itself is
98+
/// the only place that makes timing decisions (in-flight wait, starvation
99+
/// override).
100+
///
101+
/// `thread::sleep` is deliberately uninterruptible by `unpark`, so notifies
102+
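/// during pressure do not cut a tick short. A notify received meanwhile still leaves the park token set, so one extra round may run after the inner loop exits.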
103+
fn pruning_loop(limiter: Weak<MemoryLimiter>, signal: Arc<PruningSignal>) {
104+
loop {
105+
signal.wait();
106+
107+
// Inner tick: poll every PRUNING_TICK until the queue drains.
108+
loop {
109+
let Some(strong) = limiter.upgrade() else {
110+
return; // limiter dropped — exit
111+
};
112+
let outcome = run_pruning_round(&strong);
113+
metrics::counter!("mem.pruning_rounds").increment(1);
114+
trace!(?outcome, "pruning round complete");
115+
drop(strong);
116+
117+
if matches!(outcome, PruningOutcome::Idle) {
118+
break; // back to outer park
119+
}
120+
thread::sleep(PRUNING_TICK);
121+
}
122+
}
123+
}
124+
125+
/// Run a single pruning round.
126+
///
127+
/// Order of operations:
128+
/// 1. `pool.trim()` — always cheap; releases empty pages back to the
129+
/// allocator. Does NOT directly progress the allocation queue.
130+
/// 2. If the queue is empty, return [`PruningOutcome::Idle`] so the loop
131+
/// exits its inner tick and goes back to parking.
132+
/// 3. If uploads are in flight or active reads hold buffers, let the
133+
/// natural release path do the work — unless the head of the queue
134+
/// has been waiting beyond [`PRUNING_STARVATION_THRESHOLD`].
135+
/// 4. Otherwise drop one idle prefetch handle.
136+
fn run_pruning_round(limiter: &MemoryLimiter) -> PruningOutcome {
137+
// 1. Pool trim — idempotent and harmless. Empty pages may now be reusable
138+
// by a different SizePool after a future allocation.
139+
// TODO: Consider doing trim cooldown (i.e. invoke trim less often)
140+
// if it's contending too much with reserve read lock.
141+
let _trimmed = limiter.pool().trim();
142+
143+
// 2. Allocation queue not yet implemented. For now we have no waiters,
144+
// so any wakeup we receive is transient.
145+
let queue_empty = true; // TODO: inspect allocation_queue
146+
let head_waited = Duration::ZERO; // TODO: queue.front().queued_at.elapsed()
147+
if queue_empty {
148+
return PruningOutcome::Idle;
149+
}
150+
151+
let starving = head_waited >= PRUNING_STARVATION_THRESHOLD;
152+
153+
// 3. Natural release path: in-flight uploads complete, or active reads
154+
// receive their S3 response, freeing buffers without our help.
155+
let in_flight = limiter.has_uploads_in_flight() || limiter.has_active_reads_with_buffers();
156+
if in_flight && !starving {
157+
return PruningOutcome::WaitingForRelease;
158+
}
159+
160+
// 4. Disruptive: drop one idle prefetch handle.
161+
if limiter.drop_one_idle_prefetch_handle() {
162+
metrics::counter!("mem.pruning_resets").increment(1);
163+
return PruningOutcome::Acted;
164+
}
165+
166+
// We attempted to drop an idle prefetch handle but found nothing eligible.
167+
// Wait for the next tick; a release elsewhere may unstick us.
168+
PruningOutcome::WaitingForRelease
169+
}
170+
171+
#[cfg(all(test, not(feature = "shuttle")))]
172+
mod tests {
173+
use std::sync::Arc;
174+
use std::time::{Duration, Instant};
175+
176+
use super::{PruningOutcome, PruningSignal, run_pruning_round};
177+
use crate::mem_limiter::{MINIMUM_MEM_LIMIT, MemoryLimiter};
178+
use crate::memory::PagedPool;
179+
180+
const TEST_WAIT_TIMEOUT: Duration = Duration::from_secs(1);
181+
182+
/// Dropping the limiter while the pruner is parked must wake the thread
183+
/// so it can observe the dead `Weak` and exit. Otherwise the thread leaks.
184+
#[test]
185+
fn pruning_thread_exits_on_limiter_drop() {
186+
let pool = PagedPool::new_with_candidate_sizes([1024]);
187+
let limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
188+
let handle = crate::pruner::spawn_pruning_loop(&limiter);
189+
190+
drop(limiter);
191+
192+
let deadline = Instant::now() + TEST_WAIT_TIMEOUT;
193+
while !handle.is_finished() {
194+
assert!(
195+
Instant::now() < deadline,
196+
"pruning thread did not exit within {TEST_WAIT_TIMEOUT:?} of limiter drop \
197+
— likely missing `Drop` notify on `MemoryLimiter`",
198+
);
199+
std::thread::sleep(Duration::from_millis(10));
200+
}
201+
handle.join().expect("pruning thread panicked");
202+
}
203+
204+
/// `PruningSignal::notify` from another thread must wake the registered
205+
/// pruning thread. This is the basic primitive on which the loop's outer
206+
/// wait depends.
207+
#[test]
208+
fn signal_notify_wakes_registered_waiter() {
209+
let signal = Arc::new(PruningSignal::new());
210+
let (tx, rx) = std::sync::mpsc::channel();
211+
let signal_for_pruner = signal.clone();
212+
let pruner = std::thread::spawn(move || {
213+
signal_for_pruner.wait();
214+
tx.send(()).unwrap();
215+
});
216+
217+
signal.register_thread(pruner.thread().clone());
218+
signal.notify();
219+
220+
rx.recv_timeout(TEST_WAIT_TIMEOUT)
221+
.expect("pruning thread did not wake within timeout of notify()");
222+
pruner.join().expect("pruning thread panicked");
223+
}
224+
225+
/// Contract for `run_pruning_round` while the allocation queue stub
226+
/// reports empty: returns [`PruningOutcome::Idle`]. Pressure is defined
227+
/// as "queue non-empty", so an empty queue means no pressure — the
228+
/// pruner's job here is to observe and exit.
229+
#[test]
230+
fn run_pruning_round_returns_idle_on_empty_queue() {
231+
let pool = PagedPool::new_with_candidate_sizes([1024]);
232+
let limiter = MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT);
233+
234+
let outcome = run_pruning_round(&limiter);
235+
assert_eq!(outcome, PruningOutcome::Idle);
236+
assert!(
237+
!limiter.is_memory_pressure(),
238+
"empty allocation queue must report no memory pressure",
239+
);
240+
}
241+
}

0 commit comments

Comments
 (0)