Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions crates/pm/src/service/pipeline/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::path::PathBuf;
use super::receiver::PipelineChannels;
use crate::util::cloner::{clone_package_once, wait_clone_if_pending};
use crate::util::downloader::{download_to_cache, is_git_url};
use crate::util::user_config::get_manifests_concurrency_limit_sync;
use tokio::task::JoinSet;

/// Pipeline worker handles for awaiting completion.
pub struct PipelineHandles {
Expand All @@ -18,10 +20,26 @@ impl PipelineHandles {
}
}

/// Await the next finished task in `join_set`, if any, and report a failed join.
///
/// Does nothing when the set yields no task. When the joined task ended with a
/// join error, that error is logged at debug level, prefixed with
/// `worker_name`, and then discarded so the caller keeps running.
async fn join_next(join_set: &mut JoinSet<()>, worker_name: &str) {
    match join_set.join_next().await {
        // The task could not be joined cleanly; log it and carry on.
        Some(Err(e)) => {
            tracing::debug!("{worker_name} task failed: {e}");
        }
        // Either a task finished normally or there was nothing to join.
        Some(Ok(())) | None => {}
    }
}
Comment on lines +23 to +29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation of `join_next` logs every task failure, including panics, at the debug level and then continues. This effectively implements recovery logic for panics, which violates the general rule that panics should be treated as unrecoverable bugs. If a subtask panics, it usually indicates a serious issue that should not be silently ignored or recovered from in a background worker.

Consider checking if the error is a panic and propagating it to ensure the worker fails as expected.

async fn join_next(join_set: &mut JoinSet<()>, worker_name: &str) {
    if let Some(Err(e)) = join_set.join_next().await {
        if e.is_panic() {
            std::panic::resume_unwind(e.into_panic());
        } else {
            tracing::debug!("{worker_name} task failed: {e}");
        }
    }
}
References
  1. Do not implement recovery logic for panics. Panics should be treated as unrecoverable bugs that need to be fixed, not as transient, recoverable errors.


/// Wait until every task still tracked by `join_set` has been joined.
///
/// Tasks are joined one at a time through `join_next`, which is given
/// `worker_name` for its log output.
async fn drain_tasks(join_set: &mut JoinSet<()>, worker_name: &str) {
    loop {
        // Stop as soon as nothing is left in flight.
        if join_set.is_empty() {
            break;
        }
        join_next(join_set, worker_name).await;
    }
}

/// Start download and clone pipeline workers, returning handles to await completion.
pub fn start_workers(channels: PipelineChannels, cwd: PathBuf) -> PipelineHandles {
let download_handle = tokio::spawn(async move {
let mut rx = channels.download_rx;
let mut tasks = JoinSet::new();
let max_in_flight = get_manifests_concurrency_limit_sync().max(1);
while let Some(info) = rx.recv().await {
let Some(tarball_url) = info.tarball_url else {
continue;
Expand All @@ -39,14 +57,20 @@ pub fn start_workers(channels: PipelineChannels, cwd: PathBuf) -> PipelineHandle
}
let name = info.name;
let version = info.version;
tokio::spawn(async move {
tasks.spawn(async move {
download_to_cache(&name, &version, &tarball_url).await;
});
Comment on lines +60 to 62
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The result of `download_to_cache` is currently ignored. If a download fails, the error is lost and the pipeline continues silently. While the install phase might retry, it is better to log the failure in the pipeline for observability, consistent with how clone failures are handled in the clone worker.

            tasks.spawn(async move {
                if let Err(e) = download_to_cache(&name, &version, &tarball_url).await {
                    tracing::debug!("Pipeline download failed for {name}@{version}: {e:#}");
                }
            });

if tasks.len() >= max_in_flight {
join_next(&mut tasks, "pipeline download").await;
}
}
drain_tasks(&mut tasks, "pipeline download").await;
});

let clone_handle = tokio::spawn(async move {
let mut rx = channels.clone_rx;
let mut tasks = JoinSet::new();
let max_in_flight = get_manifests_concurrency_limit_sync().max(1);
while let Some(msg) = rx.recv().await {
let Some(tarball_url) = msg.info.tarball_url else {
continue;
Expand All @@ -55,15 +79,19 @@ pub fn start_workers(channels: PipelineChannels, cwd: PathBuf) -> PipelineHandle
let version = msg.info.version;
let target = cwd.join(&msg.path);
let parent_path = msg.parent_path.map(|p| cwd.join(&p));
tokio::spawn(async move {
tasks.spawn(async move {
if let Some(ref parent) = parent_path {
wait_clone_if_pending(&parent.to_string_lossy()).await;
}
if let Err(e) = clone_package_once(&name, &version, &tarball_url, &target).await {
tracing::debug!("Pipeline pre-clone failed for {name}@{version}: {e:#}");
}
});
if tasks.len() >= max_in_flight {
join_next(&mut tasks, "pipeline clone").await;
}
}
drain_tasks(&mut tasks, "pipeline clone").await;
});

PipelineHandles {
Expand Down
8 changes: 5 additions & 3 deletions crates/pm/src/util/cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ fn cache_key(target_path: &Path) -> PathBuf {
target_path.to_path_buf()
}

/// Wait for a pending clone at the given target path to complete (if any).
/// Wait for an already-pending clone at the given target path to complete.
///
/// Used by the pipeline clone worker to ensure parent packages are
/// cloned before their children.
/// cloned before their children when the parent itself is being cloned.
/// Missing keys are ignored: local/workspace parents never enter the clone
/// cache, so waiting for creation would hang child package clones.
pub async fn wait_clone_if_pending(target_path: &str) {
CLONE_CACHE
.wait_if_pending(&cache_key(Path::new(target_path)))
.wait_existing_if_pending(&cache_key(Path::new(target_path)))
.await;
}

Expand Down
59 changes: 59 additions & 0 deletions crates/pm/src/util/oncemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,34 @@ where
notified.await;
}

/// Wait only if the key is already registered and in progress.
///
/// Unlike [`Self::wait_if_pending`], this does not create a waiter for missing
/// keys. Use it when a missing key means "no work will be scheduled" rather
/// than "work has not been scheduled yet".
///
/// Returns immediately when `key` is absent or its entry is already
/// `Value::Done`; otherwise it awaits the notifier stored in the
/// `Value::Waiting` entry.
pub async fn wait_existing_if_pending(&self, key: &K) {
// Missing key: nothing has been registered, so there is nothing to wait for.
let Some(entry) = self.map.get(key) else {
return;
};

match entry.value() {
// Work for this key has already finished.
Value::Done(_) => {}
Value::Waiting(notify) => {
// Take an owned handle so the notifier stays usable after `entry` is dropped.
let notify = Arc::clone(notify);
// Create the notification future BEFORE re-checking the map below.
// NOTE(review): presumably `notify` is a `tokio::sync::Notify`, whose
// `notified()` future observes `notify_waiters()` from creation — confirm.
let notified = notify.notified();
// Release the map entry before the second lookup and the await.
// NOTE(review): `entry` looks like a guard from a concurrent map — confirm.
drop(entry);

// Re-read the entry: if it became `Done` in the meantime, return now
// instead of waiting on a notification that may already have fired.
if let Some(entry) = self.map.get(key)
&& matches!(entry.value(), Value::Done(_))
{
return;
}

notified.await;
}
}
}

/// Complete a pre-registered key with a value.
///
/// This is used in conjunction with `register()` for the pre-registration pattern:
Expand Down Expand Up @@ -375,6 +403,37 @@ mod tests {
assert_eq!(attempt.load(Ordering::SeqCst), 2);
}

#[tokio::test]
async fn test_wait_existing_if_pending_ignores_missing_key() {
let map: OnceMap<String, i32> = OnceMap::new();

tokio::time::timeout(
Duration::from_millis(50),
map.wait_existing_if_pending(&"missing".to_string()),
)
.await
.expect("missing keys should not register a waiter");
}

#[tokio::test]
async fn test_wait_existing_if_pending_waits_for_registered_key() {
let map = Arc::new(OnceMap::<String, i32>::new());
let key = "key".to_string();
let notify = map.register(key.clone()).unwrap();

let waiter_map = Arc::clone(&map);
let waiter_key = key.clone();
let waiter = tokio::spawn(async move {
waiter_map.wait_existing_if_pending(&waiter_key).await;
});

tokio::time::sleep(Duration::from_millis(10)).await;
assert!(!waiter.is_finished());

map.complete(key, Some(42), notify);
waiter.await.unwrap();
}

/// Test that waiters don't miss notifications due to race conditions.
///
/// This test verifies the fix for a race condition where a waiter could
Expand Down
Loading