Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 77 additions & 1 deletion crates/pm/src/service/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ use crate::model::package::PackageInfo;
use crate::service::rebuild::RebuildService;
use crate::util::cli_enum::{OmitType, PackageAction, SaveType};
use crate::util::cloner::clone_count;
use crate::util::downloader::download_stats;
use crate::util::downloader::{download_stats, resolve_cache_path};
use crate::util::json::load_package_lock_json_from_path;
use crate::util::linker::link;
use crate::util::logger::{
PROGRESS_BAR, finish_progress_bar, log_progress, print_install_counts, start_progress_bar,
};
use futures::StreamExt;
use utoo_ruborist::compat::{is_cpu_compatible, is_os_compatible};

use super::binary::update_package_binary;
Expand Down Expand Up @@ -184,6 +185,73 @@ pub async fn install_packages(
Ok(())
}

/// Starts a background task that calls `resolve_cache_path` for every eligible
/// package found in `groups`.
///
/// A package is eligible when it is not excluded by `omit`, is not a link,
/// passes the CPU/OS compatibility checks (when it declares `cpu` / `os`),
/// and has both a `version` and a `resolved` value.
///
/// # Parameters
/// - `groups`: lockfile packages as `(path, package)` pairs; the map keys are
///   not consulted by this function.
/// - `cwd`: base directory used to turn relative `file:` specifiers into
///   absolute ones.
/// - `omit`: omit settings forwarded to `should_omit_package`.
///
/// # Returns
/// `None` when no package is eligible; otherwise the `JoinHandle` of the
/// spawned task, which finishes once every per-package task has finished.
/// Failures are only logged at debug level and never surface to the caller.
///
/// No deduplication is performed: if the same `(name, version, resolved)`
/// triple occurs under several paths, one task is spawned for each occurrence.
fn start_lockfile_cache_prefetch(
groups: &HashMap<usize, Vec<(String, Package)>>,
cwd: &Path,
omit: &HashSet<OmitType>,
) -> Option<tokio::task::JoinHandle<()>> {
// Owned `(name, version, resolved)` triples, so they can be moved into the spawned task.
let mut packages = Vec::new();

// Flatten all groups; the grouping key is irrelevant for this pass.
for (path, package) in groups.values().flat_map(|pkgs| pkgs.iter()) {
// Skip omitted packages and link entries.
if should_omit_package(package, omit) || package.link.is_some() {
continue;
}

// A package without a `cpu` field is not filtered by CPU.
if let Some(ref cpu) = package.cpu
&& !is_cpu_compatible(cpu)
{
continue;
}

// A package without an `os` field is not filtered by OS.
if let Some(ref os) = package.os
&& !is_os_compatible(os)
{
continue;
}

// Both `version` and `resolved` are required; entries missing either are skipped.
let Some(version) = package.version.clone() else {
continue;
};
let Some(resolved) = package.resolved.clone() else {
continue;
};

// Rewrite a relative `file:` specifier to an absolute one rooted at `cwd`;
// absolute `file:` paths and all other specifiers pass through unchanged.
let resolved = match resolved.strip_prefix("file:") {
Some(rel) if !Path::new(rel).is_absolute() => {
format!("file:{}", cwd.join(rel).display())
}
_ => resolved,
};

packages.push((package.get_name(path), version, resolved));
}

// Nothing to do: avoid spawning an empty background task.
if packages.is_empty() {
return None;
}

// The outer task owns the package list and supervises one spawned task per package.
Some(tokio::spawn(async move {
let mut tasks = futures::stream::FuturesUnordered::new();

for (name, version, resolved) in packages {
tasks.push(tokio::spawn(async move {
// Only the `None` case is acted on, and only by logging.
// NOTE(review): presumably `resolve_cache_path` populates the download cache
// as a side effect — confirm against `util::downloader`.
if resolve_cache_path(&name, &version, &resolved)
.await
.is_none()
{
tracing::debug!("Prefetch skipped or failed for {name}@{version}");
}
}));
}

// Drain the join handles as they complete; an `Err` here is a join error
// from the spawned task (e.g. a panic), which is logged and otherwise ignored.
while let Some(result) = tasks.next().await {
if let Err(e) = result {
tracing::debug!("Lockfile cache prefetch task failed: {e}");
}
}
}))
Comment on lines +193 to +252
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current prefetch implementation can be improved in two ways:

  1. Deduplication: A package (same name, version, and resolved URL) can appear multiple times in the lockfile at different paths (e.g., in nested node_modules). Deduplicating the packages list before prefetching avoids redundant cache checks and unnecessary task spawning.
  2. Concurrency Control: Spawning a separate tokio::spawn for every single package in a large lockfile (which can have thousands of entries) adds significant overhead to the scheduler. Using for_each_concurrent on a stream of packages is more idiomatic and allows for better control over the number of concurrent resolution tasks.

Since the entire prefetch process is already wrapped in a single background tokio::spawn, we don't need to spawn additional tasks for each individual package.

    let mut packages = Vec::new();
    let mut seen = HashSet::new();

    for (path, package) in groups.values().flat_map(|pkgs| pkgs.iter()) {
        if should_omit_package(package, omit) || package.link.is_some() {
            continue;
        }

        if let Some(ref cpu) = package.cpu
            && !is_cpu_compatible(cpu)
        {
            continue;
        }

        if let Some(ref os) = package.os
            && !is_os_compatible(os)
        {
            continue;
        }

        let Some(version) = package.version.clone() else {
            continue;
        };
        let Some(resolved) = package.resolved.clone() else {
            continue;
        };

        let resolved = match resolved.strip_prefix("file:") {
            Some(rel) if !Path::new(rel).is_absolute() => {
                format!("file:{}", cwd.join(rel).display())
            }
            _ => resolved,
        };

        let name = package.get_name(path);
        if seen.insert((name.clone(), version.clone(), resolved.clone())) {
            packages.push((name, version, resolved));
        }
    }

    if packages.is_empty() {
        return None;
    }

    Some(tokio::spawn(async move {
        futures::stream::iter(packages)
            .for_each_concurrent(50, |(name, version, resolved)| async move {
                if resolve_cache_path(&name, &version, &resolved)
                    .await
                    .is_none()
                {
                    tracing::debug!("Prefetch skipped or failed for {name}@{version}");
                }
            })
            .await;
    }))

}

pub struct InstallService;

impl InstallService {
Expand Down Expand Up @@ -264,6 +332,11 @@ impl InstallService {
};

let groups = group_by_depth(&package_lock.packages);
let prefetch_handle = if use_fresh_lock {
start_lockfile_cache_prefetch(&groups, root_path, omit)
} else {
None
};

if !package_lock.packages.is_empty() {
start_progress_bar();
Expand All @@ -280,6 +353,9 @@ impl InstallService {
handles.await_completion().await;
super::pipeline::print_pipeline_summary();
}
if let Some(handle) = prefetch_handle {
let _ = handle.await;
}
finish_progress_bar("node_modules cloned", Some(link_start.elapsed()));

RebuildService::rebuild(&package_lock, root_path, scripts).await?;
Expand Down
Loading