Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 115 additions & 25 deletions nativelink-worker/src/running_actions_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,34 +1366,124 @@ impl RunningActionImpl {
.err_tip(|| format!("Uploading directory {}", full_path.display()))?,
))
} else if metadata.is_symlink() {
let output_symlink = upload_symlink(&full_path, work_directory)
.await
.map(|mut symlink_info| {
symlink_info.name_or_path = NameOrPath::Path(entry);
symlink_info
})
.err_tip(|| format!("Uploading symlink {}", full_path.display()))?;
match fs::metadata(&full_path).await {
Ok(metadata) => {
if metadata.is_dir() {
Ok(OutputType::DirectorySymlink(output_symlink))
} else {
// Note: If it's anything but directory we put it as a file symlink.
Ok(OutputType::FileSymlink(output_symlink))
// Resolve the symlink to determine what it points to.
// Symlinks created by DirectoryCache (absolute paths into
// the cache directory) must NOT be uploaded as symlinks —
// the target path is worker-local and meaningless to the
// client. Instead, follow the symlink and upload the
// resolved content (file or directory).
let target = fs::read_link(&full_path).await.err_tip(|| {
format!("Reading symlink target for {}", full_path.display())
})?;
let is_absolute_symlink = Path::new(&target).is_absolute();

if is_absolute_symlink {
// Absolute symlink — resolve and upload contents.
match fs::metadata(&full_path).await {
Ok(resolved_meta) => {
if resolved_meta.is_dir() {
// Upload as directory (Tree proto).
Ok(OutputType::Directory(
upload_directory(
cas_store.as_pin(),
&full_path,
work_directory,
hasher,
digest_uploaders,
)
.and_then(|(root_dir, children)| async move {
let tree = ProtoTree {
root: Some(root_dir),
children: children.into(),
};
let tree_digest = serialize_and_upload_message(
&tree,
cas_store.as_pin(),
&mut hasher.hasher(),
)
.await
.err_tip(|| format!("While processing {entry}"))?;
Ok(DirectoryInfo {
path: entry,
tree_digest,
})
})
.await
.err_tip(|| {
format!(
"Uploading symlinked directory {}",
full_path.display()
)
})?,
))
} else {
// Upload as file (follow symlink).
Ok(OutputType::File(
upload_file(
cas_store.as_pin(),
&full_path,
hasher,
resolved_meta,
digest_uploaders,
)
.await
.map(|mut file_info| {
file_info.name_or_path = NameOrPath::Path(entry);
file_info
})
.err_tip(|| {
format!(
"Uploading symlinked file {}",
full_path.display()
)
})?,
))
}
}
Err(e) => {
if e.code != Code::NotFound {
return Err(e).err_tip(|| {
format!(
"While resolving absolute symlink {}",
full_path.display()
)
});
}
Ok(OutputType::None)
}
}
Err(e) => {
if e.code != Code::NotFound {
return Err(e).err_tip(|| {
format!(
"While querying target symlink metadata for {}",
full_path.display()
)
});
} else {
// Relative symlink — action intentionally created it.
// Upload as a proper symlink.
let output_symlink = upload_symlink(&full_path, work_directory)
.await
.map(|mut symlink_info| {
symlink_info.name_or_path = NameOrPath::Path(entry);
symlink_info
})
.err_tip(|| format!("Uploading symlink {}", full_path.display()))?;
match fs::metadata(&full_path).await {
Ok(metadata) => {
if metadata.is_dir() {
Ok(OutputType::DirectorySymlink(output_symlink))
} else {
// Note: If it's anything but directory we put it as a file symlink.
Ok(OutputType::FileSymlink(output_symlink))
}
}
Err(e) => {
if e.code != Code::NotFound {
return Err(e).err_tip(|| {
format!(
"While querying target symlink metadata for {}",
full_path.display()
)
});
}
// If the file doesn't exist, we consider it a file. Even though the
// file doesn't exist we still need to populate an entry.
Ok(OutputType::FileSymlink(output_symlink))
}
// If the file doesn't exist, we consider it a file. Even though the
// file doesn't exist we still need to populate an entry.
Ok(OutputType::FileSymlink(output_symlink))
}
}
} else {
Expand Down
158 changes: 154 additions & 4 deletions nativelink-worker/tests/running_actions_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,11 +1087,13 @@ mod tests {
"echo foo > dir1/file && ",
"touch dir1/file2 && ",
"ln -s ../file dir1/dir2/sym &&",
"ln -s /dev/null empty_sym",
Comment thread
palfrey marked this conversation as resolved.
// Relative symlink — preserved as a symlink in the
// ActionResult (absolute symlinks are now resolved).
"ln -s dir1/file rel_sym",
)
.to_string(),
],
output_paths: vec!["dir1".to_string(), "empty_sym".to_string()],
output_paths: vec!["dir1".to_string(), "rel_sym".to_string()],
working_directory: ".".to_string(),
environment_variables: vec![EnvironmentVariable {
name: "PATH".to_string(),
Expand Down Expand Up @@ -1225,8 +1227,8 @@ mod tests {
)?,
}],
output_file_symlinks: vec![SymlinkInfo {
name_or_path: NameOrPath::Path("empty_sym".to_string()),
target: "/dev/null".to_string(),
name_or_path: NameOrPath::Path("rel_sym".to_string()),
target: "dir1/file".to_string(),
}],
output_directory_symlinks: vec![],
server_logs: HashMap::new(),
Expand All @@ -1249,6 +1251,154 @@ mod tests {
Ok(())
}

// Windows does not support symlinks.
Comment thread
erneestoc marked this conversation as resolved.
Outdated
#[cfg(not(target_family = "windows"))]
#[nativelink_test]
async fn upload_absolute_symlink_resolves_contents() -> Result<(), Box<dyn core::error::Error>>
{
// Regression test: when an action produces an absolute symlink as one
// of its declared outputs (for example because the work directory was
// populated via DirectoryCache and contains absolute symlinks into the
// cache), uploading the literal symlink target yields a path that is
// meaningless on the client. The worker must resolve absolute
// symlinks and upload the underlying file/directory contents.
const WORKER_ID: &str = "foo_worker_id";

fn test_monotonic_clock() -> SystemTime {
static CLOCK: AtomicU64 = AtomicU64::new(0);
monotonic_clock(&CLOCK)
}

let (_, slow_store, cas_store, ac_store) = setup_stores().await?;
let root_action_directory = make_temp_path("root_action_directory");
fs::create_dir_all(&root_action_directory).await?;

// Create an out-of-tree file whose absolute path the action will
// symlink into the work directory.
let external_root = make_temp_path("external_payload");
fs::create_dir_all(&external_root).await?;
let external_file = format!("{external_root}/payload.txt");
tokio::fs::write(&external_file, b"hello-from-outside").await?;

let running_actions_manager = Arc::new(RunningActionsManagerImpl::new_with_callbacks(
RunningActionsManagerArgs {
root_action_directory,
execution_configuration: ExecutionConfiguration::default(),
cas_store: cas_store.clone(),
ac_store: Some(Store::new(ac_store.clone())),
historical_store: Store::new(cas_store.clone()),
upload_action_result_config: &UploadActionResultConfig {
upload_ac_results_strategy: UploadCacheResultsStrategy::Never,
..Default::default()
},
max_action_timeout: Duration::MAX,
max_upload_timeout: Duration::from_secs(DEFAULT_MAX_UPLOAD_TIMEOUT),
timeout_handled_externally: false,
directory_cache: None,
#[cfg(target_os = "linux")]
use_namespaces: use_namespaces(),
},
Callbacks {
now_fn: test_monotonic_clock,
sleep_fn: |_duration| Box::pin(future::pending()),
},
)?);
let queued_timestamp = make_system_time(1000);
let action_result = {
let command = Command {
arguments: vec![
"sh".to_string(),
"-c".to_string(),
format!("ln -s {external_file} resolved_output"),
],
output_paths: vec!["resolved_output".to_string()],
working_directory: ".".to_string(),
environment_variables: vec![EnvironmentVariable {
name: "PATH".to_string(),
value: env::var("PATH").unwrap(),
}],
..Default::default()
};
let command_digest = serialize_and_upload_message(
&command,
cas_store.as_pin(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;
let input_root_digest = serialize_and_upload_message(
&Directory::default(),
cas_store.as_pin(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;
let action = Action {
command_digest: Some(command_digest.into()),
input_root_digest: Some(input_root_digest.into()),
..Default::default()
};
let action_digest = serialize_and_upload_message(
&action,
cas_store.as_pin(),
&mut DigestHasherFunc::Sha256.hasher(),
)
.await?;

let execute_request = ExecuteRequest {
action_digest: Some(action_digest.into()),
..Default::default()
};
let operation_id = OperationId::default().to_string();

let running_action_impl = running_actions_manager
.create_and_add_action(
WORKER_ID.to_string(),
StartExecute {
execute_request: Some(execute_request),
operation_id,
queued_timestamp: Some(queued_timestamp.into()),
platform: action.platform.clone(),
worker_id: WORKER_ID.to_string(),
},
)
.await?;

run_action(running_action_impl.clone()).await?
};

// With the fix, the absolute symlink should be resolved and the
// payload uploaded as a regular file with the payload's content
// hash. The output_file_symlinks list must be empty.
assert_eq!(
action_result.output_file_symlinks.len(),
0,
"absolute symlink should be resolved to file, not uploaded as symlink"
);
assert_eq!(
action_result.output_directory_symlinks.len(),
0,
"no directory symlinks expected"
);
assert_eq!(
action_result.output_files.len(),
1,
"absolute symlink should appear as an uploaded file"
);
let uploaded = &action_result.output_files[0];
assert_eq!(
uploaded.name_or_path,
NameOrPath::Path("resolved_output".to_string())
);
assert_eq!(
usize::try_from(uploaded.digest.size_bytes())?,
b"hello-from-outside".len()
);
// Verify the blob actually landed in CAS by re-reading it.
let key: nativelink_util::store_trait::StoreKey<'_> = uploaded.digest.into();
let blob = slow_store.as_ref().get_part_unchunked(key, 0, None).await?;
assert_eq!(blob.as_ref(), b"hello-from-outside");
Ok(())
}

#[nativelink_test]
async fn cleanup_happens_on_job_failure() -> Result<(), Box<dyn core::error::Error>> {
const WORKER_ID: &str = "foo_worker_id";
Expand Down