Skip to content

Commit 193bd1b

Browse files
mdittmerjoshlf
authored andcommitted
[anneal][v2] Add utility functions: environment helpers and DirLock
TAG=agy gherrit-pr-id: Gisb7egnyltdoxlay3of6lmsjbtlok7ct
1 parent 74de0c4 commit 193bd1b

1 file changed

Lines changed: 339 additions & 0 deletions

File tree

anneal/v2/src/util.rs

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,339 @@
1+
// Copyright 2026 The Fuchsia Authors
2+
//
3+
// Licensed under the 2-Clause BSD License <LICENSE-BSD or
4+
// https://opensource.org/license/bsd-2-clause>, Apache License, Version 2.0
5+
// <LICENSE-APACHE or https://www.apache.org/licenses/LICENSE-2.0>, or the MIT
6+
// license <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your option.
7+
// This file may not be copied, modified, or distributed except according to
8+
// those terms.
9+
10+
use std::io::BufRead as _;
11+
12+
use anyhow::Context as _;
13+
use fs2::FileExt as _;
14+
15+
/// Represents an active, exclusive lock on a directory.
16+
///
17+
/// This struct guarantees that the process holds an OS-level file lock
18+
/// guarding the specified directory.
19+
pub(crate) struct DirLock {
20+
/// The path to the directory being guarded.
21+
pub(crate) path: std::path::PathBuf,
22+
// Kept alive to hold the flock.
23+
_file: std::fs::File,
24+
}
25+
26+
impl DirLock {
27+
/// Acquires an exclusive lock on the specified directory.
28+
///
29+
/// This function blocks until the lock can be acquired. We use a
30+
/// separate `.lock` file within the directory rather than locking
31+
/// the directory itself to avoid platform-specific issues with
32+
/// directory locking and to ensure the lock file persists even if
33+
/// the directory is cleaned.
34+
pub(crate) fn lock_exclusive(path: std::path::PathBuf) -> anyhow::Result<Self> {
35+
let file = Self::open_lock_file(&path)?;
36+
file.lock_exclusive()
37+
.with_context(|| format!("Failed to acquire exclusive lock on {:?}", path))?;
38+
Ok(Self { path, _file: file })
39+
}
40+
41+
/// Acquires a shared lock on the specified directory.
42+
///
43+
/// Multiple processes can hold shared locks simultaneously, but an
44+
/// exclusive lock will block until all shared locks are released.
45+
pub(crate) fn lock_shared(path: std::path::PathBuf) -> anyhow::Result<Self> {
46+
let file = Self::open_lock_file(&path)?;
47+
file.lock_shared()
48+
.with_context(|| format!("Failed to acquire shared lock on {:?}", path))?;
49+
Ok(Self { path, _file: file })
50+
}
51+
52+
fn open_lock_file(path: &std::path::Path) -> anyhow::Result<std::fs::File> {
53+
let lock_path = path.join(".lock");
54+
55+
// Ensure the directory exists.
56+
if let Some(parent) = lock_path.parent() {
57+
std::fs::create_dir_all(parent).with_context(|| {
58+
format!("Failed to create directory for lock file: {:?}", parent)
59+
})?;
60+
}
61+
// If the lock file already exists, we open it in read-only mode.
62+
// This prevents failures if the file is read-only (e.g., after
63+
// making the toolchain directory read-only), while still allowing
64+
// us to acquire shared and exclusive locks on the file descriptor.
65+
if lock_path.exists() {
66+
std::fs::OpenOptions::new()
67+
.read(true)
68+
.open(&lock_path)
69+
.with_context(|| format!("Failed to open lock file at {:?}", lock_path))
70+
} else {
71+
std::fs::OpenOptions::new()
72+
.read(true)
73+
.write(true)
74+
.create(true)
75+
.open(&lock_path)
76+
.with_context(|| format!("Failed to create lock file at {:?}", lock_path))
77+
}
78+
}
79+
}
80+
81+
/// Walks a directory recursively and replaces string patterns inside `.trace`
82+
/// files. This is used to patch non-portable paths generated by Lake.
83+
pub(crate) fn patch_trace_files(
84+
dir: &std::path::Path,
85+
replacements: &[(&str, &str)],
86+
) -> anyhow::Result<()> {
87+
if dir.exists() {
88+
let walker = walkdir::WalkDir::new(dir).into_iter();
89+
for entry in walker {
90+
let entry = entry.context("Failed to walk directory for trace patching")?;
91+
let path = entry.path();
92+
if path.is_file() && path.extension().map_or(false, |ext| ext == "trace") {
93+
let content = std::fs::read_to_string(path)
94+
.with_context(|| format!("Failed to read trace file {:?}", path))?;
95+
let mut new_content = content.clone();
96+
for (from, to) in replacements {
97+
new_content = new_content.replace(from, to);
98+
}
99+
if new_content != content {
100+
std::fs::write(path, new_content)
101+
.with_context(|| format!("Failed to write trace file {:?}", path))?;
102+
}
103+
}
104+
}
105+
}
106+
Ok(())
107+
}
108+
109+
/// Prepends a path to an existing environment variable,
110+
/// separating them with a colon if the variable is not empty. This contains the
111+
/// pure string formatting logic.
112+
fn prepend_to_env_var_impl(
113+
current_val: std::ffi::OsString,
114+
new_path: &std::path::Path,
115+
) -> std::ffi::OsString {
116+
let mut combined = new_path.to_path_buf().into_os_string();
117+
if !current_val.is_empty() {
118+
combined.push(":");
119+
combined.push(current_val);
120+
}
121+
combined
122+
}
123+
124+
/// Prepends a path to an existing environment variable in the active process
125+
/// environment, separating them with a colon if the variable is not empty.
126+
/// This is used to inject our managed Rust toolchain paths before the system paths.
127+
pub(crate) fn prepend_to_env_var(var_name: &str, new_path: &std::path::Path) -> std::ffi::OsString {
128+
let current_val = std::env::var_os(var_name).unwrap_or_default();
129+
prepend_to_env_var_impl(current_val, new_path)
130+
}
131+
132+
pub(crate) struct ProcessOutput {
133+
pub status: std::process::ExitStatus,
134+
pub stderr_lines: Vec<String>,
135+
}
136+
137+
/// Spawns a child process, drains its stderr in a background thread, and processes
138+
/// its stdout line-by-line in the main thread while showing a progress spinner.
139+
pub(crate) fn run_command_with_progress<F>(
140+
mut cmd: std::process::Command,
141+
pb: Option<indicatif::ProgressBar>,
142+
mut process_stdout_line: F,
143+
) -> anyhow::Result<ProcessOutput>
144+
where
145+
F: FnMut(&str, Option<&indicatif::ProgressBar>) -> anyhow::Result<()>,
146+
{
147+
cmd.stdout(std::process::Stdio::piped());
148+
cmd.stderr(std::process::Stdio::piped());
149+
150+
let mut child = cmd.spawn().context("Failed to spawn child process")?;
151+
152+
let stderr_buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
153+
let stderr_buffer_clone = std::sync::Arc::clone(&stderr_buffer);
154+
155+
let mut stderr_thread = None;
156+
if let Some(stderr) = child.stderr.take() {
157+
stderr_thread = Some(std::thread::spawn(move || {
158+
let reader = std::io::BufReader::new(stderr);
159+
for line in reader.lines().map_while(Result::ok) {
160+
stderr_buffer_clone.lock().unwrap().push(line);
161+
}
162+
}));
163+
}
164+
165+
if let Some(ref p) = pb {
166+
p.enable_steady_tick(std::time::Duration::from_millis(100));
167+
}
168+
169+
if let Some(stdout) = child.stdout.take() {
170+
let reader = std::io::BufReader::new(stdout);
171+
for line in reader.lines().map_while(Result::ok) {
172+
process_stdout_line(&line, pb.as_ref())?;
173+
if let Some(ref p) = pb {
174+
p.tick();
175+
}
176+
}
177+
}
178+
179+
if let Some(ref p) = pb {
180+
p.finish_and_clear();
181+
}
182+
183+
let status = child.wait().context("Failed to wait for child process")?;
184+
185+
if let Some(thread) = stderr_thread {
186+
let _ = thread.join();
187+
}
188+
189+
let stderr_lines = std::sync::Arc::try_unwrap(stderr_buffer).unwrap().into_inner().unwrap();
190+
191+
Ok(ProcessOutput { status, stderr_lines })
192+
}
193+
194+
#[cfg(feature = "exocrate_tests")]
195+
pub(crate) fn run_test_lock_helper(
196+
role: &str,
197+
lock_dir: &std::path::Path,
198+
log_file: &std::path::Path,
199+
sig_file: &std::path::Path,
200+
) -> anyhow::Result<()> {
201+
use std::io::Write as _;
202+
203+
let append_log = |msg: &str| -> anyhow::Result<()> {
204+
let mut file = std::fs::OpenOptions::new().create(true).append(true).open(log_file)?;
205+
writeln!(file, "{}", msg)?;
206+
Ok(())
207+
};
208+
209+
let wait_for_sig = || -> anyhow::Result<()> {
210+
let start = std::time::Instant::now();
211+
while !sig_file.exists() {
212+
if start.elapsed() > std::time::Duration::from_secs(3) {
213+
anyhow::bail!("Timeout waiting for signal file {:?}", sig_file);
214+
}
215+
std::thread::sleep(std::time::Duration::from_millis(50));
216+
}
217+
Ok(())
218+
};
219+
220+
match role {
221+
"reader-a" => {
222+
let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?;
223+
append_log("SHARED_START_A")?;
224+
wait_for_sig()?;
225+
append_log("SHARED_END_A")?;
226+
}
227+
"reader-b" => {
228+
let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?;
229+
append_log("SHARED_START_B")?;
230+
std::fs::write(sig_file, "")?;
231+
append_log("SHARED_END_B")?;
232+
}
233+
"writer-a" => {
234+
let _lock = DirLock::lock_exclusive(lock_dir.to_path_buf())?;
235+
append_log("EXCLUSIVE_START_A")?;
236+
wait_for_sig()?;
237+
append_log("EXCLUSIVE_END_A")?;
238+
}
239+
"reader-exclusion" => {
240+
std::fs::write(sig_file, "")?;
241+
let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?;
242+
append_log("SHARED_START_B")?;
243+
append_log("SHARED_END_B")?;
244+
}
245+
_ => anyhow::bail!("Unknown test-lock-helper role: {}", role),
246+
}
247+
248+
Ok(())
249+
}
250+
251+
#[cfg(test)]
252+
#[macro_export]
253+
macro_rules! workspace_fixture {
254+
($dir:expr, { $($path:expr => $content:expr),* $(,)? }) => {{
255+
let root = $dir.path();
256+
$(
257+
let file_path = root.join($path);
258+
if let Some(parent) = file_path.parent() {
259+
std::fs::create_dir_all(parent).unwrap();
260+
}
261+
std::fs::write(&file_path, $content).unwrap();
262+
)*
263+
}};
264+
}
265+
266+
#[cfg(test)]
267+
mod tests {
268+
use super::*;
269+
270+
#[test]
271+
fn test_prepend_to_env_var() {
272+
// Test when current value is empty.
273+
let path1 = std::path::Path::new("/path/one");
274+
let res1 = prepend_to_env_var_impl(std::ffi::OsString::new(), path1);
275+
assert_eq!(res1, "/path/one");
276+
277+
// Test when current value is not empty.
278+
let path2 = std::path::Path::new("/path/two");
279+
let res2 = prepend_to_env_var_impl(res1, path2);
280+
assert_eq!(res2, "/path/two:/path/one");
281+
}
282+
283+
#[test]
284+
fn test_dir_lock_exclusive_mutual_exclusion() {
285+
let temp_dir = tempfile::tempdir().unwrap();
286+
let lock_path = temp_dir.path().to_path_buf();
287+
288+
let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
289+
let barrier_clone = std::sync::Arc::clone(&barrier);
290+
let lock_path_clone = lock_path.clone();
291+
292+
let lock_released = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
293+
let lock_released_clone = std::sync::Arc::clone(&lock_released);
294+
295+
// Thread A acquires the lock.
296+
let thread_a = std::thread::spawn(move || {
297+
let _lock = DirLock::lock_exclusive(lock_path_clone).expect("Failed to lock exclusive");
298+
barrier_clone.wait(); // Signal Thread B that A holds the lock.
299+
300+
// Simulate brief work holding the lock.
301+
std::thread::sleep(std::time::Duration::from_millis(100));
302+
lock_released_clone.store(true, std::sync::atomic::Ordering::Relaxed);
303+
// _lock drops here, releasing the lock.
304+
});
305+
306+
// Thread B waits for Thread A to acquire the lock, then tries to acquire it itself.
307+
let thread_b = std::thread::spawn(move || {
308+
barrier.wait(); // Wait for Thread A to acquire lock.
309+
310+
// Attempt to acquire lock. This should block until Thread A releases it.
311+
let _lock = DirLock::lock_exclusive(lock_path).expect("Failed to lock exclusive in B");
312+
313+
// Assert that B only successfully locked the directory AFTER A released it.
314+
assert!(
315+
lock_released.load(std::sync::atomic::Ordering::Relaxed),
316+
"Thread B acquired lock before Thread A released it!"
317+
);
318+
});
319+
320+
thread_a.join().unwrap();
321+
thread_b.join().unwrap();
322+
}
323+
324+
#[test]
325+
fn test_dir_lock_shared_coexistence() {
326+
let temp_dir = tempfile::tempdir().unwrap();
327+
let lock_path = temp_dir.path().to_path_buf();
328+
329+
// Thread A acquires shared lock.
330+
let lock_a = DirLock::lock_shared(lock_path.clone()).expect("Failed to lock shared");
331+
332+
// Thread B should be able to acquire shared lock immediately without blocking.
333+
let lock_b = DirLock::lock_shared(lock_path).expect("Failed to lock shared concurrently");
334+
335+
// Both locks are held.
336+
drop(lock_a);
337+
drop(lock_b);
338+
}
339+
}

0 commit comments

Comments
 (0)