Skip to content

Commit d5c497f

Browse files
committed
perf(pm): add sync cloner primitive
1 parent 329bbe1 commit d5c497f

1 file changed

Lines changed: 225 additions & 79 deletions

File tree

crates/pm/src/util/cloner.rs

Lines changed: 225 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
33

44
use anyhow::{Context, Result};
55
use once_cell::sync::Lazy;
6+
use serde::de::DeserializeOwned;
67
use tokio_retry::Retry;
78
use utoo_ruborist::manifest::IdentityView;
89
use utoo_ruborist::util::OnceMap;
@@ -33,6 +34,24 @@ pub fn clone_count() -> usize {
3334
CLONE_COUNT.load(Ordering::Relaxed)
3435
}
3536

37+
/// Clone a package from an already-resolved cache path without touching the
38+
/// global clone/download single-flight maps. Schedulers that own deduplication
39+
/// use this sync primitive from their worker pool.
40+
pub fn clone_package_from_cache_sync(
41+
name: &str,
42+
version: &str,
43+
tarball_url: &str,
44+
cache_path: &Path,
45+
target_path: &Path,
46+
) -> Result<()> {
47+
let is_git = is_git_url(tarball_url);
48+
let fresh = clone_package_sync(cache_path, target_path, name, version, !is_git)?;
49+
if fresh {
50+
CLONE_COUNT.fetch_add(1, Ordering::Relaxed);
51+
}
52+
Ok(())
53+
}
54+
3655
/// Normalize a target path into the canonical key used by `CLONE_CACHE`.
3756
#[cfg(windows)]
3857
fn cache_key(target_path: &Path) -> PathBuf {
@@ -78,15 +97,17 @@ pub async fn clone_package_once(
7897
let tarball_url = tarball_url.to_string();
7998
let target_path = target_path.to_path_buf();
8099

81-
// Git packages are extracted flat (no `package/` wrapper directory),
82-
// so skip `find_real_src` which would incorrectly pick a subdirectory.
83-
let is_git = is_git_url(&tarball_url);
84-
85100
CLONE_CACHE
86101
.get_or_init(key, || async move {
87102
let cache_path = resolve_cache_path(&name, &version, &tarball_url).await?;
88-
let fresh = clone_package(&cache_path, &target_path, &name, &version, !is_git)
89-
.await
103+
tokio::task::spawn_blocking(move || {
104+
clone_package_from_cache_sync(
105+
&name,
106+
&version,
107+
&tarball_url,
108+
&cache_path,
109+
&target_path,
110+
)
90111
.inspect_err(|e| {
91112
tracing::warn!(
92113
"Clone failed: {}@{} to {}: {:#}",
@@ -96,12 +117,10 @@ pub async fn clone_package_once(
96117
e
97118
)
98119
})
99-
.ok()?;
100-
101-
if fresh {
102-
CLONE_COUNT.fetch_add(1, Ordering::Relaxed);
103-
}
104-
Some(())
120+
.ok()
121+
})
122+
.await
123+
.ok()?
105124
})
106125
.await
107126
.map(|_| ())
@@ -172,83 +191,165 @@ mod hardlink_clone {
172191
Ok(())
173192
}
174193

175-
/// Clone directory using spawn_blocking for sync I/O.
176-
/// Uses hardlink when possible, falls back to copy.
177-
pub async fn clone_dir(src: &Path, dst: &Path) -> Result<()> {
194+
/// Clone directory using sync I/O. Uses hardlink when possible, falls back
195+
/// to copy.
196+
pub fn clone_dir_sync(src: &Path, dst: &Path) -> Result<()> {
178197
let err_msg = format!("Failed to clone {} to {}", src.display(), dst.display());
198+
199+
if !fs::metadata(src)?.is_dir() {
200+
return Err(io::Error::new(
201+
io::ErrorKind::NotADirectory,
202+
"Source is not a directory",
203+
))
204+
.with_context(|| err_msg);
205+
}
206+
207+
let mut force_copy = has_install_script_sync(src);
208+
209+
let mut files = Vec::new();
210+
let mut dirs = Vec::new();
211+
collect_entries(src, dst, &mut files, &mut dirs)?;
212+
213+
let mut created_dirs = HashSet::new();
214+
for dir in &dirs {
215+
if created_dirs.insert(dir.clone())
216+
&& let Err(e) = fs::create_dir_all(dir)
217+
&& e.kind() != io::ErrorKind::AlreadyExists
218+
{
219+
return Err(e).with_context(|| err_msg.clone());
220+
}
221+
}
222+
223+
let mut warned_per_file = false;
224+
for entry in &files {
225+
if force_copy {
226+
copy_file_sync(&entry.src, &entry.dst)?;
227+
} else if let Err(e) = fs::hard_link(&entry.src, &entry.dst) {
228+
if e.kind() == io::ErrorKind::CrossesDevices {
229+
tracing::warn!(
230+
"cross-device hardlink {} -> {}: {}; falling back to copy for remaining files",
231+
src.display(),
232+
dst.display(),
233+
e
234+
);
235+
force_copy = true;
236+
} else if !warned_per_file {
237+
tracing::warn!(
238+
"hardlink failed for {} -> {}: {}; falling back to copy (further per-file failures suppressed)",
239+
entry.src.display(),
240+
entry.dst.display(),
241+
e
242+
);
243+
warned_per_file = true;
244+
}
245+
copy_file_sync(&entry.src, &entry.dst)?;
246+
}
247+
}
248+
Ok(())
249+
}
250+
251+
/// Clone directory using spawn_blocking for callers that are still async.
252+
pub async fn clone_dir(src: &Path, dst: &Path) -> Result<()> {
179253
let src = src.to_path_buf();
180254
let dst = dst.to_path_buf();
255+
tokio::task::spawn_blocking(move || clone_dir_sync(&src, &dst)).await?
256+
}
257+
}
181258

182-
tokio::task::spawn_blocking(move || {
183-
if !fs::metadata(&src)?.is_dir() {
184-
return Err(io::Error::new(
185-
io::ErrorKind::NotADirectory,
186-
"Source is not a directory",
187-
));
259+
fn load_package_json_sync<T: DeserializeOwned>(path: &Path) -> Result<T> {
260+
let pkg_path = path.join("package.json");
261+
let content = std::fs::read_to_string(&pkg_path)
262+
.with_context(|| format!("Failed to read file {pkg_path:?}"))?;
263+
264+
match serde_json::from_str(&content) {
265+
Ok(v) => Ok(v),
266+
Err(original_err) => match serde_json::from_str::<serde_json::Value>(&content) {
267+
Ok(value) => serde_json::from_value(value)
268+
.with_context(|| format!("Failed to deserialize {pkg_path:?}")),
269+
Err(_) => {
270+
Err(original_err).with_context(|| format!("Failed to parse JSON from {pkg_path:?}"))
188271
}
272+
},
273+
}
274+
}
189275

190-
let mut force_copy = has_install_script_sync(&src);
191-
192-
// Phase 1: Collect all files and directories
193-
let mut files = Vec::new();
194-
let mut dirs = Vec::new();
195-
collect_entries(&src, &dst, &mut files, &mut dirs)?;
196-
197-
// Phase 2: Create all directories
198-
let mut created_dirs = HashSet::new();
199-
for dir in &dirs {
200-
if created_dirs.insert(dir.clone())
201-
&& let Err(e) = fs::create_dir_all(dir)
202-
&& e.kind() != io::ErrorKind::AlreadyExists
203-
{
204-
return Err(e);
205-
}
276+
fn validate_name_version_sync(dst: &Path, name: &str, version: &str) -> bool {
277+
let Ok(pkg) = load_package_json_sync::<IdentityView>(dst) else {
278+
return false;
279+
};
280+
pkg.name == name && pkg.version == version
281+
}
282+
/// Return the first sub-directory of `src` whose name is not `.utoo_built`.
///
/// Yields `None` when `src` cannot be listed, when an entry or its file type
/// cannot be read, or when no qualifying directory exists.
fn find_real_src_sync(src: &Path) -> Option<PathBuf> {
    let mut entries = std::fs::read_dir(src).ok()?;
    loop {
        // Running out of entries and hitting an unreadable entry both end the
        // search with `None`.
        let entry = entries.next()?.ok()?;
        if !entry.file_type().ok()?.is_dir() {
            continue;
        }
        let candidate = entry.path();
        if candidate
            .file_name()
            .is_some_and(|name| name != ".utoo_built")
        {
            return Some(candidate);
        }
    }
}
207295

208-
// Phase 3: Clone files (hardlink, fall back to copy on error).
209-
//
210-
// EXDEV (src cache and dst on different filesystems, e.g. a
211-
// global install where ~/.cache/nm lives on a different volume
212-
// than /usr/local) is a property of the src/dst pair — every
213-
// remaining file would fail the same way, so latch `force_copy`
214-
// and skip hardlink for the rest of this clone.
215-
//
216-
// Any other hardlink error (EMLINK on a single inode whose link
217-
// count is exhausted, EPERM on a specific file, etc.) is
218-
// per-file: copy this one and keep trying hardlink on the next.
219-
// We warn only on the first such failure per package to avoid
220-
// spamming hundreds of identical warnings when an entire package
221-
// can't be hardlinked.
222-
let mut warned_per_file = false;
223-
for entry in &files {
224-
if force_copy {
225-
copy_file_sync(&entry.src, &entry.dst)?;
226-
} else if let Err(e) = fs::hard_link(&entry.src, &entry.dst) {
227-
if e.kind() == io::ErrorKind::CrossesDevices {
228-
tracing::warn!(
229-
"cross-device hardlink {} -> {}: {}; falling back to copy for remaining files",
230-
src.display(),
231-
dst.display(),
232-
e
233-
);
234-
force_copy = true;
235-
} else if !warned_per_file {
236-
tracing::warn!(
237-
"hardlink failed for {} -> {}: {}; falling back to copy (further per-file failures suppressed)",
238-
entry.src.display(),
239-
entry.dst.display(),
240-
e
241-
);
242-
warned_per_file = true;
243-
}
244-
copy_file_sync(&entry.src, &entry.dst)?;
245-
}
296+
#[cfg(target_os = "macos")]
297+
fn clone_dir_native_sync(real_src: &Path, dst: &Path) -> Result<()> {
298+
let src_c = CString::new(real_src.as_os_str().as_bytes())?;
299+
let dst_c = CString::new(dst.as_os_str().as_bytes())?;
300+
let mut last_error = None;
301+
302+
for delay in std::iter::once(std::time::Duration::ZERO).chain(create_retry_strategy()) {
303+
if !delay.is_zero() {
304+
std::thread::sleep(delay);
305+
}
306+
307+
match unsafe { clonefile(src_c.as_ptr(), dst_c.as_ptr(), 0) } {
308+
0 => return Ok(()),
309+
_ => {
310+
let err = std::io::Error::last_os_error();
311+
let _ = std::fs::remove_dir_all(dst);
312+
last_error = Some(err);
246313
}
247-
Ok(())
248-
})
249-
.await?
250-
.with_context(|| err_msg)
314+
}
315+
}
316+
317+
Err(anyhow::anyhow!(
318+
"clonefile {} -> {}: {}",
319+
real_src.display(),
320+
dst.display(),
321+
last_error
322+
.map(|e| e.to_string())
323+
.unwrap_or_else(|| "unknown error".to_string())
324+
))
325+
}
326+
327+
#[cfg(not(target_os = "macos"))]
328+
fn clone_dir_native_sync(real_src: &Path, dst: &Path) -> Result<()> {
329+
hardlink_clone::clone_dir_sync(real_src, dst)
330+
.with_context(|| format!("clone_dir {} -> {}", real_src.display(), dst.display()))
331+
}
332+
333+
fn clone_sync(src: &Path, dst: &Path, find_real: bool) -> Result<()> {
334+
let real_src = if find_real {
335+
find_real_src_sync(src)
336+
.ok_or_else(|| anyhow::anyhow!("Cannot find valid source directory in {src:?}"))?
337+
} else {
338+
src.to_path_buf()
339+
};
340+
341+
if dst.try_exists()?
342+
&& let Err(e) = std::fs::remove_dir_all(dst)
343+
{
344+
tracing::warn!("Failed to clean target directory {}: {}", dst.display(), e);
251345
}
346+
347+
if let Some(parent) = dst.parent() {
348+
std::fs::create_dir_all(parent)?;
349+
}
350+
351+
clone_dir_native_sync(&real_src, dst)?;
352+
Ok(())
252353
}
253354

254355
async fn validate_directory(src: &Path, dst: &Path) -> Result<bool> {
@@ -445,6 +546,26 @@ pub async fn clone_package(
445546
Ok(true)
446547
}
447548

549+
/// Sync clone path with the same name/version validation as [`clone_package`].
550+
fn clone_package_sync(
551+
src: &Path,
552+
dst: &Path,
553+
name: &str,
554+
version: &str,
555+
find_real: bool,
556+
) -> Result<bool> {
557+
if dst.try_exists()? {
558+
if validate_name_version_sync(dst, name, version) {
559+
return Ok(false);
560+
}
561+
if let Err(e) = std::fs::remove_dir_all(dst) {
562+
tracing::warn!("Failed to clean target directory {}: {}", dst.display(), e);
563+
}
564+
}
565+
clone_sync(src, dst, find_real)?;
566+
Ok(true)
567+
}
568+
448569
#[cfg(test)]
449570
mod tests {
450571
use tempfile::TempDir;
@@ -804,6 +925,31 @@ mod tests {
804925
Ok(())
805926
}
806927

/// A fresh install from a cache entry with a `package/` wrapper directory
/// places `package.json` directly inside the target directory.
#[test]
fn test_clone_package_from_cache_sync_fresh_install() -> Result<()> {
    let temp = TempDir::new()?;
    let root = temp.path();
    let cache_dir = root.join("cache/lodash/4.17.21");
    let src_dir = cache_dir.join("package");
    let dst_dir = root.join("node_modules/lodash");

    // Seed the cache with a minimal package manifest.
    std::fs::create_dir_all(&src_dir)?;
    let pkg_json = create_package_json("lodash", "4.17.21");
    std::fs::write(src_dir.join("package.json"), &pkg_json)?;

    clone_package_from_cache_sync(
        "lodash",
        "4.17.21",
        "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
        &cache_dir,
        &dst_dir,
    )?;

    let cloned_manifest = dst_dir.join("package.json");
    assert!(cloned_manifest.exists());
    let content = std::fs::read_to_string(&cloned_manifest)?;
    assert!(content.contains("lodash"));
    Ok(())
}
952+
807953
#[tokio::test]
808954
async fn test_clone_package_git_flat_layout() -> Result<()> {
809955
let temp = TempDir::new()?;

0 commit comments

Comments
 (0)