From 863bf43fbd05b792bc089c5287710ae289d4dd47 Mon Sep 17 00:00:00 2001 From: Mohammed Afridi Date: Fri, 27 Feb 2026 13:16:12 +0530 Subject: [PATCH 1/7] remve global pachake bottle neck --- crates/rattler/src/install/installer/mod.rs | 22 ++++++++--- .../src/package_cache/cache_lock.rs | 39 +++++++++++++++---- crates/rattler_cache/src/package_cache/mod.rs | 17 ++++---- 3 files changed, 56 insertions(+), 22 deletions(-) diff --git a/crates/rattler/src/install/installer/mod.rs b/crates/rattler/src/install/installer/mod.rs index 451fcd95d1..2d5426d7f1 100644 --- a/crates/rattler/src/install/installer/mod.rs +++ b/crates/rattler/src/install/installer/mod.rs @@ -487,12 +487,22 @@ impl Installer { ) }); - // Acquire a global lock on the package cache for the entire installation. - // This significantly reduces overhead by avoiding per-package locking. - let _global_cache_lock = package_cache - .acquire_global_lock() - .await - .map_err(InstallerError::FailedToAcquireCacheLock)?; + // Optionally acquire a global lock on the package cache for the entire installation. + // + // Warning: this serializes installs across multiple *processes* sharing the same cache. + // Keep it opt-in via an environment variable. + let _global_cache_lock = if std::env::var_os("RATTLER_PACKAGE_CACHE_GLOBAL_LOCK") + .is_some_and(|v| v != std::ffi::OsStr::new("0")) + { + Some( + package_cache + .acquire_global_lock() + .await + .map_err(InstallerError::FailedToAcquireCacheLock)?, + ) + } else { + None + }; // Construct a driver. let driver = InstallDriver::builder() diff --git a/crates/rattler_cache/src/package_cache/cache_lock.rs b/crates/rattler_cache/src/package_cache/cache_lock.rs index 3e5d23aa69..c4852dcae5 100644 --- a/crates/rattler_cache/src/package_cache/cache_lock.rs +++ b/crates/rattler_cache/src/package_cache/cache_lock.rs @@ -18,8 +18,9 @@ use crate::package_cache::PackageCacheLayerError; /// This struct represents a cache entry that has been validated and is ready for use. /// It holds the cache entry's path, revision number, and optional SHA256 hash. /// -/// Note: Concurrent access is coordinated via the global cache lock mechanism -/// (see [`CacheGlobalLock`]). Individual cache entries do not hold locks. +/// Note: Concurrent access to a cache entry is coordinated by taking an +/// exclusive lock on the cache entry's metadata lock file (the `*.lock` file +/// next to the package directory). pub struct CacheMetadata { pub(super) revision: u64, pub(super) sha256: Option, @@ -140,13 +141,13 @@ pub struct CacheMetadataFile { impl CacheMetadataFile { /// Acquires a handle to the cache metadata file. /// - /// Opens the file with both read and write permissions. Since concurrent access - /// is coordinated via [`CacheGlobalLock`], this single method is sufficient for - /// all metadata operations. + /// Opens the file with both read and write permissions and acquires an + /// exclusive OS file lock on it. This prevents multiple processes from + /// concurrently modifying the cache entry on disk. pub async fn acquire(path: &Path) -> Result { let lock_file_path = path.to_path_buf(); - simple_spawn_blocking::tokio::run_blocking_task(move || { + let acquire_lock_fut = simple_spawn_blocking::tokio::run_blocking_task(move || { let file = std::fs::OpenOptions::new() .create(true) .read(true) @@ -163,11 +164,27 @@ impl CacheMetadataFile { ) })?; + file.lock_exclusive().map_err(|e| { + PackageCacheLayerError::LockError( + format!( + "failed to acquire exclusive lock on cache metadata file: '{}'", + lock_file_path.display() + ), + e, + ) + })?; + Ok(CacheMetadataFile { file: Arc::new(file), }) - }) - .await + }); + + tokio::select!( + lock = acquire_lock_fut => lock, + _ = warn_timeout_future( + "Blocking waiting for cache entry lock file".to_string() + ) => unreachable!("warn_timeout_future should never finish") + ) } } @@ -294,6 +311,12 @@ impl CacheMetadataFile { } } +impl Drop for CacheMetadataFile { + fn drop(&mut self) { + let _ = fs4::fs_std::FileExt::unlock(&*self.file); + } +} + async fn warn_timeout_future(message: String) { loop { tokio::time::sleep(Duration::from_secs(30)).await; diff --git a/crates/rattler_cache/src/package_cache/mod.rs b/crates/rattler_cache/src/package_cache/mod.rs index 3d9d78e560..4ace9f8f10 100644 --- a/crates/rattler_cache/src/package_cache/mod.rs +++ b/crates/rattler_cache/src/package_cache/mod.rs @@ -258,12 +258,12 @@ impl PackageCache { /// Acquires a global lock on the package cache. /// - /// This lock can be used to coordinate multiple package operations, - /// reducing the overhead of acquiring individual locks for each package. - /// The lock is held until the returned `CacheGlobalLock` is dropped. + /// This lock serializes cache operations across *processes* sharing the same cache + /// directory. It is held until the returned `CacheGlobalLock` is dropped. /// - /// This is particularly useful when installing many packages at once, - /// as it significantly reduces the number of file locking syscalls. + /// Prefer the default per-entry locking behavior for multi-process scenarios + /// (e.g. many concurrent installs sharing one cache). Only use this lock when + /// you explicitly want to trade parallelism for reduced lock contention. pub async fn acquire_global_lock(&self) -> Result { // Use the first writable layer's path for the global cache lock let (_, writable_layers) = self.split_layers(); @@ -603,7 +603,7 @@ where E: Error + Send + Sync + 'static, { // Open the cache metadata file to read/write revision and hash information. - // Concurrent access is coordinated via the global cache lock. + // Concurrent access is coordinated by an exclusive lock on the metadata file. let lock_file_path = { // Append the `.lock` extension to the cache path to create the lock file path. let mut path_str = path.as_os_str().to_owned(); @@ -701,8 +701,9 @@ where } // If the cache is stale, we need to fetch the package again. - // Since we hold the global cache lock, we can safely update the metadata - // and fetch the package without worrying about concurrent modifications. + // Since we hold the cache entry lock (on the metadata file), we can safely + // update the metadata and fetch the package without worrying about concurrent + // modifications of this cache entry. if let Some(ref fetch_fn) = fetch { // Write the new revision let new_revision = cache_revision + 1; From ef5cf9442f8bbc6c9c535557740c241c586ad355 Mon Sep 17 00:00:00 2001 From: Mohammed Afridi Date: Fri, 27 Feb 2026 15:33:11 +0530 Subject: [PATCH 2/7] global_cache_lock block removed --- crates/rattler/src/install/installer/mod.rs | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/crates/rattler/src/install/installer/mod.rs b/crates/rattler/src/install/installer/mod.rs index 2d5426d7f1..751a2b5d4c 100644 --- a/crates/rattler/src/install/installer/mod.rs +++ b/crates/rattler/src/install/installer/mod.rs @@ -487,24 +487,10 @@ impl Installer { ) }); - // Optionally acquire a global lock on the package cache for the entire installation. - // - // Warning: this serializes installs across multiple *processes* sharing the same cache. - // Keep it opt-in via an environment variable. - let _global_cache_lock = if std::env::var_os("RATTLER_PACKAGE_CACHE_GLOBAL_LOCK") - .is_some_and(|v| v != std::ffi::OsStr::new("0")) - { - Some( - package_cache - .acquire_global_lock() - .await - .map_err(InstallerError::FailedToAcquireCacheLock)?, - ) - } else { - None - }; + + - // Construct a driver. + let driver = InstallDriver::builder() .execute_link_scripts(self.execute_link_scripts) .with_io_concurrency_semaphore( From 3215a346b6b899611272318dfa064663f9dc6b99 Mon Sep 17 00:00:00 2001 From: Mohammed Afridi Date: Fri, 27 Feb 2026 15:36:49 +0530 Subject: [PATCH 3/7] global_cache_lock block removed --- crates/rattler/src/install/installer/mod.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/rattler/src/install/installer/mod.rs b/crates/rattler/src/install/installer/mod.rs index 751a2b5d4c..cb45266e30 100644 --- a/crates/rattler/src/install/installer/mod.rs +++ b/crates/rattler/src/install/installer/mod.rs @@ -486,12 +486,7 @@ impl Installer { .join(rattler_cache::PACKAGE_CACHE_DIR), ) }); - - - - - - let driver = InstallDriver::builder() + let driver = InstallDriver::builder() .execute_link_scripts(self.execute_link_scripts) .with_io_concurrency_semaphore( self.io_semaphore.unwrap_or(Arc::new(Semaphore::new(100))), From 648b3c4cfd57475c55e6e3ec88f599d843dd13b9 Mon Sep 17 00:00:00 2001 From: Mohammed Afridi Date: Fri, 27 Feb 2026 15:37:05 +0530 Subject: [PATCH 4/7] global_cache_lock block removed --- crates/rattler/src/install/installer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rattler/src/install/installer/mod.rs b/crates/rattler/src/install/installer/mod.rs index cb45266e30..696d2e7812 100644 --- a/crates/rattler/src/install/installer/mod.rs +++ b/crates/rattler/src/install/installer/mod.rs @@ -486,7 +486,7 @@ impl Installer { .join(rattler_cache::PACKAGE_CACHE_DIR), ) }); - let driver = InstallDriver::builder() + let driver = InstallDriver::builder() .execute_link_scripts(self.execute_link_scripts) .with_io_concurrency_semaphore( self.io_semaphore.unwrap_or(Arc::new(Semaphore::new(100))), From e12d2427a4569d712bc8a684ae061272eaea683c Mon Sep 17 00:00:00 2001 From: Mohammed Afridi Date: Fri, 27 Feb 2026 15:41:06 +0530 Subject: [PATCH 5/7] corrected indendation --- crates/rattler/src/install/installer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rattler/src/install/installer/mod.rs b/crates/rattler/src/install/installer/mod.rs index 696d2e7812..037b21c2d4 100644 --- a/crates/rattler/src/install/installer/mod.rs +++ b/crates/rattler/src/install/installer/mod.rs @@ -486,7 +486,7 @@ impl Installer { .join(rattler_cache::PACKAGE_CACHE_DIR), ) }); - let driver = InstallDriver::builder() + let driver = InstallDriver::builder() .execute_link_scripts(self.execute_link_scripts) .with_io_concurrency_semaphore( self.io_semaphore.unwrap_or(Arc::new(Semaphore::new(100))), From a48f216cfc359ec17191e5aa96e9ddaa3397bfaa Mon Sep 17 00:00:00 2001 From: Mohammed Afridi Date: Sun, 1 Mar 2026 12:14:39 +0530 Subject: [PATCH 6/7] checked benchmarks and imprvoed results --- crates/rattler_cache/Cargo.toml | 5 ++ .../benches/concurrent_cache_lock.rs | 52 +++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 crates/rattler_cache/benches/concurrent_cache_lock.rs diff --git a/crates/rattler_cache/Cargo.toml b/crates/rattler_cache/Cargo.toml index 2a9a3560aa..c9d5db0a87 100644 --- a/crates/rattler_cache/Cargo.toml +++ b/crates/rattler_cache/Cargo.toml @@ -53,3 +53,8 @@ tokio-stream = { workspace = true } tower-http = { workspace = true, features = ["fs"] } tools = { path = "../tools" } reqwest-retry = { workspace = true } +criterion = { workspace = true } + +[[bench]] +name = "concurrent_cache_lock" +harness = false diff --git a/crates/rattler_cache/benches/concurrent_cache_lock.rs b/crates/rattler_cache/benches/concurrent_cache_lock.rs new file mode 100644 index 0000000000..854afc6eae --- /dev/null +++ b/crates/rattler_cache/benches/concurrent_cache_lock.rs @@ -0,0 +1,52 @@ +// Run: cargo bench -p rattler_cache --bench concurrent_cache_lock +use std::path::{Path, PathBuf}; +use criterion::{criterion_group, criterion_main, Criterion}; +use rattler_cache::package_cache::PackageCache; +use tempfile::tempdir; +use tokio::runtime::Runtime; + +fn paths() -> Vec { + let root = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data"); + vec![ + root.join("clobber/clobber-python-0.1.0-cpython.conda"), + root.join("clobber/clobber-1-0.1.0-h4616a5c_0.tar.bz2"), + root.join("clobber/clobber-2-0.1.0-h4616a5c_0.tar.bz2"), + root.join("clobber/clobber-3-0.1.0-h4616a5c_0.tar.bz2"), + root.join("packages/empty-0.1.0-h4616a5c_0.conda"), + ] +} + +fn bench(c: &mut Criterion) { + let paths = paths(); + let rt = Runtime::new().unwrap(); + let mut g = c.benchmark_group("cache_lock"); + g.sample_size(10); + + g.bench_function("concurrent_per_package_lock", |b| { + b.iter(|| { + let cache = PackageCache::new(tempdir().unwrap().path()); + let paths = paths.clone(); + rt.block_on(async { + let handles: Vec<_> = paths.iter().map(|p| { + let c = cache.clone(); + let p = p.clone(); + tokio::spawn(async move { c.get_or_fetch_from_path(&p, None).await }) + }).collect(); + for h in handles { h.await.unwrap().unwrap(); } + }) + }); + }); + + g.bench_function("serial_simulating_global_lock", |b| { + b.iter(|| { + let cache = PackageCache::new(tempdir().unwrap().path()); + let paths = paths.clone(); + rt.block_on(async { + for p in &paths { cache.get_or_fetch_from_path(p, None).await.unwrap(); } + }) + }); + }); +} + +criterion_group!(benches, bench); +criterion_main!(benches); From 9cbc2cf065b16cfb3b68f63634a1d1f41046ed07 Mon Sep 17 00:00:00 2001 From: Mohammed Afridi Date: Sun, 1 Mar 2026 12:23:18 +0530 Subject: [PATCH 7/7] checked benchmarks and imprvoed results --- .../benches/concurrent_cache_lock.rs | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/crates/rattler_cache/benches/concurrent_cache_lock.rs b/crates/rattler_cache/benches/concurrent_cache_lock.rs index 854afc6eae..fbae181d71 100644 --- a/crates/rattler_cache/benches/concurrent_cache_lock.rs +++ b/crates/rattler_cache/benches/concurrent_cache_lock.rs @@ -1,11 +1,15 @@ -// Run: cargo bench -p rattler_cache --bench concurrent_cache_lock +//! Benchmark: concurrent vs serial package cache fetch. +//! +//! Run with: `cargo bench -p rattler_cache --bench concurrent_cache_lock` + use std::path::{Path, PathBuf}; + use criterion::{criterion_group, criterion_main, Criterion}; use rattler_cache::package_cache::PackageCache; use tempfile::tempdir; use tokio::runtime::Runtime; -fn paths() -> Vec { +fn benchmark_package_paths() -> Vec { let root = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data"); vec![ root.join("clobber/clobber-python-0.1.0-cpython.conda"), @@ -16,37 +20,46 @@ fn paths() -> Vec { ] } -fn bench(c: &mut Criterion) { - let paths = paths(); +fn criterion_benchmark(c: &mut Criterion) { + let package_paths = benchmark_package_paths(); let rt = Runtime::new().unwrap(); - let mut g = c.benchmark_group("cache_lock"); - g.sample_size(10); + let mut group = c.benchmark_group("cache_lock"); + group.sample_size(10); - g.bench_function("concurrent_per_package_lock", |b| { + + group.bench_function("concurrent_per_package_lock", |b| { b.iter(|| { let cache = PackageCache::new(tempdir().unwrap().path()); - let paths = paths.clone(); + let paths = package_paths.clone(); rt.block_on(async { - let handles: Vec<_> = paths.iter().map(|p| { - let c = cache.clone(); - let p = p.clone(); - tokio::spawn(async move { c.get_or_fetch_from_path(&p, None).await }) - }).collect(); - for h in handles { h.await.unwrap().unwrap(); } + let handles: Vec<_> = paths + .iter() + .map(|path| { + let cache = cache.clone(); + let path = path.clone(); + tokio::spawn(async move { cache.get_or_fetch_from_path(&path, None).await }) + }) + .collect(); + for handle in handles { + handle.await.unwrap().unwrap(); + } }) }); }); - g.bench_function("serial_simulating_global_lock", |b| { + // Fetch packages one by one (simulates serialisation under a global lock). + group.bench_function("serial_simulating_global_lock", |b| { b.iter(|| { let cache = PackageCache::new(tempdir().unwrap().path()); - let paths = paths.clone(); + let paths = package_paths.clone(); rt.block_on(async { - for p in &paths { cache.get_or_fetch_from_path(p, None).await.unwrap(); } + for path in &paths { + cache.get_or_fetch_from_path(path, None).await.unwrap(); + } }) }); }); } -criterion_group!(benches, bench); +criterion_group!(benches, criterion_benchmark); criterion_main!(benches);