Skip to content

Commit e8924f2

Browse files
committed
[anneal][v2] Add utility functions: environment helpers and DirLock
TAG=agy gherrit-pr-id: Gisb7egnyltdoxlay3of6lmsjbtlok7ct
1 parent 84de255 commit e8924f2

1 file changed

Lines changed: 338 additions & 0 deletions

File tree

anneal/v2/src/util.rs

Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
// Copyright 2026 The Fuchsia Authors
2+
//
3+
// Licensed under the 2-Clause BSD License <LICENSE-BSD or
4+
// https://opensource.org/license/bsd-2-clause>, Apache License, Version 2.0
5+
// <LICENSE-APACHE or https://www.apache.org/licenses/LICENSE-2.0>, or the MIT
6+
// license <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your option.
7+
// This file may not be copied, modified, or distributed except according to
8+
// those terms.
9+
10+
use anyhow::Context as _;
11+
use fs2::FileExt as _;
12+
use std::io::BufRead as _;
13+
14+
/// Represents an active, exclusive lock on a directory.
15+
///
16+
/// This struct guarantees that the process holds an OS-level file lock
17+
/// guarding the specified directory.
18+
pub(crate) struct DirLock {
19+
/// The path to the directory being guarded.
20+
pub(crate) path: std::path::PathBuf,
21+
// Kept alive to hold the flock.
22+
_file: std::fs::File,
23+
}
24+
25+
impl DirLock {
26+
/// Acquires an exclusive lock on the specified directory.
27+
///
28+
/// This function blocks until the lock can be acquired. We use a
29+
/// separate `.lock` file within the directory rather than locking
30+
/// the directory itself to avoid platform-specific issues with
31+
/// directory locking and to ensure the lock file persists even if
32+
/// the directory is cleaned.
33+
pub(crate) fn lock_exclusive(path: std::path::PathBuf) -> anyhow::Result<Self> {
34+
let file = Self::open_lock_file(&path)?;
35+
file.lock_exclusive()
36+
.with_context(|| format!("Failed to acquire exclusive lock on {:?}", path))?;
37+
Ok(Self { path, _file: file })
38+
}
39+
40+
/// Acquires a shared lock on the specified directory.
41+
///
42+
/// Multiple processes can hold shared locks simultaneously, but an
43+
/// exclusive lock will block until all shared locks are released.
44+
pub(crate) fn lock_shared(path: std::path::PathBuf) -> anyhow::Result<Self> {
45+
let file = Self::open_lock_file(&path)?;
46+
file.lock_shared()
47+
.with_context(|| format!("Failed to acquire shared lock on {:?}", path))?;
48+
Ok(Self { path, _file: file })
49+
}
50+
51+
fn open_lock_file(path: &std::path::Path) -> anyhow::Result<std::fs::File> {
52+
let lock_path = path.join(".lock");
53+
54+
// Ensure the directory exists.
55+
if let Some(parent) = lock_path.parent() {
56+
std::fs::create_dir_all(parent).with_context(|| {
57+
format!("Failed to create directory for lock file: {:?}", parent)
58+
})?;
59+
}
60+
// If the lock file already exists, we open it in read-only mode.
61+
// This prevents failures if the file is read-only (e.g., after
62+
// making the toolchain directory read-only), while still allowing
63+
// us to acquire shared and exclusive locks on the file descriptor.
64+
if lock_path.exists() {
65+
std::fs::OpenOptions::new()
66+
.read(true)
67+
.open(&lock_path)
68+
.with_context(|| format!("Failed to open lock file at {:?}", lock_path))
69+
} else {
70+
std::fs::OpenOptions::new()
71+
.read(true)
72+
.write(true)
73+
.create(true)
74+
.open(&lock_path)
75+
.with_context(|| format!("Failed to create lock file at {:?}", lock_path))
76+
}
77+
}
78+
}
79+
80+
/// Walks a directory recursively and replaces string patterns inside `.trace`
81+
/// files. This is used to patch non-portable paths generated by Lake.
82+
pub(crate) fn patch_trace_files(
83+
dir: &std::path::Path,
84+
replacements: &[(&str, &str)],
85+
) -> anyhow::Result<()> {
86+
if dir.exists() {
87+
let walker = walkdir::WalkDir::new(dir).into_iter();
88+
for entry in walker {
89+
let entry = entry.context("Failed to walk directory for trace patching")?;
90+
let path = entry.path();
91+
if path.is_file() && path.extension().map_or(false, |ext| ext == "trace") {
92+
let content = std::fs::read_to_string(path)
93+
.with_context(|| format!("Failed to read trace file {:?}", path))?;
94+
let mut new_content = content.clone();
95+
for (from, to) in replacements {
96+
new_content = new_content.replace(from, to);
97+
}
98+
if new_content != content {
99+
std::fs::write(path, new_content)
100+
.with_context(|| format!("Failed to write trace file {:?}", path))?;
101+
}
102+
}
103+
}
104+
}
105+
Ok(())
106+
}
107+
108+
/// Prepends a path to an existing environment variable,
109+
/// separating them with a colon if the variable is not empty. This contains the
110+
/// pure string formatting logic.
111+
fn prepend_to_env_var_impl(
112+
current_val: std::ffi::OsString,
113+
new_path: &std::path::Path,
114+
) -> std::ffi::OsString {
115+
let mut combined = new_path.to_path_buf().into_os_string();
116+
if !current_val.is_empty() {
117+
combined.push(":");
118+
combined.push(current_val);
119+
}
120+
combined
121+
}
122+
123+
/// Prepends a path to an existing environment variable in the active process
124+
/// environment, separating them with a colon if the variable is not empty.
125+
/// This is used to inject our managed Rust toolchain paths before the system paths.
126+
pub(crate) fn prepend_to_env_var(var_name: &str, new_path: &std::path::Path) -> std::ffi::OsString {
127+
let current_val = std::env::var_os(var_name).unwrap_or_default();
128+
prepend_to_env_var_impl(current_val, new_path)
129+
}
130+
131+
pub(crate) struct ProcessOutput {
132+
pub status: std::process::ExitStatus,
133+
pub stderr_lines: Vec<String>,
134+
}
135+
136+
/// Spawns a child process, drains its stderr in a background thread, and processes
137+
/// its stdout line-by-line in the main thread while showing a progress spinner.
138+
pub(crate) fn run_command_with_progress<F>(
139+
mut cmd: std::process::Command,
140+
pb: Option<indicatif::ProgressBar>,
141+
mut process_stdout_line: F,
142+
) -> anyhow::Result<ProcessOutput>
143+
where
144+
F: FnMut(&str, Option<&indicatif::ProgressBar>) -> anyhow::Result<()>,
145+
{
146+
cmd.stdout(std::process::Stdio::piped());
147+
cmd.stderr(std::process::Stdio::piped());
148+
149+
let mut child = cmd.spawn().context("Failed to spawn child process")?;
150+
151+
let stderr_buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
152+
let stderr_buffer_clone = std::sync::Arc::clone(&stderr_buffer);
153+
154+
let mut stderr_thread = None;
155+
if let Some(stderr) = child.stderr.take() {
156+
stderr_thread = Some(std::thread::spawn(move || {
157+
let reader = std::io::BufReader::new(stderr);
158+
for line in reader.lines().map_while(Result::ok) {
159+
stderr_buffer_clone.lock().unwrap().push(line);
160+
}
161+
}));
162+
}
163+
164+
if let Some(ref p) = pb {
165+
p.enable_steady_tick(std::time::Duration::from_millis(100));
166+
}
167+
168+
if let Some(stdout) = child.stdout.take() {
169+
let reader = std::io::BufReader::new(stdout);
170+
for line in reader.lines().map_while(Result::ok) {
171+
process_stdout_line(&line, pb.as_ref())?;
172+
if let Some(ref p) = pb {
173+
p.tick();
174+
}
175+
}
176+
}
177+
178+
if let Some(ref p) = pb {
179+
p.finish_and_clear();
180+
}
181+
182+
let status = child.wait().context("Failed to wait for child process")?;
183+
184+
if let Some(thread) = stderr_thread {
185+
let _ = thread.join();
186+
}
187+
188+
let stderr_lines = std::sync::Arc::try_unwrap(stderr_buffer).unwrap().into_inner().unwrap();
189+
190+
Ok(ProcessOutput { status, stderr_lines })
191+
}
192+
193+
#[cfg(feature = "exocrate_tests")]
194+
pub(crate) fn run_test_lock_helper(
195+
role: &str,
196+
lock_dir: &std::path::Path,
197+
log_file: &std::path::Path,
198+
sig_file: &std::path::Path,
199+
) -> anyhow::Result<()> {
200+
use std::io::Write as _;
201+
202+
let append_log = |msg: &str| -> anyhow::Result<()> {
203+
let mut file = std::fs::OpenOptions::new().create(true).append(true).open(log_file)?;
204+
writeln!(file, "{}", msg)?;
205+
Ok(())
206+
};
207+
208+
let wait_for_sig = || -> anyhow::Result<()> {
209+
let start = std::time::Instant::now();
210+
while !sig_file.exists() {
211+
if start.elapsed() > std::time::Duration::from_secs(3) {
212+
anyhow::bail!("Timeout waiting for signal file {:?}", sig_file);
213+
}
214+
std::thread::sleep(std::time::Duration::from_millis(50));
215+
}
216+
Ok(())
217+
};
218+
219+
match role {
220+
"reader-a" => {
221+
let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?;
222+
append_log("SHARED_START_A")?;
223+
wait_for_sig()?;
224+
append_log("SHARED_END_A")?;
225+
}
226+
"reader-b" => {
227+
let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?;
228+
append_log("SHARED_START_B")?;
229+
std::fs::write(sig_file, "")?;
230+
append_log("SHARED_END_B")?;
231+
}
232+
"writer-a" => {
233+
let _lock = DirLock::lock_exclusive(lock_dir.to_path_buf())?;
234+
append_log("EXCLUSIVE_START_A")?;
235+
wait_for_sig()?;
236+
append_log("EXCLUSIVE_END_A")?;
237+
}
238+
"reader-exclusion" => {
239+
std::fs::write(sig_file, "")?;
240+
let _lock = DirLock::lock_shared(lock_dir.to_path_buf())?;
241+
append_log("SHARED_START_B")?;
242+
append_log("SHARED_END_B")?;
243+
}
244+
_ => anyhow::bail!("Unknown test-lock-helper role: {}", role),
245+
}
246+
247+
Ok(())
248+
}
249+
250+
#[cfg(test)]
251+
#[macro_export]
252+
macro_rules! workspace_fixture {
253+
($dir:expr, { $($path:expr => $content:expr),* $(,)? }) => {{
254+
let root = $dir.path();
255+
$(
256+
let file_path = root.join($path);
257+
if let Some(parent) = file_path.parent() {
258+
std::fs::create_dir_all(parent).unwrap();
259+
}
260+
std::fs::write(&file_path, $content).unwrap();
261+
)*
262+
}};
263+
}
264+
265+
#[cfg(test)]
266+
mod tests {
267+
use super::*;
268+
269+
#[test]
270+
fn test_prepend_to_env_var() {
271+
// Test when current value is empty.
272+
let path1 = std::path::Path::new("/path/one");
273+
let res1 = prepend_to_env_var_impl(std::ffi::OsString::new(), path1);
274+
assert_eq!(res1, "/path/one");
275+
276+
// Test when current value is not empty.
277+
let path2 = std::path::Path::new("/path/two");
278+
let res2 = prepend_to_env_var_impl(res1, path2);
279+
assert_eq!(res2, "/path/two:/path/one");
280+
}
281+
282+
#[test]
283+
fn test_dir_lock_exclusive_mutual_exclusion() {
284+
let temp_dir = tempfile::tempdir().unwrap();
285+
let lock_path = temp_dir.path().to_path_buf();
286+
287+
let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
288+
let barrier_clone = std::sync::Arc::clone(&barrier);
289+
let lock_path_clone = lock_path.clone();
290+
291+
let lock_released = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
292+
let lock_released_clone = std::sync::Arc::clone(&lock_released);
293+
294+
// Thread A acquires the lock.
295+
let thread_a = std::thread::spawn(move || {
296+
let _lock = DirLock::lock_exclusive(lock_path_clone).expect("Failed to lock exclusive");
297+
barrier_clone.wait(); // Signal Thread B that A holds the lock.
298+
299+
// Simulate brief work holding the lock.
300+
std::thread::sleep(std::time::Duration::from_millis(100));
301+
lock_released_clone.store(true, std::sync::atomic::Ordering::Relaxed);
302+
// _lock drops here, releasing the lock.
303+
});
304+
305+
// Thread B waits for Thread A to acquire the lock, then tries to acquire it itself.
306+
let thread_b = std::thread::spawn(move || {
307+
barrier.wait(); // Wait for Thread A to acquire lock.
308+
309+
// Attempt to acquire lock. This should block until Thread A releases it.
310+
let _lock = DirLock::lock_exclusive(lock_path).expect("Failed to lock exclusive in B");
311+
312+
// Assert that B only successfully locked the directory AFTER A released it.
313+
assert!(
314+
lock_released.load(std::sync::atomic::Ordering::Relaxed),
315+
"Thread B acquired lock before Thread A released it!"
316+
);
317+
});
318+
319+
thread_a.join().unwrap();
320+
thread_b.join().unwrap();
321+
}
322+
323+
#[test]
324+
fn test_dir_lock_shared_coexistence() {
325+
let temp_dir = tempfile::tempdir().unwrap();
326+
let lock_path = temp_dir.path().to_path_buf();
327+
328+
// Thread A acquires shared lock.
329+
let lock_a = DirLock::lock_shared(lock_path.clone()).expect("Failed to lock shared");
330+
331+
// Thread B should be able to acquire shared lock immediately without blocking.
332+
let lock_b = DirLock::lock_shared(lock_path).expect("Failed to lock shared concurrently");
333+
334+
// Both locks are held.
335+
drop(lock_a);
336+
drop(lock_b);
337+
}
338+
}

0 commit comments

Comments
 (0)