diff --git a/mountpoint-s3-fs/src/memory.rs b/mountpoint-s3-fs/src/memory.rs index 43a48be1e..9a1fea57a 100644 --- a/mountpoint-s3-fs/src/memory.rs +++ b/mountpoint-s3-fs/src/memory.rs @@ -5,6 +5,8 @@ mod pool; mod stats; pub use buffers::{PoolBuffer, PoolBufferMut}; -pub use limiter::{ActiveRead, ActiveReadGuard, BufferArea, MINIMUM_MEM_LIMIT, effective_total_memory}; +pub use limiter::{ + ActiveRead, ActiveReadGuard, BufferArea, CursorHandle, CursorState, MINIMUM_MEM_LIMIT, effective_total_memory, +}; pub use pool::PagedPool; pub use stats::BufferKind; diff --git a/mountpoint-s3-fs/src/memory/limiter.rs b/mountpoint-s3-fs/src/memory/limiter.rs index 1554df9e5..874679c7a 100644 --- a/mountpoint-s3-fs/src/memory/limiter.rs +++ b/mountpoint-s3-fs/src/memory/limiter.rs @@ -6,9 +6,10 @@ use sysinfo::System; use tracing::{debug, trace}; use crate::prefetch::CursorId; -use crate::sync::Arc; use crate::sync::atomic::{AtomicU64, Ordering}; +use crate::sync::{Arc, Mutex, Weak}; +use super::PagedPool; use super::stats::PoolStats; pub const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024; @@ -29,39 +30,6 @@ impl BufferArea { } } -/// Describes an active FUSE read request for a specific cursor. -/// Used by the pruning logic to determine which buffers are high priority -/// (actively being waited on by a user) vs. speculative (prefetched ahead). -#[derive(Debug, Clone, Copy)] -pub struct ActiveRead { - /// Start offset of the read in the file. - pub offset: u64, - /// Size of the read in bytes. - pub size: usize, -} - -impl ActiveRead { - /// Check if this active read overlaps with the given range. - pub fn overlaps(&self, offset: u64, size: usize) -> bool { - let self_end = self.offset + self.size as u64; - let other_end = offset + size as u64; - self.offset < other_end && offset < self_end - } -} - -/// RAII guard that clears the active read for a cursor when dropped. -/// Ensures the active read is always cleared, even on early returns or panics. -pub struct ActiveReadGuard { - active_reads: Arc>, - cursor_id: CursorId, -} - -impl Drop for ActiveReadGuard { - fn drop(&mut self) { - self.active_reads.remove(&self.cursor_id); - } -} - /// `MemoryLimiter` tracks memory used by Mountpoint and makes decisions if a new memory reservation request can be accepted. /// Currently, there are two metrics we take into account: /// 1) the memory directly reserved on the limiter by prefetcher instances (read path). @@ -86,19 +54,14 @@ impl Drop for ActiveReadGuard { #[derive(Debug)] pub struct MemoryLimiter { mem_limit: u64, - /// Global total of reserved memory (lock-free). Used in budget checks (try_reserve, available_mem). + /// Global total of reserved memory (lock-free). Used in budget checks. mem_reserved: Arc, - /// Per-cursor reservation tracking. Keyed by CursorId (unique per BackpressureController - /// lifetime) so that late on_pool_reserve calls from cancelled CRT meta-requests cannot - /// incorrectly decrement a re-created entry for the same cursor. - mem_reserved_per_cursor: Arc>>, + /// Unified per-cursor state. Cursors own a strong reference to their state. + cursors: Arc>>, /// Counter for generating unique [CursorId]s. next_cursor_id: AtomicU64, /// Additional reserved memory for other non-buffer usage like storing metadata additional_mem_reserved: u64, - /// Per-cursor active read tracking. When a FUSE read is in progress for a cursor, - /// the requested range is stored here. Absence means the cursor is speculative. - active_reads: Arc>, } impl MemoryLimiter { @@ -111,27 +74,24 @@ impl MemoryLimiter { formatter(mem_limit), formatter(additional_mem_reserved) ); - let mem_reserved = Arc::new(AtomicU64::new(0)); - let mem_reserved_per_cursor: Arc>> = Arc::new(DashMap::new()); Self { mem_limit, - mem_reserved, - mem_reserved_per_cursor, + mem_reserved: Default::default(), + cursors: Default::default(), next_cursor_id: AtomicU64::new(1), additional_mem_reserved, - active_reads: Arc::new(DashMap::new()), } } /// Reserve the memory for future uses. Always succeeds, even if it means going beyond /// the configured memory limit. - pub fn reserve(&self, cursor_id: CursorId, area: BufferArea, size: u64) { - self.add_reservation(cursor_id, size); + fn reserve(&self, area: BufferArea, size: u64) { + self.mem_reserved.fetch_add(size, Ordering::SeqCst); metrics::gauge!("mem.bytes_reserved", "area" => area.as_str()).increment(size as f64); } /// Reserve the memory for future uses. If there is not enough memory returns `false`. - pub fn try_reserve(&self, cursor_id: CursorId, area: BufferArea, size: u64, stats: &PoolStats) -> bool { + fn try_reserve(&self, area: BufferArea, size: u64, stats: &PoolStats) -> bool { let start = Instant::now(); let mut mem_reserved = self.mem_reserved.load(Ordering::SeqCst); loop { @@ -154,10 +114,6 @@ impl MemoryLimiter { Ordering::SeqCst, ) { Ok(_) => { - self.mem_reserved_per_cursor - .entry(cursor_id) - .or_default() - .fetch_add(size, Ordering::SeqCst); metrics::gauge!("mem.bytes_reserved", "area" => area.as_str()).increment(size as f64); metrics::histogram!("mem.reserve_latency_us", "area" => area.as_str()) .record(start.elapsed().as_micros() as f64); @@ -169,17 +125,20 @@ impl MemoryLimiter { } /// Release all remaining reservation for a cursor and remove it from tracking. - pub fn release_cursor(&self, cursor_id: CursorId, area: BufferArea) { - if let Some((_, reservation)) = self.mem_reserved_per_cursor.remove(&cursor_id) { - let remaining = reservation.swap(0, Ordering::SeqCst); + fn release_cursor(&self, cursor_id: CursorId, cursor_reserved: &AtomicU64) { + if self.cursors.remove(&cursor_id).is_some() { + let remaining = cursor_reserved.swap(0, Ordering::SeqCst); self.mem_reserved.fetch_sub(remaining, Ordering::SeqCst); - metrics::gauge!("mem.bytes_reserved", "area" => area.as_str()).decrement(remaining as f64); + metrics::gauge!("mem.bytes_reserved", "area" => BufferArea::Prefetch.as_str()).decrement(remaining as f64); } } - /// Generate a new unique [CursorId] for per-cursor memory tracking. - pub fn next_cursor_id(&self) -> CursorId { - CursorId::new_from_raw(self.next_cursor_id.fetch_add(1, Ordering::Relaxed)) + /// Create a new cursor, insert its state into the map, and return the shared state handle. + pub fn create_cursor(&self, pool: &PagedPool) -> CursorHandle { + let id = CursorId::new_from_raw(self.next_cursor_id.fetch_add(1, Ordering::Relaxed)); + let state = Arc::new(CursorState::new(pool.clone(), id)); + self.cursors.insert(id, Arc::downgrade(&state)); + CursorHandle { state } } /// Query available memory tracked by the memory limiter. @@ -192,33 +151,17 @@ impl MemoryLimiter { .saturating_sub(self.additional_mem_reserved) } - /// Record that a FUSE read is active for the given cursor at the specified range. - /// Returns a guard that will clear the active read when dropped. - pub fn set_active_read(&self, cursor_id: CursorId, offset: u64, size: usize) -> ActiveReadGuard { - self.active_reads.insert(cursor_id, ActiveRead { offset, size }); - ActiveReadGuard { - active_reads: self.active_reads.clone(), - cursor_id, - } - } - /// Check if the given cursor has an active read overlapping the specified range. pub fn has_active_read_in_range(&self, cursor_id: CursorId, offset: u64, size: usize) -> bool { - self.active_reads + // The weak reference fails to upgrade iff the cursor has already been dropped, which means + // it has no active read. + self.cursors .get(&cursor_id) - .map(|r| r.overlaps(offset, size)) + .and_then(|r| r.upgrade()) + .and_then(|s| s.active_read.lock().unwrap().map(|r| r.overlaps(offset, size))) .unwrap_or(false) } - /// Increment both the global total and the per-cursor reservation. - fn add_reservation(&self, cursor_id: CursorId, size: u64) { - self.mem_reserved.fetch_add(size, Ordering::SeqCst); - self.mem_reserved_per_cursor - .entry(cursor_id) - .or_default() - .fetch_add(size, Ordering::SeqCst); - } - /// Called by the pool on every buffer allocation. For download buffers with a known cursor, /// this converts reservation from "intent" (`mem_reserved`) to "actual allocation" (pool stats) /// by decrementing both the global and per-cursor counters. @@ -226,17 +169,19 @@ impl MemoryLimiter { /// No-op when `cursor_id` is `None` (e.g. uploads) or the cursor has already been removed /// by `release_cursor`. pub fn on_pool_reserve(&self, bytes: usize, cursor_id: Option) { - let Some(cursor_id) = cursor_id else { - return; - }; - // Clone the Arc to release the DashMap shard lock before doing atomic operations. - let Some(cursor_reservation) = self.mem_reserved_per_cursor.get(&cursor_id).map(|r| r.value().clone()) else { + let Some(state) = cursor_id + .and_then(|id| self.cursors.get(&id)) + .and_then(|r| r.value().upgrade()) + else { return; }; - let mut current = cursor_reservation.load(Ordering::SeqCst); + let mut current = state.mem_reserved.load(Ordering::SeqCst); let decremented = loop { let new_val = current.saturating_sub(bytes as u64); - match cursor_reservation.compare_exchange_weak(current, new_val, Ordering::SeqCst, Ordering::SeqCst) { + match state + .mem_reserved + .compare_exchange_weak(current, new_val, Ordering::SeqCst, Ordering::SeqCst) + { Ok(_) => break current - new_val, Err(actual) => current = actual, } @@ -263,132 +208,241 @@ pub fn effective_total_memory() -> u64 { .unwrap_or_else(|| sys.total_memory()) } +/// Describes an active FUSE read request for a specific cursor. +/// Used by the pruning logic to determine which buffers are high priority +/// (actively being waited on by a user) vs. speculative (prefetched ahead). +#[derive(Debug, Clone, Copy)] +pub struct ActiveRead { + /// Start offset of the read in the file. + pub offset: u64, + /// Size of the read in bytes. + pub size: usize, +} + +impl ActiveRead { + /// Check if this active read overlaps with the given range. + pub fn overlaps(&self, offset: u64, size: usize) -> bool { + let self_end = self.offset + self.size as u64; + let other_end = offset + size as u64; + self.offset < other_end && offset < self_end + } +} + +/// Per-cursor state shared between the memory pool's limiter, +/// the BackpressureController, and the Cursor. +#[derive(Debug)] +pub struct CursorState { + /// The memory pool tracking this cursor. + pool: PagedPool, + /// The unique identifier for this cursor. + cursor_id: CursorId, + /// Reservation balance (bytes of intent not yet converted to pool allocations). + mem_reserved: AtomicU64, + /// Active FUSE read range. None when no read is in progress. + active_read: Mutex>, +} + +impl CursorState { + fn new(pool: PagedPool, cursor_id: CursorId) -> Self { + Self { + pool, + cursor_id, + mem_reserved: AtomicU64::new(0), + active_read: Mutex::new(None), + } + } + + /// The unique identifier for this cursor. + pub fn id(&self) -> CursorId { + self.cursor_id + } + + /// Reserve memory unconditionally. Increments both the per-cursor and global counters. + pub fn reserve(&self, size: u64) { + self.pool.limiter().reserve(BufferArea::Prefetch, size); + self.mem_reserved.fetch_add(size, Ordering::SeqCst); + } + + /// Try to reserve memory. Returns false if the budget would be exceeded. + pub fn try_reserve(&self, size: u64) -> bool { + if !self + .pool + .limiter() + .try_reserve(BufferArea::Prefetch, size, self.pool.stats()) + { + return false; + } + self.mem_reserved.fetch_add(size, Ordering::SeqCst); + true + } + + /// Query available memory tracked by the associated memory limiter. + pub fn available_mem(&self) -> u64 { + self.pool.available_mem() + } +} + +impl Drop for CursorState { + fn drop(&mut self) { + // The limiter holds a weak reference to `CursorState`, so this `Drop` runs when all strong + // references are gone. `release_cursor` removes the reference and decrements the global reservation counter. + self.pool.limiter().release_cursor(self.cursor_id, &self.mem_reserved); + } +} + +/// Handle to a CursorState. Allows to set active reads. +#[derive(Debug)] +pub struct CursorHandle { + state: Arc, +} + +impl CursorHandle { + /// Id of this cursor. + pub fn id(&self) -> CursorId { + self.state.id() + } + + /// The state of the cursor. + pub fn state(&self) -> Arc { + self.state.clone() + } + + /// Record an active FUSE read. Returns a guard that clears it on drop. + pub fn set_active_read(&self, offset: u64, size: usize) -> ActiveReadGuard { + *self.state.active_read.lock().unwrap() = Some(ActiveRead { offset, size }); + ActiveReadGuard { state: self.state() } + } +} + +/// RAII guard that clears the active read for a cursor when dropped. +pub struct ActiveReadGuard { + state: Arc, +} + +impl Drop for ActiveReadGuard { + fn drop(&mut self) { + *self.state.active_read.lock().unwrap() = None; + } +} + #[cfg(test)] mod tests { + // TODO: Consider which tests are specific to the MemoryLimiter and which are testing the whole PagedPool. + use super::*; use crate::memory::{BufferKind, PagedPool}; - use crate::sync::Arc; use crate::sync::atomic::Ordering; - fn new_limiter() -> Arc { - Arc::new(MemoryLimiter::new(MINIMUM_MEM_LIMIT)) + fn new_pool() -> PagedPool { + PagedPool::new_with_candidate_sizes_minimally_limited([1024]) } #[test] fn test_reserve_and_release_cursor() { - let limiter = new_limiter(); - let cursor = limiter.next_cursor_id(); + let pool = new_pool(); + let limiter = pool.limiter(); + let cursor = pool.create_cursor().state(); - limiter.reserve(cursor, BufferArea::Prefetch, 100); + cursor.reserve(100); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 100); - limiter.release_cursor(cursor, BufferArea::Prefetch); + let cursor_id = cursor.id(); + drop(cursor); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); - assert!(!limiter.mem_reserved_per_cursor.contains_key(&cursor)); + assert!(!limiter.cursors.contains_key(&cursor_id)); } #[test] fn test_pool_allocation_decrements_mem_reserved() { - let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]); - let limiter = pool.inner_limiter(); - let cursor = limiter.next_cursor_id(); + let pool = new_pool(); + let limiter = pool.limiter(); + let cursor = pool.create_cursor().state(); // Reserve 1024 bytes of intent - limiter.reserve(cursor, BufferArea::Prefetch, 1024); + cursor.reserve(1024); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 1024); - // Pool allocation calls on_pool_reserve, decrementing mem_reserved - let _buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor)); + // Pool allocation triggers on_reserve callback, decrementing mem_reserved + let _buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor.id())); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); } #[test] - fn test_reserve_pool_allocate_release_cursor() { - let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]); - let limiter = pool.inner_limiter(); - let cursor = limiter.next_cursor_id(); + fn test_reserve_pool_allocate_drop() { + let pool = new_pool(); + let limiter = pool.limiter(); + let cursor = pool.create_cursor().state(); // Reserve 2048 bytes of intent - limiter.reserve(cursor, BufferArea::Prefetch, 2048); + cursor.reserve(2048); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 2048); - // Pool allocates 1024 — on_pool_reserve decrements both global and per-cursor - let _buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor)); + // Pool allocates 1024 — callback decrements both global and per-cursor + let _buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor.id())); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 1024); - // release_cursor releases the remaining per-cursor balance (2048 - 1024 = 1024) - limiter.release_cursor(cursor, BufferArea::Prefetch); + // dropping the cursor releases the remaining per-cursor balance (2048 - 1024 = 1024) + let cursor_id = cursor.id(); + drop(cursor); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); - assert!(!limiter.mem_reserved_per_cursor.contains_key(&cursor)); + assert!(!limiter.cursors.contains_key(&cursor_id)); } #[test] fn test_try_reserve_respects_limit() { - let limiter = new_limiter(); - let cursor = limiter.next_cursor_id(); - let stats = crate::memory::stats::PoolStats::default(); + let pool = new_pool(); + let cursor = pool.create_cursor().state(); // Fill up to the limit (minus additional_mem_reserved) - let available = limiter.available_mem(&stats); - limiter.reserve(cursor, BufferArea::Prefetch, available); + let available = pool.available_mem(); + cursor.reserve(available); // Should fail — no room left - assert!(!limiter.try_reserve(cursor, BufferArea::Prefetch, 1, &stats)); + assert!(!cursor.try_reserve(1)); } #[test] fn test_multiple_cursors_independent() { - let limiter = new_limiter(); - let cursor1 = limiter.next_cursor_id(); - let cursor2 = limiter.next_cursor_id(); + let pool = new_pool(); + let limiter = pool.limiter(); + let cursor1 = pool.create_cursor().state(); + let cursor2 = pool.create_cursor().state(); - limiter.reserve(cursor1, BufferArea::Prefetch, 100); - limiter.reserve(cursor2, BufferArea::Prefetch, 200); + cursor1.reserve(100); + cursor2.reserve(200); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 300); // Release cursor1 — only its 100 bytes - limiter.release_cursor(cursor1, BufferArea::Prefetch); + let cursor1_id = cursor1.id(); + drop(cursor1); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 200); - assert!(!limiter.mem_reserved_per_cursor.contains_key(&cursor1)); + assert!(!limiter.cursors.contains_key(&cursor1_id)); // Cursor2 still tracked - assert!(limiter.mem_reserved_per_cursor.contains_key(&cursor2)); - - limiter.release_cursor(cursor2, BufferArea::Prefetch); - assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); - assert!(limiter.mem_reserved_per_cursor.is_empty()); - } - - #[test] - fn test_release_cursor_idempotent() { - let limiter = new_limiter(); - let cursor = limiter.next_cursor_id(); - - limiter.reserve(cursor, BufferArea::Prefetch, 100); - limiter.release_cursor(cursor, BufferArea::Prefetch); + assert!(limiter.cursors.contains_key(&cursor2.id())); - // Second call is a no-op - limiter.release_cursor(cursor, BufferArea::Prefetch); + drop(cursor2); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); - assert!(!limiter.mem_reserved_per_cursor.contains_key(&cursor)); + assert!(limiter.cursors.is_empty()); } #[test] fn test_on_pool_reserve_noop_after_release_cursor() { - // Simulates the cancellation race: on_pool_reserve fires after release_cursor - // removed the entry. The call should be a no-op. + // Simulates the cancellation race: on_reserve fires after release_cursor + // removed the entry. The callback should be a no-op. let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]); - let limiter = pool.inner_limiter(); - let cursor = limiter.next_cursor_id(); + let limiter = pool.limiter(); + let cursor = pool.create_cursor().state(); - limiter.reserve(cursor, BufferArea::Prefetch, 1024); - limiter.release_cursor(cursor, BufferArea::Prefetch); + cursor.reserve(1024); + let cursor_id = cursor.id(); + drop(cursor); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); - assert!(!limiter.mem_reserved_per_cursor.contains_key(&cursor)); + assert!(!limiter.cursors.contains_key(&cursor_id)); // Late allocation for the cancelled request — cursor is gone - let _buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor)); + let _buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor_id)); // mem_reserved should stay at 0, not go negative or wrap assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); @@ -397,38 +451,40 @@ mod tests { #[test] fn test_on_pool_reserve_saturates_on_over_decrement() { let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]); - let limiter = pool.inner_limiter(); - let cursor = limiter.next_cursor_id(); + let limiter = pool.limiter(); + let cursor = pool.create_cursor().state(); // Reserve only 512 bytes - limiter.reserve(cursor, BufferArea::Prefetch, 512); + cursor.reserve(512); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 512); // Pool allocates 1024 — on_pool_reserve should saturate at 512 (not underflow) - let _buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor)); + let cursor_id = cursor.id(); + let _buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor_id)); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); - // release_cursor should subtract 0 (the per-cursor counter is already 0) - limiter.release_cursor(cursor, BufferArea::Prefetch); + // dropping the cursor should subtract 0 (the per-cursor counter is already 0) + drop(cursor); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); - assert!(!limiter.mem_reserved_per_cursor.contains_key(&cursor)); + assert!(!limiter.cursors.contains_key(&cursor_id)); } #[test] fn test_available_mem_accounts_for_pool_allocations() { let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]); - let limiter = pool.inner_limiter(); - let cursor = limiter.next_cursor_id(); - let stats = pool.inner_stats(); + let limiter = pool.limiter(); + let cursor = pool.create_cursor().state(); + let stats = pool.stats(); let initial_available = limiter.available_mem(stats); // Reserve intent — available decreases - limiter.reserve(cursor, BufferArea::Prefetch, 1024); + cursor.reserve(1024); assert_eq!(limiter.available_mem(stats), initial_available - 1024); // Pool allocates (on_pool_reserve converts intent to pool stats) — available stays the same - let buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor)); + // because mem_reserved decreases by 1024 while pool stats increase by 1024. + let buffer = pool.get_buffer_mut(1024, BufferKind::GetObject, Some(cursor.id())); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); assert_eq!(limiter.available_mem(stats), initial_available - 1024); @@ -440,8 +496,8 @@ mod tests { #[test] fn test_upload_allocation_does_not_affect_mem_reserved() { let pool = PagedPool::new_with_candidate_sizes_minimally_limited([1024]); - let limiter = pool.inner_limiter(); - let stats = pool.inner_stats(); + let limiter = pool.limiter(); + let stats = pool.stats(); let initial_available = limiter.available_mem(stats); assert_eq!(limiter.mem_reserved.load(Ordering::SeqCst), 0); @@ -460,11 +516,13 @@ mod tests { #[test] fn test_active_read_overlap_detection() { - let limiter = new_limiter(); - let cursor_id = CursorId::new_from_raw(1); + let pool = new_pool(); + let limiter = pool.limiter(); + let cursor = pool.create_cursor(); + let cursor_id = cursor.id(); // Active read at [1000, 5096) - let _guard = limiter.set_active_read(cursor_id, 1000, 4096); + let _guard = cursor.set_active_read(1000, 4096); // Overlapping ranges → true assert!(limiter.has_active_read_in_range(cursor_id, 1000, 4096)); // exact match @@ -476,30 +534,36 @@ mod tests { assert!(!limiter.has_active_read_in_range(cursor_id, 0, 500)); // before assert!(!limiter.has_active_read_in_range(cursor_id, 5096, 1000)); // after - // Different cursor_id → false - assert!(!limiter.has_active_read_in_range(CursorId::new_from_raw(2), 1000, 4096)); + // Different cursor → false + let other_cursor = pool.create_cursor(); + assert!(!limiter.has_active_read_in_range(other_cursor.id(), 1000, 4096)); } /// Simulates the allocation queue's perspective: one thread holds an active read /// while another thread queries whether a given range is active. #[test] fn test_query_active_read_from_another_thread() { - let limiter = new_limiter(); - let cursor_id = CursorId::new_from_raw(1); - - let guard = limiter.set_active_read(cursor_id, 1000, 4096); - - let limiter_clone = Arc::clone(&limiter); - let query_thread = std::thread::spawn(move || { - // Allocation for the active range → high priority - assert!(limiter_clone.has_active_read_in_range(cursor_id, 1000, 4096)); - // Allocation for a prefetch-ahead range → low priority - assert!(!limiter_clone.has_active_read_in_range(cursor_id, 50000, 4096)); - }); - query_thread.join().unwrap(); + let pool = new_pool(); + let limiter = pool.limiter(); + let cursor = pool.create_cursor(); + + let guard = cursor.set_active_read(1000, 4096); + + { + let pool = pool.clone(); + let cursor_id = cursor.id(); + let query_thread = std::thread::spawn(move || { + let limiter = pool.limiter(); + // Allocation for the active range → high priority + assert!(limiter.has_active_read_in_range(cursor_id, 1000, 4096)); + // Allocation for a prefetch-ahead range → low priority + assert!(!limiter.has_active_read_in_range(cursor_id, 50000, 4096)); + }); + query_thread.join().unwrap(); + } drop(guard); - assert!(!limiter.has_active_read_in_range(cursor_id, 1000, 4096)); + assert!(!limiter.has_active_read_in_range(cursor.id(), 1000, 4096)); } /// When the `TEST_CGROUP_MEM_LIMIT_MB` environment variable is set (e.g. in a diff --git a/mountpoint-s3-fs/src/memory/pool.rs b/mountpoint-s3-fs/src/memory/pool.rs index cfa697390..36673ecdf 100644 --- a/mountpoint-s3-fs/src/memory/pool.rs +++ b/mountpoint-s3-fs/src/memory/pool.rs @@ -6,7 +6,7 @@ use crate::prefetch::CursorId; use crate::sync::{Arc, RwLock}; use super::buffers::{PoolBuffer, PoolBufferMut}; -use super::limiter::{ActiveReadGuard, BufferArea, MemoryLimiter}; +use super::limiter::{CursorHandle, MemoryLimiter}; use super::pages::{Page, PagedBufferPtr}; use super::stats::{BufferKind, PoolStats, SizePoolStats}; @@ -183,40 +183,9 @@ impl PagedPool { .sum() } - // TODO: Refactor MemoryLimiter unit tests to remove inner_stats and inner_limiter. We should have clearly separated unit tests for the PagedPool and the MemoryLimiter. - - /// Expose the internal stats for testing purposes (e.g., to wire a standalone limiter). - #[cfg(test)] - pub(crate) fn inner_stats(&self) -> &PoolStats { - &self.inner.stats - } - - /// Expose the internal limiter for testing purposes. - #[cfg(test)] - pub(crate) fn inner_limiter(&self) -> &MemoryLimiter { - &self.inner.limiter - } - - // ─── Delegation methods for MemoryLimiter ─────────────────────────────────── - - /// Reserve memory for future uses. Always succeeds (unconditional). - pub fn reserve(&self, cursor_id: CursorId, area: BufferArea, size: u64) { - self.inner.limiter.reserve(cursor_id, area, size); - } - - /// Reserve memory if available. Returns `false` if over budget. - pub fn try_reserve(&self, cursor_id: CursorId, area: BufferArea, size: u64) -> bool { - self.inner.limiter.try_reserve(cursor_id, area, size, &self.inner.stats) - } - - /// Release all remaining reservation for a cursor and remove it from tracking. - pub fn release_cursor(&self, cursor_id: CursorId, area: BufferArea) { - self.inner.limiter.release_cursor(cursor_id, area); - } - - /// Generate a new unique CursorId. - pub fn next_cursor_id(&self) -> CursorId { - self.inner.limiter.next_cursor_id() + /// Create a new cursor and return the shared state handle. + pub fn create_cursor(&self) -> CursorHandle { + self.inner.limiter.create_cursor(self) } /// Query available memory. @@ -224,16 +193,20 @@ impl PagedPool { self.inner.limiter.available_mem(&self.inner.stats) } - /// Record that a FUSE read is active for the given cursor. - /// Returns a guard that clears the active read on drop. - pub fn set_active_read(&self, cursor_id: CursorId, offset: u64, size: usize) -> ActiveReadGuard { - self.inner.limiter.set_active_read(cursor_id, offset, size) - } - /// Check if the given cursor has an active read overlapping the specified range. pub fn has_active_read_in_range(&self, cursor_id: CursorId, offset: u64, size: usize) -> bool { self.inner.limiter.has_active_read_in_range(cursor_id, offset, size) } + + // ─── Internal components exposed to the rest of the `memory` module. ────────── + + pub(super) fn stats(&self) -> &PoolStats { + &self.inner.stats + } + + pub(super) fn limiter(&self) -> &MemoryLimiter { + &self.inner.limiter + } } impl MemoryPool for PagedPool { diff --git a/mountpoint-s3-fs/src/prefetch.rs b/mountpoint-s3-fs/src/prefetch.rs index d449da712..94a0281dc 100644 --- a/mountpoint-s3-fs/src/prefetch.rs +++ b/mountpoint-s3-fs/src/prefetch.rs @@ -42,7 +42,6 @@ use tracing::trace; use crate::checksums::{ChecksummedBytes, IntegrityError}; use crate::data_cache::DataCache; use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT}; -use crate::memory::PagedPool; use crate::metrics::defs::FUSE_CACHE_HIT; use crate::object::ObjectId; @@ -186,7 +185,6 @@ fn determine_max_read_size() -> usize { #[derive(Debug)] pub struct Prefetcher { part_stream: PartStream, - pool: PagedPool, config: PrefetcherConfig, } @@ -208,12 +206,8 @@ where } /// Create a new [Prefetcher] from the given [ObjectPartStream] instance. - pub fn new(part_stream: PartStream, pool: PagedPool, config: PrefetcherConfig) -> Self { - Self { - part_stream, - pool, - config, - } + pub fn new(part_stream: PartStream, config: PrefetcherConfig) -> Self { + Self { part_stream, config } } /// Start a new prefetch request to the specified object. @@ -221,14 +215,7 @@ where where Client: ObjectClient + Clone + Send + Sync + 'static, { - PrefetchGetObject::new( - self.part_stream.clone(), - self.pool.clone(), - self.config, - bucket, - object_id, - size, - ) + PrefetchGetObject::new(self.part_stream.clone(), self.config, bucket, object_id, size) } } @@ -239,7 +226,6 @@ where Client: ObjectClient + Clone + Send + Sync + 'static, { part_stream: PartStream, - pool: PagedPool, config: PrefetcherConfig, bucket: String, object_id: ObjectId, @@ -256,7 +242,6 @@ where /// Create and spawn a new prefetching request for an object fn new( part_stream: PartStream, - pool: PagedPool, config: PrefetcherConfig, bucket: String, object_id: ObjectId, @@ -264,7 +249,6 @@ where ) -> Self { PrefetchGetObject { part_stream, - pool, config, bucket, object_id, @@ -347,39 +331,33 @@ where /// Create a new Cursor and associated backpressure GetObject request which has a range from current offset /// to the end of the file. fn create_cursor(&self, offset: u64) -> Result, PrefetchReadError> { - let start = offset; let object_size = self.size as usize; - let read_part_size = self.part_stream.client().read_part_size(); - let range = RequestRange::new(object_size, start, object_size); - - // The prefetcher now relies on backpressure mechanism so it must be enabled - match self.part_stream.client().initial_read_window_size() { - Some(value) => { - // Also, make sure that we don't get blocked from the beginning - if value == 0 { - return Err(PrefetchReadError::BackpressurePreconditionFailed); - } - } - None => return Err(PrefetchReadError::BackpressurePreconditionFailed), - }; + let client = self.part_stream.client(); + + // Validate backpressure preconditions: client must have backpressure enabled + // with an initial read window size greater than 0. + match client.initial_read_window_size() { + Some(0) | None => return Err(PrefetchReadError::BackpressurePreconditionFailed), + Some(_) => {} + } - let cursor_id = self.pool.next_cursor_id(); - let config = RequestTaskConfig { - cursor_id, + let pool = self.part_stream.pool(); + let cursor_handle = pool.create_cursor(); + let task_config = RequestTaskConfig { + cursor_state: cursor_handle.state(), bucket: self.bucket.clone(), object_id: self.object_id.clone(), - range, - read_part_size, + range: RequestRange::new(object_size, offset, object_size), + read_part_size: client.read_part_size(), preferred_part_size: self.preferred_part_size, initial_request_size: self.config.initial_request_size, max_read_window_size: self.config.max_read_window_size, read_window_size_multiplier: self.config.sequential_prefetch_multiplier, }; - let request_task = self.part_stream.spawn_get_object_request(config); + let request_task = self.part_stream.spawn_request_task(task_config); Ok(Cursor::new( - cursor_id, request_task, - self.pool.clone(), + cursor_handle, &self.config, self.object_id.clone(), offset, diff --git a/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs b/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs index 0a6da8698..90f2f9913 100644 --- a/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs +++ b/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs @@ -4,8 +4,8 @@ use async_channel::{Receiver, RecvError, Sender, unbounded}; use humansize::make_format; use tracing::trace; -use crate::memory::BufferArea; -use crate::memory::PagedPool; +use crate::memory::CursorState; +use crate::sync::Arc; use super::CursorId; use super::PrefetchReadError; @@ -52,8 +52,6 @@ impl ReadWindowAlignmentConfig { } pub struct BackpressureConfig { - /// Id of the associated Cursor - pub cursor_id: CursorId, /// Backpressure's initial read window size pub initial_read_window_size: usize, /// Minimum read window size that the backpressure controller is allowed to scale down to @@ -71,7 +69,6 @@ pub struct BackpressureConfig { impl BackpressureConfig { pub fn new(config: &RequestTaskConfig, read_window_alignment_config: ReadWindowAlignmentConfig) -> Self { Self { - cursor_id: config.cursor_id, initial_read_window_size: config.initial_read_window_size(), min_read_window_size: config.read_part_size, max_read_window_size: config.max_read_window_size, @@ -106,13 +103,8 @@ pub struct BackpressureController { /// /// The request can return data up to this offset *exclusively*. request_end_offset: u64, - /// Memory limiter is used to guide decisions on how much data to prefetch and to track - /// per-cursor memory reservations. - /// - /// For example, when memory is low we should scale down [Self::preferred_read_window_size]. - pool: PagedPool, - /// Unique cursor ID for per-cursor memory reservation tracking. - cursor_id: CursorId, + /// Cursor state handle for direct reservation operations. + cursor_state: Arc, /// Enable alignment of read window end to part boundary read_window_alignment_config: ReadWindowAlignmentConfig, } @@ -142,17 +134,14 @@ pub struct BackpressureLimiter { /// informing a producer (a holder of the [BackpressureLimiter]) when it should provide data more aggressively. pub fn new_backpressure_controller( config: BackpressureConfig, - pool: PagedPool, + cursor_state: Arc, ) -> (BackpressureController, BackpressureLimiter) { // Minimum window size multiplier as the scaling up and down won't work if the multiplier is 1. const MIN_WINDOW_SIZE_MULTIPLIER: usize = 2; let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64; - pool.reserve( - config.cursor_id, - BufferArea::Prefetch, - config.initial_read_window_size as u64, - ); + cursor_state.reserve(config.initial_read_window_size as u64); + let cursor_id = cursor_state.id(); let (read_window_updater, read_window_increment_queue) = unbounded(); let read_window_increment_queue = ReadWindowIncrementQueue::new(read_window_increment_queue); @@ -165,8 +154,7 @@ pub fn new_backpressure_controller( read_window_end_offset, next_read_offset: config.request_range.start, request_end_offset: config.request_range.end, - cursor_id: config.cursor_id, - pool, + cursor_state, read_window_alignment_config: config.read_window_alignment_config, }; @@ -176,7 +164,7 @@ pub fn new_backpressure_controller( read_window_increment_queue, read_window_end_offset, request_end_offset: config.request_range.end, - cursor_id: config.cursor_id, + cursor_id, }; (controller, limiter) @@ -223,18 +211,14 @@ impl BackpressureController { // read window size. if self.preferred_read_window_size <= self.min_read_window_size { trace!(new_read_window_end_offset, "sending a read window increment"); - self.pool - .reserve(self.cursor_id, BufferArea::Prefetch, to_increase as u64); + self.cursor_state.reserve(to_increase as u64); self.increment_read_window(to_increase).await; break; } // Try to reserve the memory for the length we want to increase before sending the request, // scale down the read window if it fails. - if self - .pool - .try_reserve(self.cursor_id, BufferArea::Prefetch, to_increase as u64) - { + if self.cursor_state.try_reserve(to_increase as u64) { trace!(new_read_window_end_offset, "sending a read window increment"); self.increment_read_window(to_increase).await; break; @@ -278,7 +262,7 @@ impl BackpressureController { // because only `preferred_read_window_size` is increased but the actual read window will // be updated later on `DataRead` event (where we do reserve memory). let to_increase = (new_read_window_size - self.preferred_read_window_size) as u64; - let available_mem = self.pool.available_mem(); + let available_mem = self.cursor_state.available_mem(); if available_mem >= to_increase { let formatter = make_format(humansize::BINARY); trace!( @@ -313,15 +297,6 @@ impl BackpressureController { } } -impl Drop for BackpressureController { - fn drop(&mut self) { - // Release whatever remains of this cursor's reservation. The per-cursor counter - // tracks only the unallocated portion — pool allocations already decremented it - // via the on_reserve callback. - self.pool.release_cursor(self.cursor_id, BufferArea::Prefetch); - } -} - impl BackpressureLimiter { pub fn read_window_end_offset(&self) -> u64 { self.read_window_end_offset @@ -420,7 +395,6 @@ mod tests { fn test_read_window_scale_up(initial_read_window_size: usize, read_window_size_multiplier: usize) { let request_range = 0..(5 * 1024 * 1024 * 1024); let backpressure_config = BackpressureConfig { - cursor_id: CursorId::new_from_raw(0), initial_read_window_size, min_read_window_size: 8 * 1024 * 1024, max_read_window_size: 2 * 1024 * 1024 * 1024, @@ -449,7 +423,6 @@ mod tests { fn test_read_window_scale_down(initial_read_window_size: usize, read_window_size_multiplier: usize) { let request_range = 0..(5 * 1024 * 1024 * 1024); let backpressure_config = BackpressureConfig { - cursor_id: CursorId::new_from_raw(0), initial_read_window_size, min_read_window_size: 8 * 1024 * 1024, max_read_window_size: 2 * 1024 * 1024 * 1024, @@ -480,7 +453,6 @@ mod tests { // OK, back to basics. Just reproduce what happened, verify it passes after the fix. #[allow(clippy::identity_op)] let backpressure_config = BackpressureConfig { - cursor_id: CursorId::new_from_raw(0), initial_read_window_size: 1 * MIB, min_read_window_size: 8 * MIB, max_read_window_size: 2 * GIB, @@ -536,6 +508,7 @@ mod tests { ) -> (BackpressureController, BackpressureLimiter) { let pool = PagedPool::new_with_candidate_sizes([8 * 1024 * 1024], backpressure_config.max_read_window_size as u64); - new_backpressure_controller(backpressure_config, pool) + let cursor_state = pool.create_cursor().state(); + new_backpressure_controller(backpressure_config, cursor_state) } } diff --git a/mountpoint-s3-fs/src/prefetch/builder.rs b/mountpoint-s3-fs/src/prefetch/builder.rs index b5e1ed562..281f4d78d 100644 --- a/mountpoint-s3-fs/src/prefetch/builder.rs +++ b/mountpoint-s3-fs/src/prefetch/builder.rs @@ -73,7 +73,7 @@ where prefetcher_config: PrefetcherConfig, ) -> Prefetcher { let part_stream = ClientPartStream::new(runtime, self.client, pool.clone()); - Prefetcher::new(PartStream::new(part_stream), pool, prefetcher_config) + Prefetcher::new(PartStream::new(part_stream), prefetcher_config) } } @@ -94,6 +94,6 @@ where prefetcher_config: PrefetcherConfig, ) -> Prefetcher { let part_stream = CachingPartStream::new(runtime, self.client, pool.clone(), self.cache); - Prefetcher::new(PartStream::new(part_stream), pool, prefetcher_config) + Prefetcher::new(PartStream::new(part_stream), prefetcher_config) } } diff --git a/mountpoint-s3-fs/src/prefetch/caching_stream.rs b/mountpoint-s3-fs/src/prefetch/caching_stream.rs index 9e7a24ee4..5c7112d9a 100644 --- a/mountpoint-s3-fs/src/prefetch/caching_stream.rs +++ b/mountpoint-s3-fs/src/prefetch/caching_stream.rs @@ -1,5 +1,5 @@ +use std::ops::Range; use std::time::Instant; -use std::{ops::Range, sync::Arc}; use futures::task::{Spawn, SpawnExt}; use futures::{Stream, StreamExt, pin_mut}; @@ -13,6 +13,7 @@ use crate::data_cache::{BlockIndex, DataCache}; use crate::memory::PagedPool; use crate::object::ObjectId; use crate::prefetch::backpressure_controller::ReadWindowAlignmentConfig; +use crate::sync::Arc; use super::PrefetchReadError; use super::backpressure_controller::{BackpressureConfig, BackpressureLimiter, new_backpressure_controller}; @@ -49,15 +50,16 @@ where Cache: DataCache + Send + Sync + 'static, Client: ObjectClient + Clone + Send + Sync + 'static, { - fn spawn_get_object_request(&self, config: RequestTaskConfig) -> RequestTask { + fn spawn_request_task(&self, config: RequestTaskConfig) -> RequestTask { let range = config.range; let backpressure_config = BackpressureConfig::new( &config, ReadWindowAlignmentConfig::Disable, // we don't know where S3 request starts, so can not align the read window ); + let cursor_state = config.cursor_state.clone(); let (backpressure_controller, backpressure_limiter) = - new_backpressure_controller(backpressure_config, self.pool.clone()); + new_backpressure_controller(backpressure_config, cursor_state); let (part_queue, part_queue_producer) = unbounded_part_queue(); trace!(?range, "spawning request"); @@ -81,6 +83,10 @@ where fn client(&self) -> &Client { &self.client } + + fn pool(&self) -> &PagedPool { + &self.pool + } } #[derive(Debug)] @@ -422,7 +428,6 @@ mod tests { data_cache::InMemoryDataCache, memory::{MINIMUM_MEM_LIMIT, PagedPool}, object::ObjectId, - prefetch::CursorId, }; use super::*; @@ -478,7 +483,7 @@ mod tests { // First request (from client) let get_object_counter = mock_client.new_counter(Operation::GetObject); let config = RequestTaskConfig { - cursor_id: CursorId::new_from_raw(0), + cursor_state: pool.create_cursor().state(), bucket: bucket.to_owned(), object_id: id.clone(), range, @@ -488,7 +493,7 @@ mod tests { max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(config); + let request_task = stream.spawn_request_task(config); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -505,7 +510,7 @@ mod tests { // Second request (from cache) let get_object_counter = mock_client.new_counter(Operation::GetObject); let config = RequestTaskConfig { - cursor_id: CursorId::new_from_raw(0), + cursor_state: pool.create_cursor().state(), bucket: bucket.to_owned(), object_id: id.clone(), range, @@ -515,7 +520,7 @@ mod tests { max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(config); + let request_task = stream.spawn_request_task(config); compare_read(&id, &object, request_task); get_object_counter.count() }; @@ -558,7 +563,7 @@ mod tests { for offset in [0, 512 * KB, 1 * MB, 4 * MB, 9 * MB] { for preferred_size in [1 * KB, 512 * KB, 4 * MB, 12 * MB, 16 * MB] { let config = RequestTaskConfig { - cursor_id: CursorId::new_from_raw(0), + cursor_state: pool.create_cursor().state(), bucket: bucket.to_owned(), object_id: id.clone(), range: RequestRange::new(object_size, offset as u64, preferred_size), @@ -568,7 +573,7 @@ mod tests { max_read_window_size, read_window_size_multiplier, }; - let request_task = stream.spawn_get_object_request(config); + let request_task = stream.spawn_request_task(config); compare_read(&id, &object, request_task); } } diff --git a/mountpoint-s3-fs/src/prefetch/cursor.rs b/mountpoint-s3-fs/src/prefetch/cursor.rs index a79412396..c02360fb0 100644 --- a/mountpoint-s3-fs/src/prefetch/cursor.rs +++ b/mountpoint-s3-fs/src/prefetch/cursor.rs @@ -3,7 +3,7 @@ use mountpoint_s3_client::ObjectClient; use tracing::trace; use crate::checksums::ChecksummedBytes; -use crate::memory::PagedPool; +use crate::memory::CursorHandle; use crate::metrics::defs::PREFETCH_RESET_STATE; use crate::object::ObjectId; @@ -19,14 +19,12 @@ pub struct Cursor where Client: ObjectClient + Clone + Send + Sync + 'static, { - /// Unique id for this cursor - cursor_id: CursorId, /// Id of the object to download object_id: ObjectId, /// Start offset for sequential read, used for calculating contiguous read metric start_offset: u64, - /// Associated memory limiter - pool: PagedPool, + /// Per-cursor state (reservation + active read tracking) + cursor_handle: CursorHandle, /// Background task to request data request_task: RequestTask, /// Holds data for backward seeks @@ -46,18 +44,16 @@ where { /// Create a new cursor at the given offset. pub fn new( - cursor_id: CursorId, request_task: RequestTask, - pool: PagedPool, + cursor_handle: CursorHandle, config: &PrefetcherConfig, object_id: ObjectId, offset: u64, ) -> Self { Self { - cursor_id, object_id, start_offset: offset, - pool, + cursor_handle, request_task, backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize), max_forward_seek_wait_distance: config.max_forward_seek_wait_distance, @@ -75,7 +71,7 @@ where &mut self, length: usize, ) -> Result<(ChecksummedBytes, bool), PrefetchReadError> { - let _active_read_guard = self.pool.set_active_read(self.cursor_id, self.current_offset, length); + let _active_read_guard = self.cursor_handle.set_active_read(self.current_offset, length); self.do_read(length).await } @@ -90,7 +86,7 @@ where // the skipped bytes the prefetcher must consume to reach the requested offset. let active_start = self.current_offset.min(offset); let active_size = length + offset.saturating_sub(active_start) as usize; - let _active_read_guard = self.pool.set_active_read(self.cursor_id, active_start, active_size); + let _active_read_guard = self.cursor_handle.set_active_read(active_start, active_size); if !self.try_seek(offset).await? { // Seek failed diff --git a/mountpoint-s3-fs/src/prefetch/part_stream.rs b/mountpoint-s3-fs/src/prefetch/part_stream.rs index 21579f160..c81778de6 100644 --- a/mountpoint-s3-fs/src/prefetch/part_stream.rs +++ b/mountpoint-s3-fs/src/prefetch/part_stream.rs @@ -4,38 +4,41 @@ use futures::{Stream, StreamExt, pin_mut}; use mountpoint_s3_client::ObjectClient; use mountpoint_s3_client::types::{ClientBackpressureHandle, GetBodyPart, GetObjectParams, GetObjectResponse}; use std::marker::{Send, Sync}; -use std::sync::Arc; use std::{fmt::Debug, ops::Range}; use tracing::{Instrument, debug_span, error, trace}; use crate::async_util::Runtime; use crate::checksums::ChecksummedBytes; -use crate::memory::PagedPool; +use crate::memory::{CursorState, PagedPool}; use crate::object::ObjectId; use crate::prefetch::backpressure_controller::ReadWindowAlignmentConfig; +use crate::sync::Arc; -use super::CursorId; use super::PrefetchReadError; use super::backpressure_controller::{BackpressureConfig, BackpressureLimiter, new_backpressure_controller}; +use super::cursor::CursorId; use super::part::{Part, PartSource}; use super::part_queue::{PartQueueProducer, unbounded_part_queue}; use super::task::RequestTask; /// A generic interface to retrieve data from objects in a S3-like store. pub trait ObjectPartStream { - /// Spawns a request to get the content of an object. The object data will be retrieved in fixed size - /// parts and can then be consumed using [RequestTask::read]. Callers need to specify a preferred - /// size for the parts, but implementations are allowed to ignore it. - fn spawn_get_object_request(&self, config: RequestTaskConfig) -> RequestTask; + /// Spawn a background task that issues GetObject requests and pushes parts into the + /// returned [RequestTask]. Implementations differ in how they source data (direct S3 + /// vs cache-then-S3). + fn spawn_request_task(&self, config: RequestTaskConfig) -> RequestTask; /// The underlying [ObjectClient]. fn client(&self) -> &Client; + + /// The shared [PagedPool]. + fn pool(&self) -> &PagedPool; } +/// Internal configuration for spawning a [RequestTask]. #[derive(Clone, Debug)] -/// The configs for spawning a task in [ObjectPartStream::spawn_get_object_request]. pub struct RequestTaskConfig { - pub cursor_id: CursorId, + pub cursor_state: Arc, pub bucket: String, pub object_id: ObjectId, pub range: RequestRange, @@ -194,13 +197,17 @@ where } } - pub fn spawn_get_object_request(&self, config: RequestTaskConfig) -> RequestTask { - self.inner.spawn_get_object_request(config) + pub fn spawn_request_task(&self, config: RequestTaskConfig) -> RequestTask { + self.inner.spawn_request_task(config) } pub fn client(&self) -> &Client { self.inner.client() } + + pub fn pool(&self) -> &PagedPool { + self.inner.pool() + } } impl Debug for PartStream { @@ -226,7 +233,7 @@ impl ClientPartStream = Result>; impl ObjectPartStream for ClientPartStream { - fn spawn_get_object_request(&self, config: RequestTaskConfig) -> RequestTask { + fn spawn_request_task(&self, config: RequestTaskConfig) -> RequestTask { assert!(config.preferred_part_size > 0); let range = config.range; @@ -238,8 +245,9 @@ impl ObjectPartStream ObjectPartStream &Client { &self.client } + + fn pool(&self) -> &PagedPool { + &self.pool + } } struct ClientPartComposer {