11use crate :: util:: cli_enum:: ScriptPolicy ;
2- use anyhow:: Context ;
3- use anyhow :: Result ;
2+ use anyhow:: { Context as _ , Result } ;
3+ use futures :: stream :: { FuturesUnordered , StreamExt } ;
44use std:: collections:: { HashMap , HashSet } ;
55use std:: path:: Path ;
66use std:: time:: Instant ;
@@ -10,8 +10,9 @@ use crate::fs;
1010use crate :: helper:: global_bin:: get_global_bin_dir;
1111use crate :: helper:: lock:: {
1212 Package , UpdatePackageJsonOptions , extract_package_name, group_by_depth, is_pkg_lock_outdated,
13- prepare_global_package_json, update_package_json,
13+ prepare_global_package_json, save_package_lock , update_package_json,
1414} ;
15+ use crate :: helper:: ruborist_context:: { Context , spawn_save_project_cache} ;
1516use crate :: helper:: workspace:: init_project_root;
1617use crate :: model:: package:: PackageInfo ;
1718use crate :: service:: rebuild:: RebuildService ;
@@ -62,29 +63,27 @@ fn should_omit_package(package: &Package, omit: &HashSet<OmitType>) -> bool {
6263 false
6364}
6465
65- pub async fn install_packages (
66+ async fn install_packages (
6667 groups : & HashMap < usize , Vec < ( String , Package ) > > ,
6768 cwd : & Path ,
6869 omit : & HashSet < OmitType > ,
69- scheduler : Option < & super :: install_scheduler:: InstallScheduler > ,
70+ scheduler : & super :: install_scheduler:: InstallScheduler ,
7071) -> Result < ( ) > {
71- use crate :: util:: cloner:: clone_package_once;
72-
7372 // Surface the clean step in the spinner — it doesn't move `pos`, so
7473 // without a message the bar looks frozen on large trees.
7574 log_progress ( "validating node_modules" ) ;
7675 clean_deps ( groups, cwd) . await ?;
7776 log_progress ( "linking packages" ) ;
7877
7978 // Always process level-by-level to ensure parent directories exist before
80- // children. Within each level, tasks run concurrently. The pipeline's
81- // clone_worker may have already cloned some packages — clone_package_once
82- // deduplicates via CLONE_CACHE so no double work occurs .
79+ // children. Within each level, tasks run concurrently. The install
80+ // scheduler owns clone/download dedupe, so package tasks only request the
81+ // concrete target they need .
8382 let mut depths: Vec < _ > = groups. keys ( ) . cloned ( ) . collect ( ) ;
8483 depths. sort_unstable ( ) ;
8584
8685 for depth in depths. iter ( ) {
87- let mut clone_tasks: Vec < tokio :: task :: JoinHandle < Result < ( ) > > > = Vec :: new ( ) ;
86+ let mut clone_tasks = FuturesUnordered :: new ( ) ;
8887
8988 if let Some ( packages) = groups. get ( depth) {
9089 for ( path, package) in packages. iter ( ) {
@@ -141,30 +140,17 @@ pub async fn install_packages(
141140 . ok_or_else ( || anyhow:: anyhow!( "package {name} missing version" ) ) ?;
142141 let cwd_clone = cwd. to_path_buf ( ) ;
143142 let target_path = cwd_clone. join ( & path) ;
143+ let scheduler = scheduler. clone ( ) ;
144144
145145 // Check if this is an optional dependency
146146 let is_optional =
147147 package. optional == Some ( true ) || package. dev_optional == Some ( true ) ;
148- let scheduler = scheduler. cloned ( ) ;
149-
150- let task = tokio:: spawn ( async move {
151- let clone_result = match scheduler {
152- Some ( scheduler) => {
153- scheduler
154- . ensure_clone (
155- name. clone ( ) ,
156- version,
157- resolved,
158- target_path. clone ( ) ,
159- )
160- . await
161- }
162- None => {
163- clone_package_once ( & name, & version, & resolved, & target_path) . await
164- }
165- } ;
166148
167- if let Err ( e) = clone_result {
149+ clone_tasks. push ( async move {
150+ if let Err ( e) = scheduler
151+ . ensure_clone ( name. clone ( ) , version, resolved, target_path. clone ( ) )
152+ . await
153+ {
168154 if is_optional {
169155 tracing:: warn!(
170156 "Optional dependency {name} failed (ignored): {e:#}"
@@ -178,21 +164,33 @@ pub async fn install_packages(
178164 log_progress ( & format ! ( "{name} resolved" ) ) ;
179165 update_package_binary ( & target_path, & name) . await
180166 } ) ;
181- clone_tasks. push ( task) ;
182167 } else {
183168 PROGRESS_BAR . inc ( 1 ) ;
184169 }
185170 }
186171 }
187172
188- for task in clone_tasks {
189- task . await ? ?;
173+ while let Some ( result ) = clone_tasks. next ( ) . await {
174+ result ?;
190175 }
191176 }
192177
193178 Ok ( ( ) )
194179}
195180
181+ async fn resolve_package_lock_with_scheduler (
182+ root_path : & Path ,
183+ scheduler : super :: install_scheduler:: InstallScheduler ,
184+ ) -> Result < utoo_ruborist:: lock:: PackageLock > {
185+ let options = Context :: install_deps_options ( root_path. to_path_buf ( ) , scheduler) . await ;
186+ let output = utoo_ruborist:: service:: build_deps ( options) . await ?;
187+
188+ save_package_lock ( root_path, & output. lock ) . await ?;
189+ spawn_save_project_cache ( root_path. to_path_buf ( ) , output. project_cache ) ;
190+
191+ Ok ( output. lock )
192+ }
193+
196194pub struct InstallService ;
197195
198196impl InstallService {
@@ -260,16 +258,31 @@ impl InstallService {
260258 // itself emits a `tracing::warn` with the specific mismatch reason.
261259 let use_fresh_lock = fs:: try_exists ( & lock_path) . await . unwrap_or ( false )
262260 && !is_pkg_lock_outdated ( root_path) . await . unwrap_or ( true ) ;
263-
264- let ( package_lock, pipeline_handles) = if use_fresh_lock {
265- let lock = load_package_lock_json_from_path ( root_path) . await ?;
266- ( lock, None )
261+ let scheduler_handle = super :: install_scheduler:: InstallSchedulerHandle :: start ( ) ;
262+ let scheduler = scheduler_handle. scheduler ( ) ;
263+
264+ let ( package_lock, used_scheduler_prefetch) = if use_fresh_lock {
265+ let lock = match load_package_lock_json_from_path ( root_path) . await {
266+ Ok ( lock) => lock,
267+ Err ( e) => {
268+ scheduler_handle. shutdown ( ) . await ;
269+ return Err ( e) ;
270+ }
271+ } ;
272+ ( lock, false )
267273 } else {
268274 start_progress_bar ( ) ;
269275 let resolve_start = Instant :: now ( ) ;
270- let result = super :: pipeline:: resolve_with_pipeline ( root_path) . await ?;
276+ let lock = match resolve_package_lock_with_scheduler ( root_path, scheduler. clone ( ) ) . await
277+ {
278+ Ok ( lock) => lock,
279+ Err ( e) => {
280+ scheduler_handle. shutdown ( ) . await ;
281+ return Err ( e) ;
282+ }
283+ } ;
271284 finish_progress_bar ( "package-lock.json resolved" , Some ( resolve_start. elapsed ( ) ) ) ;
272- ( result . package_lock , Some ( result . handles ) )
285+ ( lock , true )
273286 } ;
274287
275288 let groups = group_by_depth ( & package_lock. packages ) ;
@@ -280,28 +293,15 @@ impl InstallService {
280293 }
281294
282295 let link_start = Instant :: now ( ) ;
283- let scheduler_handle = if use_fresh_lock {
284- Some ( super :: install_scheduler:: InstallSchedulerHandle :: start ( ) )
285- } else {
286- None
287- } ;
288- let scheduler = scheduler_handle. as_ref ( ) . map ( |handle| handle. scheduler ( ) ) ;
289-
290- let install_result = install_packages ( & groups, root_path, omit, scheduler. as_ref ( ) )
296+ let install_result = install_packages ( & groups, root_path, omit, & scheduler)
291297 . await
292298 . context ( "Failed to install packages" ) ;
293299
294- if let Some ( handle) = scheduler_handle {
295- handle. shutdown ( ) . await ;
300+ scheduler_handle. shutdown ( ) . await ;
301+ if used_scheduler_prefetch {
302+ super :: install_scheduler:: print_summary ( ) ;
296303 }
297-
298304 install_result?;
299-
300- // Wait for pipeline workers to complete (if any)
301- if let Some ( handles) = pipeline_handles {
302- handles. await_completion ( ) . await ;
303- super :: pipeline:: print_pipeline_summary ( ) ;
304- }
305305 finish_progress_bar ( "node_modules cloned" , Some ( link_start. elapsed ( ) ) ) ;
306306
307307 RebuildService :: rebuild ( & package_lock, root_path, scripts) . await ?;
0 commit comments