Skip to content

Commit 47602d1

Browse files
Fix directory collision on action retries by waiting for cleanup and removing stales (TraceMachina#1868)
1 parent 6aaee38 commit 47602d1

2 files changed

Lines changed: 413 additions & 10 deletions

File tree

nativelink-worker/src/running_actions_manager.rs

Lines changed: 161 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ use core::pin::Pin;
1919
use core::sync::atomic::{AtomicBool, Ordering};
2020
use core::time::Duration;
2121
use std::borrow::Cow;
22-
use std::collections::HashMap;
2322
use std::collections::vec_deque::VecDeque;
23+
use std::collections::{HashMap, HashSet};
2424
use std::ffi::{OsStr, OsString};
2525
#[cfg(target_family = "unix")]
2626
use std::fs::Permissions;
2727
#[cfg(target_family = "unix")]
2828
use std::os::unix::fs::{MetadataExt, PermissionsExt};
29-
use std::path::Path;
29+
use std::path::{Path, PathBuf};
3030
use std::process::Stdio;
3131
use std::sync::{Arc, Weak};
3232
use std::time::SystemTime;
@@ -74,10 +74,11 @@ use scopeguard::{ScopeGuard, guard};
7474
use serde::Deserialize;
7575
use tokio::io::{AsyncReadExt, AsyncSeekExt};
7676
use tokio::process;
77-
use tokio::sync::{oneshot, watch};
77+
use tokio::sync::{Notify, oneshot, watch};
78+
use tokio::time::Instant;
7879
use tokio_stream::wrappers::ReadDirStream;
7980
use tonic::Request;
80-
use tracing::{debug, error, info, warn};
81+
use tracing::{debug, error, info, trace, warn};
8182
use uuid::Uuid;
8283

8384
/// For simplicity we use a fixed exit code for cases when our program is terminated
@@ -527,20 +528,38 @@ async fn do_cleanup(
527528
operation_id: &OperationId,
528529
action_directory: &str,
529530
) -> Result<(), Error> {
531+
// Mark this operation as being cleaned up
532+
{
533+
let mut cleaning = running_actions_manager.cleaning_up_operations.lock();
534+
cleaning.insert(operation_id.clone());
535+
}
536+
530537
debug!("Worker cleaning up");
531538
// Note: We need to be careful to keep trying to cleanup even if one of the steps fails.
532539
let remove_dir_result = fs::remove_dir_all(action_directory)
533540
.await
534541
.err_tip(|| format!("Could not remove working directory {action_directory}"));
535-
if let Err(err) = running_actions_manager.cleanup_action(operation_id) {
542+
543+
let cleanup_result = if let Err(err) = running_actions_manager.cleanup_action(operation_id) {
536544
error!(?operation_id, ?err, "Error cleaning up action");
537-
return Result::<(), Error>::Err(err).merge(remove_dir_result);
538-
}
539-
if let Err(err) = remove_dir_result {
545+
Result::<(), Error>::Err(err).merge(remove_dir_result)
546+
} else if let Err(err) = remove_dir_result {
540547
error!(?operation_id, ?err, "Error removing working directory");
541-
return Err(err);
548+
Err(err)
549+
} else {
550+
Ok(())
551+
};
552+
553+
// Remove from cleaning set and notify waiters
554+
{
555+
let mut cleaning = running_actions_manager.cleaning_up_operations.lock();
556+
cleaning.remove(operation_id);
542557
}
543-
Ok(())
558+
running_actions_manager
559+
.cleanup_complete_notify
560+
.notify_waiters();
561+
562+
cleanup_result
544563
}
545564

546565
pub trait RunningAction: Sync + Send + Sized + Unpin + 'static {
@@ -1655,9 +1674,22 @@ pub struct RunningActionsManagerImpl {
16551674
action_done_tx: watch::Sender<()>,
16561675
callbacks: Callbacks,
16571676
metrics: Arc<Metrics>,
1677+
/// Track operations being cleaned up to avoid directory collisions during action retries.
1678+
/// When an action fails and is retried on the same worker, we need to ensure the previous
1679+
/// attempt's directory is fully cleaned up before creating a new one.
1680+
/// See: <https://github.com/TraceMachina/nativelink/issues/1859>
1681+
cleaning_up_operations: Mutex<HashSet<OperationId>>,
1682+
/// Notify waiters when a cleanup operation completes. This is used in conjunction with
1683+
/// `cleaning_up_operations` to coordinate directory cleanup and creation.
1684+
cleanup_complete_notify: Arc<Notify>,
16581685
}
16591686

16601687
impl RunningActionsManagerImpl {
1688+
/// Maximum time to wait for a cleanup operation to complete before timing out.
1689+
/// TODO(marcussorealheis): Consider making cleanup wait timeout configurable in the future
1690+
const MAX_WAIT: Duration = Duration::from_secs(30);
1691+
/// Maximum backoff duration for exponential backoff when waiting for cleanup.
1692+
const MAX_BACKOFF: Duration = Duration::from_millis(500);
16611693
pub fn new_with_callbacks(
16621694
args: RunningActionsManagerArgs<'_>,
16631695
callbacks: Callbacks,
@@ -1690,6 +1722,8 @@ impl RunningActionsManagerImpl {
16901722
action_done_tx,
16911723
callbacks,
16921724
metrics: Arc::new(Metrics::default()),
1725+
cleaning_up_operations: Mutex::new(HashSet::new()),
1726+
cleanup_complete_notify: Arc::new(Notify::new()),
16931727
})
16941728
}
16951729

@@ -1703,6 +1737,105 @@ impl RunningActionsManagerImpl {
17031737
)
17041738
}
17051739

1740+
/// Fixes a race condition that occurs when an action fails to execute on a worker, and the same worker
1741+
/// attempts to re-execute the same action before the physical cleanup (file is removed) completes.
1742+
/// See this issue for additional details: <https://github.com/TraceMachina/nativelink/issues/1859>
1743+
async fn wait_for_cleanup_if_needed(&self, operation_id: &OperationId) -> Result<(), Error> {
1744+
let start = Instant::now();
1745+
let mut backoff = Duration::from_millis(10);
1746+
let mut has_waited = false;
1747+
1748+
loop {
1749+
let should_wait = {
1750+
let cleaning = self.cleaning_up_operations.lock();
1751+
cleaning.contains(operation_id)
1752+
};
1753+
1754+
if !should_wait {
1755+
let dir_path =
1756+
PathBuf::from(&self.root_action_directory).join(operation_id.to_string());
1757+
1758+
if !dir_path.exists() {
1759+
return Ok(());
1760+
}
1761+
1762+
// Safety check: ensure we're only removing directories under root_action_directory
1763+
let root_path = Path::new(&self.root_action_directory);
1764+
let canonical_root = root_path.canonicalize().err_tip(|| {
1765+
format!(
1766+
"Failed to canonicalize root directory: {}",
1767+
self.root_action_directory
1768+
)
1769+
})?;
1770+
let canonical_dir = dir_path.canonicalize().err_tip(|| {
1771+
format!("Failed to canonicalize directory: {}", dir_path.display())
1772+
})?;
1773+
1774+
if !canonical_dir.starts_with(&canonical_root) {
1775+
return Err(make_err!(
1776+
Code::Internal,
1777+
"Attempted to remove directory outside of root_action_directory: {}",
1778+
dir_path.display()
1779+
));
1780+
}
1781+
1782+
// Directory exists but not being cleaned - remove it
1783+
warn!(
1784+
"Removing stale directory for {}: {}",
1785+
operation_id,
1786+
dir_path.display()
1787+
);
1788+
self.metrics.stale_removals.inc();
1789+
1790+
// Try to remove the directory, with one retry on failure
1791+
let remove_result = fs::remove_dir_all(&dir_path).await;
1792+
if let Err(e) = remove_result {
1793+
// Retry once after a short delay in case the directory is temporarily locked
1794+
tokio::time::sleep(Duration::from_millis(100)).await;
1795+
fs::remove_dir_all(&dir_path).await.err_tip(|| {
1796+
format!(
1797+
"Failed to remove stale directory {} for retry of {} after retry (original error: {})",
1798+
dir_path.display(),
1799+
operation_id,
1800+
e
1801+
)
1802+
})?;
1803+
}
1804+
return Ok(());
1805+
}
1806+
1807+
if start.elapsed() > Self::MAX_WAIT {
1808+
self.metrics.cleanup_wait_timeouts.inc();
1809+
return Err(make_err!(
1810+
Code::DeadlineExceeded,
1811+
"Timeout waiting for previous operation cleanup: {} (waited {:?})",
1812+
operation_id,
1813+
start.elapsed()
1814+
));
1815+
}
1816+
1817+
if !has_waited {
1818+
self.metrics.cleanup_waits.inc();
1819+
has_waited = true;
1820+
}
1821+
1822+
trace!(
1823+
"Waiting for cleanup of {} (elapsed: {:?}, backoff: {:?})",
1824+
operation_id,
1825+
start.elapsed(),
1826+
backoff
1827+
);
1828+
1829+
tokio::select! {
1830+
() = self.cleanup_complete_notify.notified() => {},
1831+
() = tokio::time::sleep(backoff) => {
1832+
// Exponential backoff
1833+
backoff = (backoff * 2).min(Self::MAX_BACKOFF);
1834+
},
1835+
}
1836+
}
1837+
}
1838+
17061839
fn make_action_directory<'a>(
17071840
&'a self,
17081841
operation_id: &'a OperationId,
@@ -1800,6 +1933,8 @@ impl RunningActionsManager for RunningActionsManagerImpl {
18001933
?action_info,
18011934
"Worker received action",
18021935
);
1936+
// Wait for any previous cleanup to complete before creating directory
1937+
self.wait_for_cleanup_if_needed(&operation_id).await?;
18031938
let action_directory = self.make_action_directory(&operation_id).await?;
18041939
let execution_metadata = ExecutionMetadata {
18051940
worker: worker_id,
@@ -1836,6 +1971,16 @@ impl RunningActionsManager for RunningActionsManagerImpl {
18361971
));
18371972
{
18381973
let mut running_actions = self.running_actions.lock();
1974+
// Check if action already exists and is still alive
1975+
if let Some(existing_weak) = running_actions.get(&operation_id) {
1976+
if let Some(_existing_action) = existing_weak.upgrade() {
1977+
return Err(make_err!(
1978+
Code::AlreadyExists,
1979+
"Action with operation_id {} is already running",
1980+
operation_id
1981+
));
1982+
}
1983+
}
18391984
running_actions.insert(operation_id, Arc::downgrade(&running_action));
18401985
}
18411986
Ok(running_action)
@@ -1939,6 +2084,12 @@ pub struct Metrics {
19392084
cleanup: AsyncCounterWrapper,
19402085
#[metric(help = "Stats about the get_finished_result command.")]
19412086
get_finished_result: AsyncCounterWrapper,
2087+
#[metric(help = "Number of times an action waited for cleanup to complete.")]
2088+
cleanup_waits: CounterWithTime,
2089+
#[metric(help = "Number of stale directories removed during action retries.")]
2090+
stale_removals: CounterWithTime,
2091+
#[metric(help = "Number of timeouts while waiting for cleanup to complete.")]
2092+
cleanup_wait_timeouts: CounterWithTime,
19422093
#[metric(help = "Stats about the get_proto_command_from_store command.")]
19432094
get_proto_command_from_store: AsyncCounterWrapper,
19442095
#[metric(help = "Stats about the download_to_directory command.")]

0 commit comments

Comments
 (0)