Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions src/cargo/core/compiler/build_runner/compilation_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,16 @@ impl<'a, 'gctx: 'a> CompilationFiles<'a, 'gctx> {
false => "-",
};
let name = unit.pkg.package_id().name();
let meta = self.metas[unit];
let hash = meta
let hash = self.unit_hash(unit);
format!("{name}{separator}{hash}")
}

/// The directory hash to use for a given unit
pub fn unit_hash(&self, unit: &Unit) -> String {
self.metas[unit]
.pkg_dir()
.map(|h| h.to_string())
.unwrap_or_else(|| self.target_short_hash(unit));
format!("{name}{separator}{hash}")
.unwrap_or_else(|| self.target_short_hash(unit))
}

/// Returns the final artifact path for the host (`/…/target/debug`)
Expand Down Expand Up @@ -296,6 +300,15 @@ impl<'a, 'gctx: 'a> CompilationFiles<'a, 'gctx> {
self.layout(unit.kind).build_dir().fingerprint(&dir)
}

/// The lock location for a given build unit.
///
/// This is a `.lock` file placed inside the unit's directory under the
/// build dir (the same `pkg_dir` used for the unit's other outputs).
pub fn build_unit_lock(&self, unit: &Unit) -> PathBuf {
    let dir = self.pkg_dir(unit);
    self.layout(unit.kind)
        .build_dir()
        .build_unit(&dir)
        .join(".lock")
}

/// Directory where incremental output for the given unit should go.
pub fn incremental_dir(&self, unit: &Unit) -> &Path {
self.layout(unit.kind).build_dir().incremental()
Expand Down
9 changes: 8 additions & 1 deletion src/cargo/core/compiler/build_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};

use crate::core::PackageId;
use crate::core::compiler::compilation::{self, UnitOutput};
use crate::core::compiler::locking::LockManager;
use crate::core::compiler::{self, Unit, UserIntent, artifact};
use crate::util::cache_lock::CacheLockMode;
use crate::util::errors::CargoResult;
Expand Down Expand Up @@ -87,6 +88,9 @@ pub struct BuildRunner<'a, 'gctx> {
/// because the target has a type error. This is in an Arc<Mutex<..>>
/// because it is continuously updated as the job progresses.
pub failed_scrape_units: Arc<Mutex<HashSet<UnitHash>>>,

/// Manages locks for build units when fine grain locking is enabled.
pub lock_manager: Arc<LockManager>,
}

impl<'a, 'gctx> BuildRunner<'a, 'gctx> {
Expand Down Expand Up @@ -126,6 +130,7 @@ impl<'a, 'gctx> BuildRunner<'a, 'gctx> {
lto: HashMap::new(),
metadata_for_doc_units: HashMap::new(),
failed_scrape_units: Arc::new(Mutex::new(HashSet::new())),
lock_manager: Arc::new(LockManager::new()),
})
}

Expand Down Expand Up @@ -417,7 +422,8 @@ impl<'a, 'gctx> BuildRunner<'a, 'gctx> {
| UserIntent::Doctest
| UserIntent::Bench => true,
};
let host_layout = Layout::new(self.bcx.ws, None, &dest, must_take_artifact_dir_lock)?;
let host_layout =
Layout::new(self.bcx.ws, None, &dest, must_take_artifact_dir_lock, false)?;
let mut targets = HashMap::new();
for kind in self.bcx.all_kinds.iter() {
if let CompileKind::Target(target) = *kind {
Expand All @@ -426,6 +432,7 @@ impl<'a, 'gctx> BuildRunner<'a, 'gctx> {
Some(target),
&dest,
must_take_artifact_dir_lock,
false,
)?;
targets.insert(target, layout);
}
Expand Down
6 changes: 6 additions & 0 deletions src/cargo/core/compiler/job_queue/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ impl Job {
let prev = mem::replace(&mut self.work, Work::noop());
self.work = next.then(prev);
}

/// Chains the given work by running it after our own unit of work.
pub fn after(&mut self, next: Work) {
// Swap in a no-op so we can take ownership of the current work.
let prev = mem::replace(&mut self.work, Work::noop());
self.work = prev.then(next);
}
}

impl fmt::Debug for Job {
Expand Down
16 changes: 15 additions & 1 deletion src/cargo/core/compiler/job_queue/job_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::{cell::Cell, marker, sync::Arc};

use cargo_util::ProcessBuilder;

use crate::CargoResult;
use crate::core::compiler::future_incompat::FutureBreakageItem;
use crate::core::compiler::locking::LockKey;
use crate::core::compiler::timings::SectionTiming;
use crate::util::Queue;
use crate::{CargoResult, core::compiler::locking::LockManager};

use super::{Artifact, DiagDedupe, Job, JobId, Message};

Expand Down Expand Up @@ -47,6 +48,9 @@ pub struct JobState<'a, 'gctx> {
/// sending a double message later on.
rmeta_required: Cell<bool>,

/// Manages locks for build units when fine grain locking is enabled.
lock_manager: Arc<LockManager>,

// Historical versions of Cargo made use of the `'a` argument here, so to
// leave the door open to future refactorings keep it here.
_marker: marker::PhantomData<&'a ()>,
Expand All @@ -58,12 +62,14 @@ impl<'a, 'gctx> JobState<'a, 'gctx> {
messages: Arc<Queue<Message>>,
output: Option<&'a DiagDedupe<'gctx>>,
rmeta_required: bool,
lock_manager: Arc<LockManager>,
) -> Self {
Self {
id,
messages,
output,
rmeta_required: Cell::new(rmeta_required),
lock_manager,
_marker: marker::PhantomData,
}
}
Expand Down Expand Up @@ -141,6 +147,14 @@ impl<'a, 'gctx> JobState<'a, 'gctx> {
.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
}

/// Takes an exclusive lock for `lock` via the shared [`LockManager`].
pub fn lock_exclusive(&self, lock: &LockKey) -> CargoResult<()> {
self.lock_manager.lock(lock)
}

/// Downgrades the lock for `lock` to a shared lock via the shared [`LockManager`].
pub fn downgrade_to_shared(&self, lock: &LockKey) -> CargoResult<()> {
self.lock_manager.downgrade_to_shared(lock)
}

pub fn on_section_timing_emitted(&self, section: SectionTiming) {
self.messages.push(Message::SectionTiming(self.id, section));
}
Expand Down
3 changes: 2 additions & 1 deletion src/cargo/core/compiler/job_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,9 +951,10 @@ impl<'gctx> DrainState<'gctx> {
let messages = self.messages.clone();
let is_fresh = job.freshness().is_fresh();
let rmeta_required = build_runner.rmeta_required(unit);
let lock_manager = build_runner.lock_manager.clone();

let doit = move |diag_dedupe| {
let state = JobState::new(id, messages, diag_dedupe, rmeta_required);
let state = JobState::new(id, messages, diag_dedupe, rmeta_required, lock_manager);
state.run_to_finish(job);
};

Expand Down
20 changes: 15 additions & 5 deletions src/cargo/core/compiler/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl Layout {
target: Option<CompileTarget>,
dest: &str,
must_take_artifact_dir_lock: bool,
must_take_build_dir_lock_exclusively: bool,
) -> CargoResult<Layout> {
let is_new_layout = ws.gctx().cli_unstable().build_dir_new_layout;
let mut root = ws.target_dir();
Expand Down Expand Up @@ -160,11 +161,20 @@ impl Layout {
{
None
} else {
Some(build_dest.open_rw_exclusive_create(
".cargo-lock",
ws.gctx(),
"build directory",
)?)
if ws.gctx().cli_unstable().fine_grain_locking && !must_take_build_dir_lock_exclusively
{
Some(build_dest.open_ro_shared_create(
".cargo-lock",
ws.gctx(),
"build directory",
)?)
} else {
Some(build_dest.open_rw_exclusive_create(
".cargo-lock",
ws.gctx(),
"build directory",
)?)
}
};
let build_root = build_root.into_path_unlocked();
let build_dest = build_dest.as_path_unlocked();
Expand Down
108 changes: 108 additions & 0 deletions src/cargo/core/compiler/locking.rs
Comment thread
epage marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//! This module handles the locking logic during compilation.

use crate::{
CargoResult,
core::compiler::{BuildRunner, Unit},
util::{FileLock, Filesystem},
};
use anyhow::bail;
use std::{
collections::HashMap,
fmt::{Display, Formatter},
path::PathBuf,
sync::Mutex,
};
use tracing::instrument;

/// A struct to store the lock handles for build units during compilation.
pub struct LockManager {
// Maps each lock-file path (the key) to its open file-lock handle.
locks: Mutex<HashMap<LockKey, FileLock>>,
}

impl LockManager {
pub fn new() -> Self {
Self {
locks: Mutex::new(HashMap::new()),
}
}

/// Takes a shared lock on a given [`Unit`]
/// This prevents other Cargo instances from compiling (writing) to
/// this build unit.
///
/// This function returns a [`LockKey`] which can be used to
/// upgrade/unlock the lock.
#[instrument(skip_all, fields(key))]
pub fn lock_shared(
Comment thread
epage marked this conversation as resolved.
&self,
build_runner: &BuildRunner<'_, '_>,
unit: &Unit,
) -> CargoResult<LockKey> {
let key = LockKey::from_unit(build_runner, unit);
tracing::Span::current().record("key", key.0.to_str());

let mut locks = self.locks.lock().unwrap();
if let Some(lock) = locks.get_mut(&key) {
lock.file().lock_shared()?;
} else {
let fs = Filesystem::new(key.0.clone());
let lock_msg = format!(
"{} ({})",
unit.pkg.name(),
build_runner.files().unit_hash(unit)
);
let lock = fs.open_ro_shared_create(&key.0, build_runner.bcx.gctx, &lock_msg)?;
locks.insert(key.clone(), lock);
}
Comment thread
epage marked this conversation as resolved.

Ok(key)
}

#[instrument(skip(self))]
pub fn lock(&self, key: &LockKey) -> CargoResult<()> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it work to also have a Blocking message for this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this and I think it should be possible. We don't have direct access to GlobalContext inside of JobState.
However, we do have indirect access through DiagDedupe.

I think we can modify the Filesystem functions to accept a trait like ReportBlocking and implement that trait for Shell and DiagDedupe.

Though perhaps we defer that to a follow up PR?

let mut locks = self.locks.lock().unwrap();
if let Some(lock) = locks.get_mut(&key) {
lock.file().lock()?;
} else {
bail!("lock was not found in lock manager: {key}");
}

Ok(())
}

/// Downgrades an existing exclusive lock into a shared lock.
///
/// Errors if `key` was never registered with this lock manager.
#[instrument(skip(self))]
pub fn downgrade_to_shared(&self, key: &LockKey) -> CargoResult<()> {
let mut locks = self.locks.lock().unwrap();
let Some(lock) = locks.get_mut(key) else {
bail!("lock was not found in lock manager: {key}");
};
lock.file().lock_shared()?;
Ok(())
}

/// Releases the OS lock held for `key`, if one is registered.
///
/// Unknown keys are silently ignored.
#[instrument(skip(self))]
pub fn unlock(&self, key: &LockKey) -> CargoResult<()> {
    let mut held = self.locks.lock().unwrap();
    if let Some(handle) = held.get_mut(key) {
        handle.file().unlock()?;
    }
    Ok(())
}
}

/// Identifies a build-unit lock by the filesystem path of its lock file.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct LockKey(PathBuf);

impl LockKey {
/// Builds the key for `unit` from its on-disk lock-file location.
fn from_unit(build_runner: &BuildRunner<'_, '_>, unit: &Unit) -> Self {
Self(build_runner.files().build_unit_lock(unit))
}
}

impl Display for LockKey {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Delegate to the path's `Display` adapter.
        Display::fmt(&self.0.display(), f)
    }
}
39 changes: 39 additions & 0 deletions src/cargo/core/compiler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod future_incompat;
pub(crate) mod job_queue;
pub(crate) mod layout;
mod links;
mod locking;
mod lto;
mod output_depinfo;
mod output_sbom;
Expand Down Expand Up @@ -94,6 +95,7 @@ use self::output_sbom::build_sbom;
use self::unit_graph::UnitDep;

use crate::core::compiler::future_incompat::FutureIncompatReport;
use crate::core::compiler::locking::LockKey;
use crate::core::compiler::timings::SectionTiming;
pub use crate::core::compiler::unit::{Unit, UnitInterner};
use crate::core::manifest::TargetSourcePath;
Expand Down Expand Up @@ -187,6 +189,12 @@ fn compile<'gctx>(
return Ok(());
}

let lock = if build_runner.bcx.gctx.cli_unstable().fine_grain_locking {
Some(build_runner.lock_manager.lock_shared(build_runner, unit)?)
} else {
None
};

// If we are in `--compile-time-deps` and the given unit is not a compile time
// dependency, skip compiling the unit and jumps to dependencies, which still
// have chances to be compile time dependencies
Expand Down Expand Up @@ -228,6 +236,23 @@ fn compile<'gctx>(
work.then(link_targets(build_runner, unit, true)?)
});

// If -Zfine-grain-locking is enabled, we wrap the job with an upgrade to exclusive
// lock before starting, then downgrade to a shared lock after the job is finished.
if build_runner.bcx.gctx.cli_unstable().fine_grain_locking && job.freshness().is_dirty()
{
if let Some(lock) = lock {
// Here we unlock the current shared lock to avoid deadlocking with other cargo
// processes. Then we configure our compile job to take an exclusive lock
// before starting. Once we are done compiling (including both rmeta and rlib)
// we downgrade to a shared lock to allow other cargo's to read the build unit.
// We will hold this shared lock for the remainder of compilation to prevent
// other cargo from re-compiling while we are still using the unit.
build_runner.lock_manager.unlock(&lock)?;
job.before(prebuild_lock_exclusive(lock.clone()));
Comment thread
epage marked this conversation as resolved.
job.after(downgrade_lock_to_shared(lock));
}
}

job
};
jobs.enqueue(build_runner, unit, job)?;
Expand Down Expand Up @@ -589,6 +614,20 @@ fn verbose_if_simple_exit_code(err: Error) -> Error {
}
}

/// Returns a [`Work`] unit that takes the exclusive lock for `lock`.
fn prebuild_lock_exclusive(lock: LockKey) -> Work {
    Work::new(move |state| state.lock_exclusive(&lock))
}

/// Returns a [`Work`] unit that downgrades the lock for `lock` back to shared.
fn downgrade_lock_to_shared(lock: LockKey) -> Work {
    Work::new(move |state| state.downgrade_to_shared(&lock))
}

/// Link the compiled target (often of form `foo-{metadata_hash}`) to the
/// final target. This must happen during both "Fresh" and "Compile".
fn link_targets(
Expand Down
Loading