Skip to content

Commit 26200e4

Browse files
committed
test(pm): cover install parent clone barrier
1 parent 411b027 commit 26200e4

1 file changed

Lines changed: 58 additions & 10 deletions

File tree

crates/pm/src/service/install_scheduler.rs

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ struct SchedulerState {
142142
clone_limit: usize,
143143
download_done: HashMap<String, PathBuf>,
144144
download_active: HashSet<String>,
145-
download_waiters: HashMap<String, Vec<CloneSpec>>,
145+
fetch_waiters: HashMap<String, Vec<CloneSpec>>,
146146
download_queue: VecDeque<PackageFetch>,
147147
extract_active: HashSet<String>,
148148
extract_queue: VecDeque<DownloadedPackage>,
@@ -166,7 +166,7 @@ impl SchedulerState {
166166
clone_limit: clone_concurrency_limit(),
167167
download_done: HashMap::new(),
168168
download_active: HashSet::new(),
169-
download_waiters: HashMap::new(),
169+
fetch_waiters: HashMap::new(),
170170
download_queue: VecDeque::new(),
171171
extract_active: HashSet::new(),
172172
extract_queue: VecDeque::new(),
@@ -274,19 +274,20 @@ impl SchedulerState {
274274
return;
275275
}
276276

277-
if let Some(waiters) = self.download_waiters.get_mut(&key) {
277+
if let Some(waiters) = self.fetch_waiters.get_mut(&key) {
278278
if let Some(spec) = waiter {
279279
waiters.push(spec);
280280
}
281281
return;
282282
}
283283

284-
self.download_waiters
285-
.insert(key, waiter.into_iter().collect());
284+
self.fetch_waiters.insert(key, waiter.into_iter().collect());
286285
self.download_queue.push_back(package);
287286
}
288287

289288
fn pump_downloads(&mut self) {
289+
// Bound downloaded tarballs waiting for extraction so network prefetch
290+
// cannot outrun CPU/disk extraction and pile up Bytes.
290291
while self.download_active.len() < self.download_limit
291292
&& self.extract_backlog() < self.download_limit
292293
{
@@ -402,7 +403,7 @@ impl SchedulerState {
402403
}
403404

404405
fn complete_download(&mut self, key: String, result: Result<PathBuf, String>) {
405-
let waiters = self.download_waiters.remove(&key).unwrap_or_default();
406+
let waiters = self.fetch_waiters.remove(&key).unwrap_or_default();
406407
match result {
407408
Ok(cache_path) => {
408409
self.download_done.insert(key, cache_path.clone());
@@ -488,7 +489,7 @@ mod tests {
488489
state.ensure_download(package, None);
489490

490491
assert_eq!(state.download_queue.len(), 1);
491-
assert_eq!(state.download_waiters["react@18.2.0"].len(), 1);
492+
assert_eq!(state.fetch_waiters["react@18.2.0"].len(), 1);
492493
}
493494

494495
#[test]
@@ -499,15 +500,15 @@ mod tests {
499500
let waiter = clone_spec("react", "18.2.0", "/tmp/project/node_modules/react");
500501

501502
state.download_active.insert(key.clone());
502-
state.download_waiters.insert(key.clone(), vec![waiter]);
503+
state.fetch_waiters.insert(key.clone(), vec![waiter]);
503504

504505
state.handle_done(OpDone::Download {
505506
package: package.clone(),
506507
result: Ok(DownloadOutcome::Bytes(Bytes::from_static(b"tgz"))),
507508
});
508509

509510
assert!(!state.download_active.contains(&key));
510-
assert!(state.download_waiters.contains_key(&key));
511+
assert!(state.fetch_waiters.contains_key(&key));
511512
assert_eq!(state.extract_queue.len(), 1);
512513
assert_eq!(state.extract_queue[0].package.key(), key);
513514
}
@@ -520,7 +521,7 @@ mod tests {
520521
let cache_path = PathBuf::from("/tmp/cache/react/18.2.0");
521522

522523
state.extract_active.insert(key.clone());
523-
state.download_waiters.insert(key.clone(), vec![waiter]);
524+
state.fetch_waiters.insert(key.clone(), vec![waiter]);
524525

525526
state.handle_done(OpDone::Extract {
526527
key: key.clone(),
@@ -546,4 +547,51 @@ mod tests {
546547
assert_eq!(state.clone_waiters[&target].len(), 2);
547548
assert_eq!(state.ops.len(), 1);
548549
}
550+
551+
#[tokio::test]
552+
async fn queue_clone_blocks_child_until_parent_completes() {
553+
let mut state = state();
554+
let parent_target = PathBuf::from("/tmp/project/node_modules/@scope");
555+
let child_target = parent_target.join("child");
556+
let parent = clone_spec("scope", "1.0.0", parent_target.to_string_lossy().as_ref());
557+
let mut child = clone_spec("child", "1.0.0", child_target.to_string_lossy().as_ref());
558+
child.parent = Some(parent_target.clone());
559+
560+
state.queue_clone(parent, None);
561+
let parent_ops = state.ops.len();
562+
state.queue_clone(child, None);
563+
564+
assert_eq!(state.blocked_by_parent[&parent_target].len(), 1);
565+
assert_eq!(state.ops.len(), parent_ops);
566+
567+
state.complete_clone(parent_target.clone(), Ok(()));
568+
569+
assert!(!state.blocked_by_parent.contains_key(&parent_target));
570+
assert_eq!(state.ops.len(), parent_ops + 1);
571+
}
572+
573+
#[tokio::test]
574+
async fn parent_clone_failure_wakes_blocked_children() {
575+
let mut state = state();
576+
let parent_target = PathBuf::from("/tmp/project/node_modules/@scope");
577+
let child_target = parent_target.join("child");
578+
let parent = clone_spec("scope", "1.0.0", parent_target.to_string_lossy().as_ref());
579+
let mut child = clone_spec("child", "1.0.0", child_target.to_string_lossy().as_ref());
580+
child.parent = Some(parent_target.clone());
581+
let (child_tx, child_rx) = oneshot::channel();
582+
583+
state.queue_clone(parent, None);
584+
state.queue_clone(child, Some(child_tx));
585+
586+
assert_eq!(state.blocked_by_parent[&parent_target].len(), 1);
587+
588+
state.complete_clone(parent_target.clone(), Err("boom".to_string()));
589+
590+
let error = child_rx.await.unwrap().unwrap_err();
591+
let parent_path = parent_target.to_string_lossy();
592+
assert!(error.contains("parent package"));
593+
assert!(error.contains(parent_path.as_ref()));
594+
assert!(!state.clone_waiters.contains_key(&child_target));
595+
assert!(!state.blocked_by_parent.contains_key(&parent_target));
596+
}
549597
}

0 commit comments

Comments
 (0)