Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions crates/rattler/src/install/installer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,15 +486,6 @@ impl Installer {
.join(rattler_cache::PACKAGE_CACHE_DIR),
)
});

// Acquire a global lock on the package cache for the entire installation.
// This significantly reduces overhead by avoiding per-package locking.
let _global_cache_lock = package_cache
.acquire_global_lock()
.await
.map_err(InstallerError::FailedToAcquireCacheLock)?;

// Construct a driver.
let driver = InstallDriver::builder()
.execute_link_scripts(self.execute_link_scripts)
.with_io_concurrency_semaphore(
Expand Down
5 changes: 5 additions & 0 deletions crates/rattler_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ tokio-stream = { workspace = true }
tower-http = { workspace = true, features = ["fs"] }
tools = { path = "../tools" }
reqwest-retry = { workspace = true }
criterion = { workspace = true }

[[bench]]
name = "concurrent_cache_lock"
harness = false
65 changes: 65 additions & 0 deletions crates/rattler_cache/benches/concurrent_cache_lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//! Benchmark: concurrent vs serial package cache fetch.
//!
//! Run with: `cargo bench -p rattler_cache --bench concurrent_cache_lock`

use std::path::{Path, PathBuf};

use criterion::{criterion_group, criterion_main, Criterion};
use rattler_cache::package_cache::PackageCache;
use tempfile::tempdir;
use tokio::runtime::Runtime;

/// Paths of the test packages (shipped in the repository's `test-data`
/// directory) that each benchmark iteration fetches into a fresh cache.
fn benchmark_package_paths() -> Vec<PathBuf> {
    // Resolve relative to this crate's manifest so the benchmark works
    // regardless of the current working directory.
    let data_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data");
    [
        "clobber/clobber-python-0.1.0-cpython.conda",
        "clobber/clobber-1-0.1.0-h4616a5c_0.tar.bz2",
        "clobber/clobber-2-0.1.0-h4616a5c_0.tar.bz2",
        "clobber/clobber-3-0.1.0-h4616a5c_0.tar.bz2",
        "packages/empty-0.1.0-h4616a5c_0.conda",
    ]
    .iter()
    .map(|relative| data_dir.join(relative))
    .collect()
}

/// Compares fetching the test packages into a fresh cache concurrently
/// (one task per package, per-entry locking) against fetching them one by
/// one (simulating the serialisation a global cache lock would impose).
fn criterion_benchmark(c: &mut Criterion) {
    let package_paths = benchmark_package_paths();
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("cache_lock");
    // Each iteration performs real package extraction I/O; keep the
    // sample count low so the benchmark finishes in reasonable time.
    group.sample_size(10);

    // Fetch all packages concurrently; each spawned task takes only the
    // per-entry lock for its own package.
    group.bench_function("concurrent_per_package_lock", |b| {
        b.iter(|| {
            // Bind the TempDir to a variable: `tempdir().unwrap().path()`
            // inline would drop the TempDir at the end of the statement,
            // deleting the cache directory before it is ever used.
            let cache_dir = tempdir().unwrap();
            let cache = PackageCache::new(cache_dir.path());
            let paths = package_paths.clone();
            rt.block_on(async {
                let handles: Vec<_> = paths
                    .iter()
                    .map(|path| {
                        let cache = cache.clone();
                        let path = path.clone();
                        tokio::spawn(async move { cache.get_or_fetch_from_path(&path, None).await })
                    })
                    .collect();
                for handle in handles {
                    handle.await.unwrap().unwrap();
                }
            })
        })
    });

    // Fetch packages one by one (simulates serialisation under a global lock).
    group.bench_function("serial_simulating_global_lock", |b| {
        b.iter(|| {
            // Same lifetime consideration as above: keep the TempDir alive
            // for the whole iteration.
            let cache_dir = tempdir().unwrap();
            let cache = PackageCache::new(cache_dir.path());
            let paths = package_paths.clone();
            rt.block_on(async {
                for path in &paths {
                    cache.get_or_fetch_from_path(path, None).await.unwrap();
                }
            })
        })
    });

    // Finish the group so criterion emits the group-level summary.
    group.finish();
}

// Register the benchmark function with criterion and generate the `main`
// entry point (required because `harness = false` in Cargo.toml).
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
39 changes: 31 additions & 8 deletions crates/rattler_cache/src/package_cache/cache_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use crate::package_cache::PackageCacheLayerError;
/// This struct represents a cache entry that has been validated and is ready for use.
/// It holds the cache entry's path, revision number, and optional SHA256 hash.
///
/// Note: Concurrent access is coordinated via the global cache lock mechanism
/// (see [`CacheGlobalLock`]). Individual cache entries do not hold locks.
/// Note: Concurrent access to a cache entry is coordinated by taking an
/// exclusive lock on the cache entry's metadata lock file (the `*.lock` file
/// next to the package directory).
pub struct CacheMetadata {
pub(super) revision: u64,
pub(super) sha256: Option<Sha256Hash>,
Expand Down Expand Up @@ -140,13 +141,13 @@ pub struct CacheMetadataFile {
impl CacheMetadataFile {
/// Acquires a handle to the cache metadata file.
///
/// Opens the file with both read and write permissions. Since concurrent access
/// is coordinated via [`CacheGlobalLock`], this single method is sufficient for
/// all metadata operations.
/// Opens the file with both read and write permissions and acquires an
/// exclusive OS file lock on it. This prevents multiple processes from
/// concurrently modifying the cache entry on disk.
pub async fn acquire(path: &Path) -> Result<Self, PackageCacheLayerError> {
let lock_file_path = path.to_path_buf();

simple_spawn_blocking::tokio::run_blocking_task(move || {
let acquire_lock_fut = simple_spawn_blocking::tokio::run_blocking_task(move || {
let file = std::fs::OpenOptions::new()
.create(true)
.read(true)
Expand All @@ -163,11 +164,27 @@ impl CacheMetadataFile {
)
})?;

file.lock_exclusive().map_err(|e| {
PackageCacheLayerError::LockError(
format!(
"failed to acquire exclusive lock on cache metadata file: '{}'",
lock_file_path.display()
),
e,
)
})?;

Ok(CacheMetadataFile {
file: Arc::new(file),
})
})
.await
});

tokio::select!(
lock = acquire_lock_fut => lock,
_ = warn_timeout_future(
"Blocking waiting for cache entry lock file".to_string()
) => unreachable!("warn_timeout_future should never finish")
)
}
}

Expand Down Expand Up @@ -294,6 +311,12 @@ impl CacheMetadataFile {
}
}

impl Drop for CacheMetadataFile {
fn drop(&mut self) {
let _ = fs4::fs_std::FileExt::unlock(&*self.file);
}
}

async fn warn_timeout_future(message: String) {
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
Expand Down
17 changes: 9 additions & 8 deletions crates/rattler_cache/src/package_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ impl PackageCache {

/// Acquires a global lock on the package cache.
///
/// This lock can be used to coordinate multiple package operations,
/// reducing the overhead of acquiring individual locks for each package.
/// The lock is held until the returned `CacheGlobalLock` is dropped.
/// This lock serializes cache operations across *processes* sharing the same cache
/// directory. It is held until the returned `CacheGlobalLock` is dropped.
///
/// This is particularly useful when installing many packages at once,
/// as it significantly reduces the number of file locking syscalls.
/// Prefer the default per-entry locking behavior for multi-process scenarios
/// (e.g. many concurrent installs sharing one cache). Only use this lock when
/// you explicitly want to trade parallelism for reduced lock contention.
pub async fn acquire_global_lock(&self) -> Result<CacheGlobalLock, PackageCacheError> {
// Use the first writable layer's path for the global cache lock
let (_, writable_layers) = self.split_layers();
Expand Down Expand Up @@ -603,7 +603,7 @@ where
E: Error + Send + Sync + 'static,
{
// Open the cache metadata file to read/write revision and hash information.
// Concurrent access is coordinated via the global cache lock.
// Concurrent access is coordinated by an exclusive lock on the metadata file.
let lock_file_path = {
// Append the `.lock` extension to the cache path to create the lock file path.
let mut path_str = path.as_os_str().to_owned();
Expand Down Expand Up @@ -701,8 +701,9 @@ where
}

// If the cache is stale, we need to fetch the package again.
// Since we hold the global cache lock, we can safely update the metadata
// and fetch the package without worrying about concurrent modifications.
// Since we hold the cache entry lock (on the metadata file), we can safely
// update the metadata and fetch the package without worrying about concurrent
// modifications of this cache entry.
if let Some(ref fetch_fn) = fetch {
// Write the new revision
let new_revision = cache_revision + 1;
Expand Down
Loading