fix(pm): drain pipeline worker tasks #2844
Code Review
This pull request introduces concurrency limits to the download and clone pipeline workers by utilizing tokio::task::JoinSet and a configurable max_in_flight limit. It also adds a wait_existing_if_pending method to OnceMap to ensure that child package clones do not hang when waiting for local or workspace parents that are not in the clone cache. Review feedback suggests improving error handling by propagating panics in the join_next helper instead of silently logging them, and logging failures in the download_to_cache task to improve observability.
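To illustrate the bounded-concurrency idea in this summary, here is a minimal sketch that swaps in `std::thread` for tokio so it runs without external crates. It is not the PR's code: `run_bounded` is a hypothetical helper, the sleep stands in for a download or clone, and it joins the oldest task when the limit is reached, whereas `JoinSet::join_next` yields whichever task finishes first.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Runs `jobs` tasks with at most `max_in_flight` alive at once and returns
/// the highest concurrency observed. Hypothetical helper, for illustration.
fn run_bounded(jobs: usize, max_in_flight: usize) -> usize {
    assert!(max_in_flight > 0, "max_in_flight must be at least 1");
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let mut in_flight: VecDeque<JoinHandle<()>> = VecDeque::new();

    for _ in 0..jobs {
        // At the limit: wait for the oldest task before spawning another.
        if in_flight.len() >= max_in_flight {
            if let Some(handle) = in_flight.pop_front() {
                handle.join().expect("worker thread panicked");
            }
        }
        let (running, peak) = (Arc::clone(&running), Arc::clone(&peak));
        in_flight.push_back(thread::spawn(move || {
            let now = running.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(5)); // stand-in for real work
            running.fetch_sub(1, Ordering::SeqCst);
        }));
    }

    // Drain: do not return while any task is still running.
    for handle in in_flight {
        handle.join().expect("worker thread panicked");
    }
    peak.load(Ordering::SeqCst)
}
```

Calling `run_bounded(8, 3)` should never report more than three tasks alive at once, and the final loop plays the role of the drain step named in the PR title.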
    async fn join_next(join_set: &mut JoinSet<()>, worker_name: &str) {
        if let Some(result) = join_set.join_next().await
            && let Err(e) = result
        {
            tracing::debug!("{worker_name} task failed: {e}");
        }
    }

The current implementation of join_next logs all task failures, including panics, at the debug level and continues. This effectively implements recovery logic for panics, which violates the general rule that panics should be treated as unrecoverable bugs. If a subtask panics, it usually indicates a serious issue that should not be silently ignored or recovered from in a background worker.
Consider checking if the error is a panic and propagating it to ensure the worker fails as expected.
    async fn join_next(join_set: &mut JoinSet<()>, worker_name: &str) {
        if let Some(Err(e)) = join_set.join_next().await {
            if e.is_panic() {
                std::panic::resume_unwind(e.into_panic());
            } else {
                tracing::debug!("{worker_name} task failed: {e}");
            }
        }
    }

References
- Do not implement recovery logic for panics. Panics should be treated as unrecoverable bugs that need to be fixed, not as transient, recoverable errors.
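The propagation pattern in this comment can be tried without tokio. The sketch below is a `std::thread` analogue, not the PR's implementation: `join_propagating` and `panic_reaches_caller` are hypothetical names. With `std::thread`, an `Err` from `join` always carries a panic payload, so no `is_panic()` check is needed; tokio's `JoinError` can also represent cancellation, which is why the suggested change branches on it.

```rust
use std::panic;
use std::thread::{self, JoinHandle};

/// Joins one worker thread. An `Err` from `join` means the thread panicked,
/// so the payload is re-raised on the caller instead of being logged.
/// Hypothetical helper; the PR's `join_next` operates on a tokio `JoinSet`.
fn join_propagating(handle: JoinHandle<()>) {
    if let Err(payload) = handle.join() {
        panic::resume_unwind(payload);
    }
}

/// Spawns a worker that panics and reports whether that panic reached the
/// caller of `join_propagating`. `catch_unwind` is used here only to observe
/// the propagation in a demo, not as recovery logic.
fn panic_reaches_caller() -> bool {
    let handle = thread::spawn(|| panic!("simulated bug in a pipeline task"));
    panic::catch_unwind(panic::AssertUnwindSafe(|| join_propagating(handle))).is_err()
}
```

A worker that finishes normally passes through `join_propagating` silently, so only genuine panics take the worker down.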
    tasks.spawn(async move {
        download_to_cache(&name, &version, &tarball_url).await;
    });

The result of download_to_cache is currently ignored. If a download fails, the error is lost, and the pipeline continues silently. While the install phase might retry, it's better to log the failure in the pipeline for observability, consistent with how clone failures are handled in the clone worker.
    tasks.spawn(async move {
        if let Err(e) = download_to_cache(&name, &version, &tarball_url).await {
            tracing::debug!("Pipeline download failed for {name}@{version}: {e:#}");
        }
    });

Closing as stale: this draft is a one-off agent experiment from 2026-04-27 with no follow-up, and overlaps with sibling PRs exploring the same optimization. Reopen if revisited.
Summary
Verification
Note: full cargo clippy --all-targets -- -D warnings --no-deps is blocked in this environment because openssl-sys requires pkg-config/OpenSSL for non-PM workspace targets.