From 4d913865b31767d0adc630f7a06d42fa373facd3 Mon Sep 17 00:00:00 2001 From: sriram Date: Fri, 27 Mar 2026 22:17:01 +0530 Subject: [PATCH] feat: provide per-operation concurrent limit --- core/layers/concurrent-limit/src/lib.rs | 204 ++++++++++++++++++++++-- 1 file changed, 194 insertions(+), 10 deletions(-) diff --git a/core/layers/concurrent-limit/src/lib.rs b/core/layers/concurrent-limit/src/lib.rs index ce0ef012d46c..1354adcdae6e 100644 --- a/core/layers/concurrent-limit/src/lib.rs +++ b/core/layers/concurrent-limit/src/lib.rs @@ -20,6 +20,7 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![deny(missing_docs)] +use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -82,6 +83,28 @@ impl ConcurrentLimitSemaphore for Arc { /// # } /// ``` /// +/// Set per-operation concurrent limits to control different operations +/// independently: +/// +/// ```no_run +/// # use opendal_core::services; +/// # use opendal_core::Operator; +/// # use opendal_core::Result; +/// # use opendal_core::raw::Operation; +/// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer; +/// # +/// # fn main() -> Result<()> { +/// let _ = Operator::new(services::Memory::default())? +/// .layer( +/// ConcurrentLimitLayer::new(1024) +/// .with_operation_limit(Operation::Read, 64) +/// .with_operation_limit(Operation::Write, 32), +/// ) +/// .finish(); +/// # Ok(()) +/// # } +/// ``` +/// /// Share a concurrent limit layer between the operators: /// /// ```no_run @@ -106,6 +129,7 @@ impl ConcurrentLimitSemaphore for Arc { pub struct ConcurrentLimitLayer> { operation_semaphore: S, http_semaphore: Option, + operation_limits: Option>>>, } impl ConcurrentLimitLayer> { @@ -126,6 +150,47 @@ impl ConcurrentLimitLayer> { pub fn with_http_concurrent_limit(self, permits: usize) -> Self { self.with_http_semaphore(Arc::new(Semaphore::new(permits))) } + + /// Set a concurrent limit for a specific operation type. 
+ /// + /// When a per-operation limit is configured, that operation acquires a + /// permit from its dedicated semaphore instead of the global one, so it + /// neither consumes nor waits on global permits. This allows fine-grained + /// control over concurrency for different operation types. + /// + /// Operations without a dedicated limit continue to use the global + /// semaphore. Calling this again for the same operation replaces its earlier limit. + /// + /// # Examples + /// + /// Give reads and writes dedicated limits while every other operation + /// (such as `stat` or `list`) stays governed by the global limit: + /// + /// ```no_run + /// # use opendal_core::services; + /// # use opendal_core::Operator; + /// # use opendal_core::Result; + /// # use opendal_core::raw::Operation; + /// # use opendal_layer_concurrent_limit::ConcurrentLimitLayer; + /// # + /// # fn main() -> Result<()> { + /// let _ = Operator::new(services::Memory::default())? + /// .layer( + /// ConcurrentLimitLayer::new(1024) + /// .with_operation_limit(Operation::Read, 64) + /// .with_operation_limit(Operation::Write, 32), + /// ) + /// .finish(); + /// # Ok(()) + /// # } + /// ``` + pub fn with_operation_limit(mut self, op: Operation, permits: usize) -> Self { + let limits = self + .operation_limits + .get_or_insert_with(|| Arc::new(HashMap::new())); + Arc::make_mut(limits).insert(op, Arc::new(Semaphore::new(permits))); + self + } } impl ConcurrentLimitLayer { @@ -142,6 +207,7 @@ impl ConcurrentLimitLayer { Self { operation_semaphore, http_semaphore: None, + operation_limits: None, } } @@ -172,10 +238,21 @@ where ConcurrentLimitAccessor { inner, semaphore: self.operation_semaphore.clone(), + operation_limits: self.operation_limits.clone(), } } } +/// A permit that can come from either the global semaphore (generic `S`) or +/// a per-operation `Arc<Semaphore>`. +#[doc(hidden)] +pub enum ConcurrentLimitPermit

{ + /// Permit from the global semaphore. + Global(P), + /// Permit from a per-operation semaphore. + PerOperation(OwnedSemaphorePermit), +} + #[doc(hidden)] pub struct ConcurrentLimitHttpFetcher { inner: HttpFetcher, @@ -230,6 +307,7 @@ where pub struct ConcurrentLimitAccessor { inner: A, semaphore: S, + operation_limits: Option>>>, } impl std::fmt::Debug for ConcurrentLimitAccessor { @@ -240,28 +318,42 @@ impl std::fmt::Debug for ConcurrentLimit } } +impl ConcurrentLimitAccessor { + /// Acquire a permit for the given operation. If a per-operation semaphore + /// is configured for this operation, acquire from it; otherwise fall back + /// to the global semaphore. + async fn acquire_for(&self, op: Operation) -> ConcurrentLimitPermit { + if let Some(limits) = &self.operation_limits { + if let Some(sem) = limits.get(&op) { + return ConcurrentLimitPermit::PerOperation(sem.clone().acquire_owned(1).await); + } + } + ConcurrentLimitPermit::Global(self.semaphore.acquire().await) + } +} + impl LayeredAccess for ConcurrentLimitAccessor where S::Permit: Unpin, { type Inner = A; - type Reader = ConcurrentLimitWrapper; - type Writer = ConcurrentLimitWrapper; - type Lister = ConcurrentLimitWrapper; - type Deleter = ConcurrentLimitWrapper; + type Reader = ConcurrentLimitWrapper>; + type Writer = ConcurrentLimitWrapper>; + type Lister = ConcurrentLimitWrapper>; + type Deleter = ConcurrentLimitWrapper>; fn inner(&self) -> &Self::Inner { &self.inner } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let _permit = self.semaphore.acquire().await; + let _permit = self.acquire_for(Operation::CreateDir).await; self.inner.create_dir(path, args).await } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let permit = self.semaphore.acquire().await; + let permit = self.acquire_for(Operation::Read).await; self.inner .read(path, args) @@ -270,7 +362,7 @@ where } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> { - let permit = self.semaphore.acquire().await; + let permit = self.acquire_for(Operation::Write).await; self.inner .write(path, args) @@ -279,13 +371,13 @@ where } async fn stat(&self, path: &str, args: OpStat) -> Result { - let _permit = self.semaphore.acquire().await; + let _permit = self.acquire_for(Operation::Stat).await; self.inner.stat(path, args).await } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { - let permit = self.semaphore.acquire().await; + let permit = self.acquire_for(Operation::Delete).await; self.inner .delete() @@ -294,7 +386,7 @@ where } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let permit = self.semaphore.acquire().await; + let permit = self.acquire_for(Operation::List).await; self.inner .list(path, args) @@ -397,6 +489,98 @@ mod tests { ); } + #[tokio::test] + async fn per_operation_limit_isolates_operations() { + // Stat has its own per-operation semaphore (1 permit). + // Exhausting the global semaphore should NOT block stat. + let global_sem = Arc::new(Semaphore::new(1)); + let layer = ConcurrentLimitLayer::with_semaphore(global_sem.clone()) + .with_operation_limit(Operation::Stat, 1); + + let op = Operator::new(services::Memory::default()) + .expect("operator must build") + .layer(layer) + .finish(); + + // Exhaust the global semaphore externally. + let _permit = global_sem.clone().acquire_owned(1).await; + + // Stat should still work because it uses its dedicated per-operation + // semaphore, not the exhausted global one. + let stat_result = timeout(Duration::from_millis(200), op.stat("any")).await; + assert!( + stat_result.is_ok(), + "stat should not be blocked by exhausted global semaphore" + ); + } + + #[tokio::test] + async fn per_operation_limit_blocks_same_operation() { + // Stat has its own per-operation limit of 1. + // Externally exhaust the per-operation stat semaphore, then verify + // that stat blocks. 
+ let layer = ConcurrentLimitLayer::new(1024).with_operation_limit(Operation::Stat, 1); + + // Grab a reference to the per-operation semaphore so we can + // externally exhaust it. Build the layer, then extract the + // semaphore from the operation_limits map. + let op = Operator::new(services::Memory::default()) + .expect("operator must build") + .layer(layer.clone()) + .finish(); + + // Exhaust the per-operation stat semaphore by cloning the Arc from + // the layer's internal map. + let stat_sem = layer + .operation_limits + .as_ref() + .expect("operation_limits must exist") + .get(&Operation::Stat) + .expect("stat semaphore must exist") + .clone(); + let _permit = stat_sem.acquire_owned(1).await; + + // Stat should block because its per-operation semaphore is exhausted. + let blocked = timeout(Duration::from_millis(50), op.stat("any")).await; + assert!( + blocked.is_err(), + "stat should be blocked by exhausted per-operation semaphore" + ); + } + + #[tokio::test] + async fn operations_without_per_op_limit_use_global() { + // Only stat gets a per-operation limit. Other operations (like + // create_dir) should fall back to the global semaphore. + let global_sem = Arc::new(Semaphore::new(1)); + let layer = ConcurrentLimitLayer::with_semaphore(global_sem.clone()) + .with_operation_limit(Operation::Stat, 10); + + let op = Operator::new(services::Memory::default()) + .expect("operator must build") + .layer(layer) + .finish(); + + // Exhaust the global semaphore externally. + let _permit = global_sem.clone().acquire_owned(1).await; + + // Stat should still work because it has a dedicated per-operation + // semaphore with 10 permits, bypassing the exhausted global one. 
+ let stat_result = timeout(Duration::from_millis(200), op.stat("any")).await; + assert!( + stat_result.is_ok(), + "stat should use per-operation semaphore, not the exhausted global one" + ); + + // create_dir has no per-operation limit, so it falls back to the + // global semaphore which is exhausted -- it should block. + let blocked = timeout(Duration::from_millis(50), op.create_dir("blocked/")).await; + assert!( + blocked.is_err(), + "create_dir should be blocked by exhausted global semaphore" + ); + } + #[tokio::test] async fn http_semaphore_holds_until_body_dropped() { struct DummyFetcher;