Skip to content

Commit 097743b

Browse files
erneestoc and claude committed
worker: parallelize directory-cache construction with bounded concurrency
`construct_directory` materialized a directory level strictly serially: `for file { create_file().await }` then `for dir { recurse().await }`. Each file is an independent CAS-blob hardlink (or copy) and each subdirectory an independent subtree, so they can be built concurrently.

Build one future per child — file, subdirectory, symlink — into an explicit `Vec`, then drive them through `buffer_unordered(CONSTRUCT_CONCURRENCY)`. `try_fold` sums each child's byte contribution into the tree size and short-circuits on the first error, preserving the existing error semantics.

The fan-out is bounded (64, matching `download_to_directory`'s `DOWNLOAD_TO_DIRECTORY_CONCURRENCY`) — never unbounded: thousands of parallel `hardlink(2)` calls contend on APFS's per-volume metadata lock and regress below serial throughput.

The futures are collected into a `Vec` rather than built inline in the stream combinator, which avoids the higher-ranked lifetime inference failure that `StoreKey<'_>` triggers in that position.

This commit is intentionally isolated as the final change on the branch so HEAD (1+2+5+3) can be A/B benchmarked against HEAD~1 (1+2+5) on a real workload.

No correctness behavior changes versus HEAD~1: the materialized tree, file modes, size accounting, and hit/miss return are all identical — only the construction order is now concurrent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 33d5b1b commit 097743b

1 file changed

Lines changed: 144 additions & 14 deletions

File tree

nativelink-worker/src/directory_cache.rs

Lines changed: 144 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ use std::path::{Path, PathBuf};
2020
use std::sync::Arc;
2121
use std::time::SystemTime;
2222

23+
use futures::stream::{StreamExt, TryStreamExt};
2324
use nativelink_error::{Code, Error, ResultExt, make_err};
2425
use nativelink_proto::build::bazel::remote::execution::v2::{
2526
Directory as ProtoDirectory, DirectoryNode, FileNode, SymlinkNode,
@@ -36,6 +37,20 @@ use tokio::fs;
3637
use tokio::sync::{Mutex, RwLock};
3738
use tracing::{debug, trace, warn};
3839

40+
/// Maximum number of file/subdirectory materialization futures polled
41+
/// concurrently while constructing one directory level of a cache entry.
42+
///
43+
/// Matches `download_to_directory`'s `DOWNLOAD_TO_DIRECTORY_CONCURRENCY`: an
44+
/// unbounded fan-out produced thousands of parallel `hardlink(2)` calls
45+
/// fighting APFS's per-volume metadata lock and regressed throughput versus
46+
/// serial. 64 is the empirically chosen ceiling.
47+
const CONSTRUCT_CONCURRENCY: usize = 64;
48+
49+
/// A boxed, `Send` future yielding the byte-size a child contributes to a
50+
/// cache-entry tree. Used to fan out file/subdir/symlink materialization
51+
/// inside `construct_directory` with bounded concurrency.
52+
type SizedFuture<'f> = Pin<Box<dyn Future<Output = Result<u64, Error>> + Send + 'f>>;
53+
3954
/// Configuration for the directory cache
4055
#[derive(Debug, Clone)]
4156
pub struct DirectoryCacheConfig {
@@ -349,6 +364,9 @@ impl DirectoryCache {
349364
///
350365
/// Each directory's final mode (0o755) is set at creation time, so no
351366
/// separate recursive permission pass is needed after construction.
367+
///
368+
/// File creation and subdirectory recursion within one directory level run
369+
/// concurrently, bounded to `CONSTRUCT_CONCURRENCY` in-flight futures.
352370
fn construct_directory<'a>(
353371
&'a self,
354372
digest: DigestInfo,
@@ -368,27 +386,55 @@ impl DirectoryCache {
368386
// (umask-independent) — no post-construction permission walk.
369387
self.create_dir_writable(dest_path).await?;
370388

371-
let mut total_size: u64 = 0;
389+
// Build one future per child (file / subdirectory / symlink) and
390+
// run them with bounded concurrency. Each future yields the bytes
391+
// it contributes to the tree size. The futures are collected into
392+
// an explicit `Vec` rather than built inline inside the stream
393+
// combinator: building per-key futures via `.iter().map(closure)`
394+
// inside `buffer_unordered` trips higher-ranked-lifetime inference
395+
// because `create_file` handles `StoreKey<'_>` internally.
396+
let mut futures: Vec<SizedFuture<'a>> = Vec::with_capacity(
397+
directory.files.len() + directory.directories.len() + directory.symlinks.len(),
398+
);
372399

373-
// Process files
374-
for file in &directory.files {
375-
self.create_file(dest_path, file).await?;
376-
if let Some(file_digest) = &file.digest {
377-
// size_bytes is non-negative; clamp defensively.
378-
total_size += u64::try_from(file_digest.size_bytes).unwrap_or(0);
379-
}
400+
let dest_path_buf = dest_path.to_path_buf();
401+
402+
for file in directory.files {
403+
// size_bytes is non-negative; clamp defensively.
404+
let file_size = file
405+
.digest
406+
.as_ref()
407+
.map_or(0, |d| u64::try_from(d.size_bytes).unwrap_or(0));
408+
let parent = dest_path_buf.clone();
409+
futures.push(Box::pin(async move {
410+
self.create_file(&parent, &file).await?;
411+
Ok(file_size)
412+
}));
380413
}
381414

382-
// Process subdirectories recursively
383-
for dir_node in &directory.directories {
384-
total_size += self.create_subdirectory(dest_path, dir_node).await?;
415+
for dir_node in directory.directories {
416+
let parent = dest_path_buf.clone();
417+
futures.push(Box::pin(async move {
418+
self.create_subdirectory(&parent, &dir_node).await
419+
}));
385420
}
386421

387-
// Process symlinks
388-
for symlink in &directory.symlinks {
389-
self.create_symlink(dest_path, symlink).await?;
422+
for symlink in directory.symlinks {
423+
let parent = dest_path_buf.clone();
424+
futures.push(Box::pin(async move {
425+
self.create_symlink(&parent, &symlink).await?;
426+
Ok(0u64)
427+
}));
390428
}
391429

430+
// Bounded fan-out: at most CONSTRUCT_CONCURRENCY futures polled at
431+
// once. `try_fold` sums the per-child sizes and short-circuits on
432+
// the first error.
433+
let total_size = futures::stream::iter(futures)
434+
.buffer_unordered(CONSTRUCT_CONCURRENCY)
435+
.try_fold(0u64, |acc, child_size| async move { Ok(acc + child_size) })
436+
.await?;
437+
392438
Ok(total_size)
393439
})
394440
}
@@ -1431,4 +1477,88 @@ mod tests {
14311477

14321478
Ok(())
14331479
}
1480+
1481+
/// OPT #3: a wide tree (many files at one level, plus a nested subdir)
1482+
/// must materialize byte-identically when its children are constructed
1483+
/// with bounded concurrency. Exercises the `buffer_unordered` fan-out.
1484+
#[nativelink_test(flavor = "multi_thread", worker_threads = 4)]
1485+
async fn test_parallel_construct_wide_tree() -> Result<(), Error> {
1486+
let temp_dir = TempDir::new().unwrap();
1487+
let cache_root = temp_dir.path().join("cache");
1488+
let (cas_store, slow_store) = make_fast_slow_store(&temp_dir).await;
1489+
1490+
// 100 distinct files at the root.
1491+
const FILE_COUNT: usize = 100;
1492+
let mut root_files = Vec::with_capacity(FILE_COUNT);
1493+
let mut expected = Vec::with_capacity(FILE_COUNT);
1494+
for i in 0..FILE_COUNT {
1495+
let content = format!("content-of-file-{i}").into_bytes();
1496+
// Tags 100.. avoid colliding with other tests' fixed tags.
1497+
let tag = 100u8.wrapping_add(u8::try_from(i).unwrap());
1498+
let digest = upload_blob(&slow_store, tag, &content).await;
1499+
root_files.push(FileNode {
1500+
name: format!("file_{i}.txt"),
1501+
digest: Some(digest.into()),
1502+
is_executable: false,
1503+
..Default::default()
1504+
});
1505+
expected.push((format!("file_{i}.txt"), content));
1506+
}
1507+
1508+
// One nested subdir with its own file.
1509+
let nested_content = b"nested-data".to_vec();
1510+
let nested_digest = upload_blob(&slow_store, 250, &nested_content).await;
1511+
let sub = ProtoDirectory {
1512+
files: vec![FileNode {
1513+
name: "deep.txt".to_string(),
1514+
digest: Some(nested_digest.into()),
1515+
is_executable: false,
1516+
..Default::default()
1517+
}],
1518+
..Default::default()
1519+
};
1520+
let mut sub_data = Vec::new();
1521+
sub.encode(&mut sub_data).unwrap();
1522+
let sub_digest = upload_blob(&slow_store, 251, &sub_data).await;
1523+
1524+
let root = ProtoDirectory {
1525+
files: root_files,
1526+
directories: vec![DirectoryNode {
1527+
name: "sub".to_string(),
1528+
digest: Some(sub_digest.into()),
1529+
}],
1530+
..Default::default()
1531+
};
1532+
let mut root_data = Vec::new();
1533+
root.encode(&mut root_data).unwrap();
1534+
let root_digest = upload_blob(&slow_store, 252, &root_data).await;
1535+
1536+
let config = DirectoryCacheConfig {
1537+
max_entries: 10,
1538+
max_size_bytes: 16 * 1024 * 1024,
1539+
cache_root,
1540+
};
1541+
let cache = DirectoryCache::new(config, cas_store).await?;
1542+
1543+
let dest = temp_dir.path().join("dest");
1544+
let hit = cache.get_or_create(root_digest, &dest).await?;
1545+
assert!(!hit, "first construction is a miss");
1546+
1547+
// Every one of the 100 root files must be present and byte-identical.
1548+
for (name, content) in &expected {
1549+
assert_eq!(
1550+
&fs::read(dest.join(name)).await?,
1551+
content,
1552+
"parallel-constructed file {name} must be byte-identical"
1553+
);
1554+
}
1555+
// The nested subdir's file too.
1556+
assert_eq!(
1557+
fs::read(dest.join("sub").join("deep.txt")).await?,
1558+
nested_content,
1559+
"nested file must be byte-identical"
1560+
);
1561+
1562+
Ok(())
1563+
}
14341564
}

0 commit comments

Comments
 (0)