Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/pm/src/helper/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::fs;
use crate::helper::workspace::find_workspaces;
use crate::util::cli_enum::{PackageAction, SaveType};
use crate::util::cloner::clone_package;
use crate::util::downloader::{is_git_url, resolve_cache_path};
use crate::util::downloader::{download_and_extract_to_cache, is_git_url};
use crate::util::git_resolver::{resolve_git_spec, resolve_github_spec};
use crate::util::json::{load_package_lock_json_from_path, read_json_file};

Expand Down Expand Up @@ -267,9 +267,9 @@ pub async fn prepare_global_package_json(npm_spec: &str, prefix: Option<&str>) -
.ok_or_else(|| anyhow!("Failed to get tarball URL from manifest"))?;

// Download and extract package to cache.
let cache_path = resolve_cache_path(&name, &resolved.version, tarball_url)
let cache_path = download_and_extract_to_cache(&name, &resolved.version, tarball_url)
.await
.ok_or_else(|| anyhow!("Failed to download package {name}"))?;
.with_context(|| format!("Failed to download package {name}"))?;

// If the package has install scripts, create a flag file
// in linux, we can use hardlink when FICLONE is not supported
Expand Down
18 changes: 7 additions & 11 deletions crates/pm/src/helper/ruborist_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use utoo_ruborist::service::{
BuildDepsOptions, BuildDepsOutput, Glob, ManifestStore, UnifiedRegistry,
};

use crate::service::pipeline::{PipelineChannels, PipelineReceiver};
use crate::service::install_scheduler::{InstallEventReceiver, InstallScheduler};
use crate::util::cache::get_cache_dir;
use crate::util::logger::ProgressReceiver;
use crate::util::manifest_store::DiskManifestStore;
Expand Down Expand Up @@ -66,17 +66,13 @@ impl Context {
}
}

/// Create BuildDepsOptions with PipelineReceiver for concurrent download/clone.
/// Returns (options, channels) where channels are used to start pipeline workers.
pub async fn pipeline_deps_options(
/// Create BuildDepsOptions that forwards package events to the install scheduler.
pub async fn install_deps_options(
cwd: PathBuf,
) -> (
BuildDepsOptions<GlobImpl, PipelineReceiver<ProgressReceiver>>,
PipelineChannels,
) {
let (receiver, channels) = PipelineReceiver::new(ProgressReceiver);
let options = Self::deps_options(cwd, receiver).await;
(options, channels)
scheduler: InstallScheduler,
) -> BuildDepsOptions<GlobImpl, InstallEventReceiver<ProgressReceiver>> {
let receiver = InstallEventReceiver::new(ProgressReceiver, scheduler, cwd.clone());
Self::deps_options(cwd, receiver).await
}

/// Resolve dependency tree with plain ProgressReceiver. Returns
Expand Down
112 changes: 56 additions & 56 deletions crates/pm/src/service/install.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::util::cli_enum::ScriptPolicy;
use anyhow::Context;
use anyhow::Result;
use anyhow::{Context as _, Result};
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::Instant;
Expand All @@ -10,8 +10,9 @@ use crate::fs;
use crate::helper::global_bin::get_global_bin_dir;
use crate::helper::lock::{
Package, UpdatePackageJsonOptions, extract_package_name, group_by_depth, is_pkg_lock_outdated,
prepare_global_package_json, update_package_json,
prepare_global_package_json, save_package_lock, update_package_json,
};
use crate::helper::ruborist_context::{Context, spawn_save_project_cache};
use crate::helper::workspace::init_project_root;
use crate::model::package::PackageInfo;
use crate::service::rebuild::RebuildService;
Expand Down Expand Up @@ -62,29 +63,27 @@ fn should_omit_package(package: &Package, omit: &HashSet<OmitType>) -> bool {
false
}

pub async fn install_packages(
async fn install_packages(
groups: &HashMap<usize, Vec<(String, Package)>>,
cwd: &Path,
omit: &HashSet<OmitType>,
scheduler: Option<&super::install_scheduler::InstallScheduler>,
scheduler: &super::install_scheduler::InstallScheduler,
) -> Result<()> {
use crate::util::cloner::clone_package_once;

// Surface the clean step in the spinner — it doesn't move `pos`, so
// without a message the bar looks frozen on large trees.
log_progress("validating node_modules");
clean_deps(groups, cwd).await?;
log_progress("linking packages");

// Always process level-by-level to ensure parent directories exist before
// children. Within each level, tasks run concurrently. The pipeline's
// clone_worker may have already cloned some packages — clone_package_once
// deduplicates via CLONE_CACHE so no double work occurs.
// children. Within each level, tasks run concurrently. The install
// scheduler owns clone/download dedupe, so package tasks only request the
// concrete target they need.
let mut depths: Vec<_> = groups.keys().cloned().collect();
depths.sort_unstable();

for depth in depths.iter() {
let mut clone_tasks: Vec<tokio::task::JoinHandle<Result<()>>> = Vec::new();
let mut clone_tasks = FuturesUnordered::new();

if let Some(packages) = groups.get(depth) {
for (path, package) in packages.iter() {
Expand Down Expand Up @@ -141,30 +140,17 @@ pub async fn install_packages(
.ok_or_else(|| anyhow::anyhow!("package {name} missing version"))?;
let cwd_clone = cwd.to_path_buf();
let target_path = cwd_clone.join(&path);
let scheduler = scheduler.clone();

// Check if this is an optional dependency
let is_optional =
package.optional == Some(true) || package.dev_optional == Some(true);
let scheduler = scheduler.cloned();

let task = tokio::spawn(async move {
let clone_result = match scheduler {
Some(scheduler) => {
scheduler
.ensure_clone(
name.clone(),
version,
resolved,
target_path.clone(),
)
.await
}
None => {
clone_package_once(&name, &version, &resolved, &target_path).await
}
};

if let Err(e) = clone_result {
clone_tasks.push(async move {
if let Err(e) = scheduler
.ensure_clone(name.clone(), version, resolved, target_path.clone())
.await
{
if is_optional {
tracing::warn!(
"Optional dependency {name} failed (ignored): {e:#}"
Expand All @@ -178,21 +164,33 @@ pub async fn install_packages(
log_progress(&format!("{name} resolved"));
update_package_binary(&target_path, &name).await
});
clone_tasks.push(task);
} else {
PROGRESS_BAR.inc(1);
}
}
}

for task in clone_tasks {
task.await??;
while let Some(result) = clone_tasks.next().await {
result?;
}
}

Ok(())
}

async fn resolve_package_lock_with_scheduler(
root_path: &Path,
scheduler: super::install_scheduler::InstallScheduler,
) -> Result<utoo_ruborist::lock::PackageLock> {
let options = Context::install_deps_options(root_path.to_path_buf(), scheduler).await;
let output = utoo_ruborist::service::build_deps(options).await?;

save_package_lock(root_path, &output.lock).await?;
spawn_save_project_cache(root_path.to_path_buf(), output.project_cache);

Ok(output.lock)
}

pub struct InstallService;

impl InstallService {
Expand Down Expand Up @@ -260,16 +258,31 @@ impl InstallService {
// itself emits a `tracing::warn` with the specific mismatch reason.
let use_fresh_lock = fs::try_exists(&lock_path).await.unwrap_or(false)
&& !is_pkg_lock_outdated(root_path).await.unwrap_or(true);

let (package_lock, pipeline_handles) = if use_fresh_lock {
let lock = load_package_lock_json_from_path(root_path).await?;
(lock, None)
let scheduler_handle = super::install_scheduler::InstallSchedulerHandle::start();
let scheduler = scheduler_handle.scheduler();

let (package_lock, used_scheduler_prefetch) = if use_fresh_lock {
let lock = match load_package_lock_json_from_path(root_path).await {
Ok(lock) => lock,
Err(e) => {
scheduler_handle.shutdown().await;
return Err(e);
}
};
(lock, false)
} else {
start_progress_bar();
let resolve_start = Instant::now();
let result = super::pipeline::resolve_with_pipeline(root_path).await?;
let lock = match resolve_package_lock_with_scheduler(root_path, scheduler.clone()).await
{
Ok(lock) => lock,
Err(e) => {
scheduler_handle.shutdown().await;
return Err(e);
}
};
finish_progress_bar("package-lock.json resolved", Some(resolve_start.elapsed()));
(result.package_lock, Some(result.handles))
(lock, true)
};

let groups = group_by_depth(&package_lock.packages);
Expand All @@ -280,28 +293,15 @@ impl InstallService {
}

let link_start = Instant::now();
let scheduler_handle = if use_fresh_lock {
Some(super::install_scheduler::InstallSchedulerHandle::start())
} else {
None
};
let scheduler = scheduler_handle.as_ref().map(|handle| handle.scheduler());

let install_result = install_packages(&groups, root_path, omit, scheduler.as_ref())
let install_result = install_packages(&groups, root_path, omit, &scheduler)
.await
.context("Failed to install packages");

if let Some(handle) = scheduler_handle {
handle.shutdown().await;
scheduler_handle.shutdown().await;
if used_scheduler_prefetch {
super::install_scheduler::print_summary();
}

install_result?;

// Wait for pipeline workers to complete (if any)
if let Some(handles) = pipeline_handles {
handles.await_completion().await;
super::pipeline::print_pipeline_summary();
}
finish_progress_bar("node_modules cloned", Some(link_start.elapsed()));

RebuildService::rebuild(&package_lock, root_path, scripts).await?;
Expand Down
Loading
Loading