Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions crates/rattler_repodata_gateway/src/gateway/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct GatewayBuilder {
#[cfg(not(target_arch = "wasm32"))]
package_cache: Option<PackageCache>,
max_concurrent_requests: MaxConcurrency,
max_concurrent_io: MaxConcurrency,
}

impl GatewayBuilder {
Expand Down Expand Up @@ -133,6 +134,26 @@ impl GatewayBuilder {
self
}

/// Caps how many IO operations (for example reading shard cache files from
/// disk) may be in flight at the same time, consuming the builder and
/// returning it with the new limit applied.
///
/// Bounding this avoids exhausting the OS file-descriptor limit when many
/// packages are queried at once.
#[must_use]
pub fn with_max_concurrent_io(mut self, max_concurrent_io: impl Into<MaxConcurrency>) -> Self {
    // Only this one field changes; every other builder setting is carried
    // over untouched.
    self.max_concurrent_io = max_concurrent_io.into();
    self
}

/// Caps how many IO operations may be in flight at the same time, updating
/// the builder in place.
///
/// Accepts anything convertible into a [`MaxConcurrency`] and returns the
/// same mutable reference so that further setters can be chained.
pub fn set_max_concurrent_io(
    &mut self,
    max_concurrent_io: impl Into<MaxConcurrency>,
) -> &mut Self {
    let io_limit: MaxConcurrency = max_concurrent_io.into();
    self.max_concurrent_io = io_limit;
    self
}

/// Finish the construction of the gateway returning a constructed gateway.
pub fn finish(self) -> Gateway {
let client = self.client.unwrap_or_else(|| {
Expand Down Expand Up @@ -161,6 +182,12 @@ impl GatewayBuilder {
MaxConcurrency::Semaphore(sem) => Some(sem),
};

let io_concurrency_semaphore = match self.max_concurrent_io {
MaxConcurrency::Unlimited => None,
MaxConcurrency::Limited(n) => Some(Arc::new(tokio::sync::Semaphore::new(n))),
MaxConcurrency::Semaphore(sem) => Some(sem),
};

Gateway {
inner: Arc::new(GatewayInner {
subdirs: CoalescedMap::new(),
Expand All @@ -172,6 +199,7 @@ impl GatewayBuilder {
package_cache,
subdir_run_exports_cache: Arc::default(),
concurrent_requests_semaphore,
io_concurrency_semaphore,
}),
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/rattler_repodata_gateway/src/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,12 @@ struct GatewayInner {
/// A cache for global run exports.
subdir_run_exports_cache: Arc<SubdirRunExportsCache>,

/// A semaphore to limit the number of concurrent requests.
/// A semaphore to limit the number of concurrent HTTP requests.
concurrent_requests_semaphore: Option<Arc<tokio::sync::Semaphore>>,

/// A semaphore to limit the number of concurrent IO operations (e.g.
/// reading shard files from the on-disk cache).
io_concurrency_semaphore: Option<Arc<tokio::sync::Semaphore>>,
}

impl GatewayInner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ mod tests {
CacheAction::NoCache,
None,
None,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -328,6 +329,7 @@ mod tests {
CacheAction::NoCache,
None,
None,
None,
)
.await
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct ShardedSubdir {
package_base_url: Url,
sharded_repodata: ShardedRepodata,
concurrent_requests_semaphore: Option<Arc<tokio::sync::Semaphore>>,
io_concurrency_semaphore: Option<Arc<tokio::sync::Semaphore>>,
cache_dir: PathBuf,
cache_action: CacheAction,
}
Expand All @@ -48,6 +49,7 @@ impl ShardedSubdir {
cache_dir: PathBuf,
cache_action: CacheAction,
concurrent_requests_semaphore: Option<Arc<tokio::sync::Semaphore>>,
io_concurrency_semaphore: Option<Arc<tokio::sync::Semaphore>>,
reporter: Option<&dyn Reporter>,
) -> Result<Self, GatewayError> {
// Construct the base url for the shards (e.g. `<channel>/<subdir>`).
Expand Down Expand Up @@ -113,6 +115,7 @@ impl ShardedSubdir {
cache_dir,
cache_action,
concurrent_requests_semaphore,
io_concurrency_semaphore,
})
}

Expand Down Expand Up @@ -174,8 +177,17 @@ impl SubdirClient for ShardedSubdir {
// Check if we already have the shard in the cache.
let shard_cache_path = self.cache_dir.join(format!("{shard:x}.msgpack"));

// Read the cached shard
// Read the cached shard.
// Acquire the IO semaphore permit before opening the file to avoid
// exhausting the OS file-descriptor limit when many shards are fetched
// concurrently (e.g. when querying for `*`).
if self.cache_action != CacheAction::NoCache {
let _io_permit = OptionFuture::from(
self.io_concurrency_semaphore
.as_deref()
.map(tokio::sync::Semaphore::acquire),
)
.await;
match tokio_fs::read(&shard_cache_path).await {
Ok(cached_bytes) => {
// Decode the cached shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl<'g> SubdirBuilder<'g> {
#[cfg(not(target_arch = "wasm32"))]
_source_config.cache_action,
self.gateway.concurrent_requests_semaphore.clone(),
self.gateway.io_concurrency_semaphore.clone(),
self.reporter.as_deref(),
)
.await?;
Expand Down
Loading