diff --git a/crates/rattler_repodata_gateway/src/gateway/builder.rs b/crates/rattler_repodata_gateway/src/gateway/builder.rs index 24d1c10ce..a8db23ee8 100644 --- a/crates/rattler_repodata_gateway/src/gateway/builder.rs +++ b/crates/rattler_repodata_gateway/src/gateway/builder.rs @@ -49,6 +49,7 @@ pub struct GatewayBuilder { #[cfg(not(target_arch = "wasm32"))] package_cache: Option, max_concurrent_requests: MaxConcurrency, + max_concurrent_io: MaxConcurrency, } impl GatewayBuilder { @@ -133,6 +134,26 @@ impl GatewayBuilder { self } + /// Sets the maximum number of concurrent IO operations (e.g. reading shard + /// cache files from disk). This prevents exhausting the OS file-descriptor + /// limit when many packages are queried at once. + #[must_use] + pub fn with_max_concurrent_io(self, max_concurrent_io: impl Into) -> Self { + Self { + max_concurrent_io: max_concurrent_io.into(), + ..self + } + } + + /// Sets the maximum number of concurrent IO operations. + pub fn set_max_concurrent_io( + &mut self, + max_concurrent_io: impl Into, + ) -> &mut Self { + self.max_concurrent_io = max_concurrent_io.into(); + self + } + /// Finish the construction of the gateway returning a constructed gateway. pub fn finish(self) -> Gateway { let client = self.client.unwrap_or_else(|| { @@ -161,6 +182,12 @@ impl GatewayBuilder { MaxConcurrency::Semaphore(sem) => Some(sem), }; + let io_concurrency_semaphore = match self.max_concurrent_io { + MaxConcurrency::Unlimited => None, + MaxConcurrency::Limited(n) => Some(Arc::new(tokio::sync::Semaphore::new(n))), + MaxConcurrency::Semaphore(sem) => Some(sem), + }; + Gateway { inner: Arc::new(GatewayInner { subdirs: CoalescedMap::new(), @@ -172,6 +199,7 @@ impl GatewayBuilder { package_cache, subdir_run_exports_cache: Arc::default(), concurrent_requests_semaphore, + io_concurrency_semaphore, }), } } diff --git a/crates/rattler_repodata_gateway/src/gateway/mod.rs b/crates/rattler_repodata_gateway/src/gateway/mod.rs index 31a88367b..527d38eec 100644 --- a/crates/rattler_repodata_gateway/src/gateway/mod.rs +++ b/crates/rattler_repodata_gateway/src/gateway/mod.rs @@ -301,8 +301,12 @@ struct GatewayInner { /// A cache for global run exports. subdir_run_exports_cache: Arc, - /// A semaphore to limit the number of concurrent requests. + /// A semaphore to limit the number of concurrent HTTP requests. concurrent_requests_semaphore: Option>, + + /// A semaphore to limit the number of concurrent IO operations (e.g. + /// reading shard files from the on-disk cache). + io_concurrency_semaphore: Option>, } impl GatewayInner { diff --git a/crates/rattler_repodata_gateway/src/gateway/sharded_subdir/mod.rs b/crates/rattler_repodata_gateway/src/gateway/sharded_subdir/mod.rs index 47f365583..d6bbade1d 100644 --- a/crates/rattler_repodata_gateway/src/gateway/sharded_subdir/mod.rs +++ b/crates/rattler_repodata_gateway/src/gateway/sharded_subdir/mod.rs @@ -293,6 +293,7 @@ mod tests { CacheAction::NoCache, None, None, + None, ) .await .unwrap(); @@ -328,6 +329,7 @@ mod tests { CacheAction::NoCache, None, None, + None, ) .await .unwrap(); diff --git a/crates/rattler_repodata_gateway/src/gateway/sharded_subdir/tokio/mod.rs b/crates/rattler_repodata_gateway/src/gateway/sharded_subdir/tokio/mod.rs index cda3e83dd..c716ba7e3 100644 --- a/crates/rattler_repodata_gateway/src/gateway/sharded_subdir/tokio/mod.rs +++ b/crates/rattler_repodata_gateway/src/gateway/sharded_subdir/tokio/mod.rs @@ -36,6 +36,7 @@ pub struct ShardedSubdir { package_base_url: Url, sharded_repodata: ShardedRepodata, concurrent_requests_semaphore: Option>, + io_concurrency_semaphore: Option>, cache_dir: PathBuf, cache_action: CacheAction, } @@ -48,6 +49,7 @@ impl ShardedSubdir { cache_dir: PathBuf, cache_action: CacheAction, concurrent_requests_semaphore: Option>, + io_concurrency_semaphore: Option>, reporter: Option<&dyn Reporter>, ) -> Result { // Construct the base url for the shards (e.g. `/`). @@ -113,6 +115,7 @@ impl ShardedSubdir { cache_dir, cache_action, concurrent_requests_semaphore, + io_concurrency_semaphore, }) } @@ -174,8 +177,17 @@ impl SubdirClient for ShardedSubdir { // Check if we already have the shard in the cache. let shard_cache_path = self.cache_dir.join(format!("{shard:x}.msgpack")); - // Read the cached shard + // Read the cached shard. + // Acquire the IO semaphore permit before opening the file to avoid + // exhausting the OS file-descriptor limit when many shards are fetched + // concurrently (e.g. when querying for `*`). if self.cache_action != CacheAction::NoCache { + let _io_permit = OptionFuture::from( + self.io_concurrency_semaphore + .as_deref() + .map(tokio::sync::Semaphore::acquire), + ) + .await; match tokio_fs::read(&shard_cache_path).await { Ok(cached_bytes) => { // Decode the cached shard diff --git a/crates/rattler_repodata_gateway/src/gateway/subdir_builder.rs b/crates/rattler_repodata_gateway/src/gateway/subdir_builder.rs index 016c73c64..229edce32 100644 --- a/crates/rattler_repodata_gateway/src/gateway/subdir_builder.rs +++ b/crates/rattler_repodata_gateway/src/gateway/subdir_builder.rs @@ -143,6 +143,7 @@ impl<'g> SubdirBuilder<'g> { #[cfg(not(target_arch = "wasm32"))] _source_config.cache_action, self.gateway.concurrent_requests_semaphore.clone(), + self.gateway.io_concurrency_semaphore.clone(), self.reporter.as_deref(), ) .await?;