diff --git a/crates/rattler/src/install/installer/mod.rs b/crates/rattler/src/install/installer/mod.rs index 451fcd95d..037b21c2d 100644 --- a/crates/rattler/src/install/installer/mod.rs +++ b/crates/rattler/src/install/installer/mod.rs @@ -486,15 +486,6 @@ impl Installer { .join(rattler_cache::PACKAGE_CACHE_DIR), ) }); - - // Acquire a global lock on the package cache for the entire installation. - // This significantly reduces overhead by avoiding per-package locking. - let _global_cache_lock = package_cache - .acquire_global_lock() - .await - .map_err(InstallerError::FailedToAcquireCacheLock)?; - - // Construct a driver. let driver = InstallDriver::builder() .execute_link_scripts(self.execute_link_scripts) .with_io_concurrency_semaphore( diff --git a/crates/rattler_cache/Cargo.toml b/crates/rattler_cache/Cargo.toml index 2a9a3560a..c9d5db0a8 100644 --- a/crates/rattler_cache/Cargo.toml +++ b/crates/rattler_cache/Cargo.toml @@ -53,3 +53,8 @@ tokio-stream = { workspace = true } tower-http = { workspace = true, features = ["fs"] } tools = { path = "../tools" } reqwest-retry = { workspace = true } +criterion = { workspace = true } + +[[bench]] +name = "concurrent_cache_lock" +harness = false diff --git a/crates/rattler_cache/benches/concurrent_cache_lock.rs b/crates/rattler_cache/benches/concurrent_cache_lock.rs new file mode 100644 index 000000000..fbae181d7 --- /dev/null +++ b/crates/rattler_cache/benches/concurrent_cache_lock.rs @@ -0,0 +1,65 @@ +//! Benchmark: concurrent vs serial package cache fetch. +//! +//! 
Run with: `cargo bench -p rattler_cache --bench concurrent_cache_lock` + +use std::path::{Path, PathBuf}; + +use criterion::{criterion_group, criterion_main, Criterion}; +use rattler_cache::package_cache::PackageCache; +use tempfile::tempdir; +use tokio::runtime::Runtime; + +fn benchmark_package_paths() -> Vec<PathBuf> { + let root = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data"); + vec![ + root.join("clobber/clobber-python-0.1.0-cpython.conda"), + root.join("clobber/clobber-1-0.1.0-h4616a5c_0.tar.bz2"), + root.join("clobber/clobber-2-0.1.0-h4616a5c_0.tar.bz2"), + root.join("clobber/clobber-3-0.1.0-h4616a5c_0.tar.bz2"), + root.join("packages/empty-0.1.0-h4616a5c_0.conda"), + ] +} + +fn criterion_benchmark(c: &mut Criterion) { + let package_paths = benchmark_package_paths(); + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("cache_lock"); + group.sample_size(10); + + + group.bench_function("concurrent_per_package_lock", |b| { + b.iter(|| { + let cache = PackageCache::new(tempdir().unwrap().path()); + let paths = package_paths.clone(); + rt.block_on(async { + let handles: Vec<_> = paths + .iter() + .map(|path| { + let cache = cache.clone(); + let path = path.clone(); + tokio::spawn(async move { cache.get_or_fetch_from_path(&path, None).await }) + }) + .collect(); + for handle in handles { + handle.await.unwrap().unwrap(); + } + }) + }); + }); + + // Fetch packages one by one (simulates serialization under a global lock). 
+ group.bench_function("serial_simulating_global_lock", |b| { + b.iter(|| { + let cache = PackageCache::new(tempdir().unwrap().path()); + let paths = package_paths.clone(); + rt.block_on(async { + for path in &paths { + cache.get_or_fetch_from_path(path, None).await.unwrap(); + } + }) + }); + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/crates/rattler_cache/src/package_cache/cache_lock.rs b/crates/rattler_cache/src/package_cache/cache_lock.rs index 3e5d23aa6..c4852dcae 100644 --- a/crates/rattler_cache/src/package_cache/cache_lock.rs +++ b/crates/rattler_cache/src/package_cache/cache_lock.rs @@ -18,8 +18,9 @@ use crate::package_cache::PackageCacheLayerError; /// This struct represents a cache entry that has been validated and is ready for use. /// It holds the cache entry's path, revision number, and optional SHA256 hash. /// -/// Note: Concurrent access is coordinated via the global cache lock mechanism -/// (see [`CacheGlobalLock`]). Individual cache entries do not hold locks. +/// Note: Concurrent access to a cache entry is coordinated by taking an +/// exclusive lock on the cache entry's metadata lock file (the `*.lock` file +/// next to the package directory). pub struct CacheMetadata { pub(super) revision: u64, pub(super) sha256: Option<Sha256Hash>, @@ -140,13 +141,13 @@ pub struct CacheMetadataFile { impl CacheMetadataFile { /// Acquires a handle to the cache metadata file. /// - /// Opens the file with both read and write permissions. Since concurrent access - /// is coordinated via [`CacheGlobalLock`], this single method is sufficient for - /// all metadata operations. + /// Opens the file with both read and write permissions and acquires an + /// exclusive OS file lock on it. This prevents multiple processes from + /// concurrently modifying the cache entry on disk. 
pub async fn acquire(path: &Path) -> Result { let lock_file_path = path.to_path_buf(); - simple_spawn_blocking::tokio::run_blocking_task(move || { + let acquire_lock_fut = simple_spawn_blocking::tokio::run_blocking_task(move || { let file = std::fs::OpenOptions::new() .create(true) .read(true) @@ -163,11 +164,27 @@ impl CacheMetadataFile { ) })?; + file.lock_exclusive().map_err(|e| { + PackageCacheLayerError::LockError( + format!( + "failed to acquire exclusive lock on cache metadata file: '{}'", + lock_file_path.display() + ), + e, + ) + })?; + Ok(CacheMetadataFile { file: Arc::new(file), }) - }) - .await + }); + + tokio::select!( + lock = acquire_lock_fut => lock, + _ = warn_timeout_future( + "Blocking waiting for cache entry lock file".to_string() + ) => unreachable!("warn_timeout_future should never finish") + ) } } @@ -294,6 +311,12 @@ impl CacheMetadataFile { } } +impl Drop for CacheMetadataFile { + fn drop(&mut self) { + let _ = fs4::fs_std::FileExt::unlock(&*self.file); + } +} + async fn warn_timeout_future(message: String) { loop { tokio::time::sleep(Duration::from_secs(30)).await; diff --git a/crates/rattler_cache/src/package_cache/mod.rs b/crates/rattler_cache/src/package_cache/mod.rs index 3d9d78e56..4ace9f8f1 100644 --- a/crates/rattler_cache/src/package_cache/mod.rs +++ b/crates/rattler_cache/src/package_cache/mod.rs @@ -258,12 +258,12 @@ impl PackageCache { /// Acquires a global lock on the package cache. /// - /// This lock can be used to coordinate multiple package operations, - /// reducing the overhead of acquiring individual locks for each package. - /// The lock is held until the returned `CacheGlobalLock` is dropped. + /// This lock serializes cache operations across *processes* sharing the same cache + /// directory. It is held until the returned `CacheGlobalLock` is dropped. /// - /// This is particularly useful when installing many packages at once, - /// as it significantly reduces the number of file locking syscalls. 
+ /// Prefer the default per-entry locking behavior for multi-process scenarios + /// (e.g. many concurrent installs sharing one cache). Only use this lock when + /// you explicitly want to trade parallelism for reduced lock contention. pub async fn acquire_global_lock(&self) -> Result<CacheGlobalLock, PackageCacheError> { // Use the first writable layer's path for the global cache lock let (_, writable_layers) = self.split_layers(); @@ -603,7 +603,7 @@ where E: Error + Send + Sync + 'static, { // Open the cache metadata file to read/write revision and hash information. - // Concurrent access is coordinated via the global cache lock. + // Concurrent access is coordinated by an exclusive lock on the metadata file. let lock_file_path = { // Append the `.lock` extension to the cache path to create the lock file path. let mut path_str = path.as_os_str().to_owned(); @@ -701,8 +701,9 @@ where } // If the cache is stale, we need to fetch the package again. - // Since we hold the global cache lock, we can safely update the metadata - // and fetch the package without worrying about concurrent modifications. + // Since we hold the cache entry lock (on the metadata file), we can safely + // update the metadata and fetch the package without worrying about concurrent + // modifications of this cache entry. if let Some(ref fetch_fn) = fetch { // Write the new revision let new_revision = cache_revision + 1;