Skip to content

Commit 342e4f4

Browse files
erneestocclaude
andcommitted
worker: drop parallel directory-cache construction (#3)
Reverts the bounded-concurrency construct on this POC branch. After #1 makes construct metadata-only, intra-tree parallelism is bounded by APFS metadata serialization; on a busy worker (inter-action concurrency already saturates the box) 64-wide spawn_blocking fan-out risks oversubscription and stealing cycles from the compiles. Keeping #1/#2/TraceMachina#5. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 158922f commit 342e4f4

1 file changed

Lines changed: 14 additions & 144 deletions

File tree

nativelink-worker/src/directory_cache.rs

Lines changed: 14 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use std::path::{Path, PathBuf};
2020
use std::sync::Arc;
2121
use std::time::SystemTime;
2222

23-
use futures::stream::{StreamExt, TryStreamExt};
2423
use nativelink_error::{Code, Error, ResultExt, make_err};
2524
use nativelink_proto::build::bazel::remote::execution::v2::{
2625
Directory as ProtoDirectory, DirectoryNode, FileNode, SymlinkNode,
@@ -37,20 +36,6 @@ use tokio::fs;
3736
use tokio::sync::{Mutex, RwLock};
3837
use tracing::{debug, trace, warn};
3938

40-
/// Maximum number of file/subdirectory materialization futures polled
41-
/// concurrently while constructing one directory level of a cache entry.
42-
///
43-
/// Matches `download_to_directory`'s `DOWNLOAD_TO_DIRECTORY_CONCURRENCY`: an
44-
/// unbounded fan-out produced thousands of parallel `hardlink(2)` calls
45-
/// fighting APFS's per-volume metadata lock and regressed throughput versus
46-
/// serial. 64 is the empirically chosen ceiling.
47-
const CONSTRUCT_CONCURRENCY: usize = 64;
48-
49-
/// A boxed, `Send` future yielding the byte-size a child contributes to a
50-
/// cache-entry tree. Used to fan out file/subdir/symlink materialization
51-
/// inside `construct_directory` with bounded concurrency.
52-
type SizedFuture<'f> = Pin<Box<dyn Future<Output = Result<u64, Error>> + Send + 'f>>;
53-
5439
/// Configuration for the directory cache
5540
#[derive(Debug, Clone)]
5641
pub struct DirectoryCacheConfig {
@@ -364,9 +349,6 @@ impl DirectoryCache {
364349
///
365350
/// Each directory's final mode (0o755) is set at creation time, so no
366351
/// separate recursive permission pass is needed after construction.
367-
///
368-
/// File creation and subdirectory recursion within one directory level run
369-
/// concurrently, bounded to `CONSTRUCT_CONCURRENCY` in-flight futures.
370352
fn construct_directory<'a>(
371353
&'a self,
372354
digest: DigestInfo,
@@ -386,55 +368,27 @@ impl DirectoryCache {
386368
// (umask-independent) — no post-construction permission walk.
387369
self.create_dir_writable(dest_path).await?;
388370

389-
// Build one future per child (file / subdirectory / symlink) and
390-
// run them with bounded concurrency. Each future yields the bytes
391-
// it contributes to the tree size. The futures are collected into
392-
// an explicit `Vec` rather than built inline inside the stream
393-
// combinator: building per-key futures via `.iter().map(closure)`
394-
// inside `buffer_unordered` trips higher-ranked-lifetime inference
395-
// because `create_file` handles `StoreKey<'_>` internally.
396-
let mut futures: Vec<SizedFuture<'a>> = Vec::with_capacity(
397-
directory.files.len() + directory.directories.len() + directory.symlinks.len(),
398-
);
371+
let mut total_size: u64 = 0;
399372

400-
let dest_path_buf = dest_path.to_path_buf();
401-
402-
for file in directory.files {
403-
// size_bytes is non-negative; clamp defensively.
404-
let file_size = file
405-
.digest
406-
.as_ref()
407-
.map_or(0, |d| u64::try_from(d.size_bytes).unwrap_or(0));
408-
let parent = dest_path_buf.clone();
409-
futures.push(Box::pin(async move {
410-
self.create_file(&parent, &file).await?;
411-
Ok(file_size)
412-
}));
373+
// Process files
374+
for file in &directory.files {
375+
self.create_file(dest_path, file).await?;
376+
if let Some(file_digest) = &file.digest {
377+
// size_bytes is non-negative; clamp defensively.
378+
total_size += u64::try_from(file_digest.size_bytes).unwrap_or(0);
379+
}
413380
}
414381

415-
for dir_node in directory.directories {
416-
let parent = dest_path_buf.clone();
417-
futures.push(Box::pin(async move {
418-
self.create_subdirectory(&parent, &dir_node).await
419-
}));
382+
// Process subdirectories recursively
383+
for dir_node in &directory.directories {
384+
total_size += self.create_subdirectory(dest_path, dir_node).await?;
420385
}
421386

422-
for symlink in directory.symlinks {
423-
let parent = dest_path_buf.clone();
424-
futures.push(Box::pin(async move {
425-
self.create_symlink(&parent, &symlink).await?;
426-
Ok(0u64)
427-
}));
387+
// Process symlinks
388+
for symlink in &directory.symlinks {
389+
self.create_symlink(dest_path, symlink).await?;
428390
}
429391

430-
// Bounded fan-out: at most CONSTRUCT_CONCURRENCY futures polled at
431-
// once. `try_fold` sums the per-child sizes and short-circuits on
432-
// the first error.
433-
let total_size = futures::stream::iter(futures)
434-
.buffer_unordered(CONSTRUCT_CONCURRENCY)
435-
.try_fold(0u64, |acc, child_size| async move { Ok(acc + child_size) })
436-
.await?;
437-
438392
Ok(total_size)
439393
})
440394
}
@@ -1477,88 +1431,4 @@ mod tests {
14771431

14781432
Ok(())
14791433
}
1480-
1481-
/// OPT #3: a wide tree (many files at one level, plus a nested subdir)
1482-
/// must materialize byte-identically when its children are constructed
1483-
/// with bounded concurrency. Exercises the `buffer_unordered` fan-out.
1484-
#[nativelink_test(flavor = "multi_thread", worker_threads = 4)]
1485-
async fn test_parallel_construct_wide_tree() -> Result<(), Error> {
1486-
let temp_dir = TempDir::new().unwrap();
1487-
let cache_root = temp_dir.path().join("cache");
1488-
let (cas_store, slow_store) = make_fast_slow_store(&temp_dir).await;
1489-
1490-
// 100 distinct files at the root.
1491-
const FILE_COUNT: usize = 100;
1492-
let mut root_files = Vec::with_capacity(FILE_COUNT);
1493-
let mut expected = Vec::with_capacity(FILE_COUNT);
1494-
for i in 0..FILE_COUNT {
1495-
let content = format!("content-of-file-{i}").into_bytes();
1496-
// Tags 100.. avoid colliding with other tests' fixed tags.
1497-
let tag = 100u8.wrapping_add(u8::try_from(i).unwrap());
1498-
let digest = upload_blob(&slow_store, tag, &content).await;
1499-
root_files.push(FileNode {
1500-
name: format!("file_{i}.txt"),
1501-
digest: Some(digest.into()),
1502-
is_executable: false,
1503-
..Default::default()
1504-
});
1505-
expected.push((format!("file_{i}.txt"), content));
1506-
}
1507-
1508-
// One nested subdir with its own file.
1509-
let nested_content = b"nested-data".to_vec();
1510-
let nested_digest = upload_blob(&slow_store, 250, &nested_content).await;
1511-
let sub = ProtoDirectory {
1512-
files: vec![FileNode {
1513-
name: "deep.txt".to_string(),
1514-
digest: Some(nested_digest.into()),
1515-
is_executable: false,
1516-
..Default::default()
1517-
}],
1518-
..Default::default()
1519-
};
1520-
let mut sub_data = Vec::new();
1521-
sub.encode(&mut sub_data).unwrap();
1522-
let sub_digest = upload_blob(&slow_store, 251, &sub_data).await;
1523-
1524-
let root = ProtoDirectory {
1525-
files: root_files,
1526-
directories: vec![DirectoryNode {
1527-
name: "sub".to_string(),
1528-
digest: Some(sub_digest.into()),
1529-
}],
1530-
..Default::default()
1531-
};
1532-
let mut root_data = Vec::new();
1533-
root.encode(&mut root_data).unwrap();
1534-
let root_digest = upload_blob(&slow_store, 252, &root_data).await;
1535-
1536-
let config = DirectoryCacheConfig {
1537-
max_entries: 10,
1538-
max_size_bytes: 16 * 1024 * 1024,
1539-
cache_root,
1540-
};
1541-
let cache = DirectoryCache::new(config, cas_store).await?;
1542-
1543-
let dest = temp_dir.path().join("dest");
1544-
let hit = cache.get_or_create(root_digest, &dest).await?;
1545-
assert!(!hit, "first construction is a miss");
1546-
1547-
// Every one of the 100 root files must be present and byte-identical.
1548-
for (name, content) in &expected {
1549-
assert_eq!(
1550-
&fs::read(dest.join(name)).await?,
1551-
content,
1552-
"parallel-constructed file {name} must be byte-identical"
1553-
);
1554-
}
1555-
// The nested subdir's file too.
1556-
assert_eq!(
1557-
fs::read(dest.join("sub").join("deep.txt")).await?,
1558-
nested_content,
1559-
"nested file must be byte-identical"
1560-
);
1561-
1562-
Ok(())
1563-
}
15641434
}

0 commit comments

Comments
 (0)