
Move task execution into a daemon#263

Open
arcanis wants to merge 36 commits into main from mael/daemon-tasks

Conversation


@arcanis arcanis commented Mar 4, 2026

This PR adds support for long-running tasks, currently annotated with a @long-running attribute.

To support that, task execution has been moved into a daemon process managed by Yarn Switch. The core logic still lives inside Yarn (not Yarn Switch); Yarn Switch is merely responsible for keeping records of which daemons are in use in which projects.

The daemons are currently accessible through unauthenticated WebSockets listening on localhost. This is somewhat insecure in a multi-user context; authentication should be implemented in a follow-up.

Windows support isn't implemented since zpm doesn't support Windows yet.


Note

High Risk
Introduces a new always-on local WebSocket daemon that can spawn/terminate processes and accept IPC commands; bugs or missing auth could allow unintended task execution or process control on multi-user machines. Also replaces the existing in-process task runner with new daemon-backed execution paths, increasing behavioral and stability risk.

Overview
Moves task execution out of the CLI and into a new per-project daemon exposed over local WebSockets, including a new debug daemon entrypoint and a DaemonClient-based IPC protocol for enqueueing tasks, streaming output, buffering logs, and stopping/listing long-lived tasks.

Updates yarn switch to manage daemon lifecycle (switch daemon --open/--list/--kill/--kill-all) by spawning the daemon, recording PID/port per project under ~/.yarn/switch/daemons, and cleaning up stale entries; also makes the npm registry host configurable via YARNSW_NPM_REGISTRY_SERVER.

Refactors the tasks CLI surface: replaces the monolithic tasks run implementation with daemon-backed runners (--buffered, default interlaced with optional --timestamps, and --silent-dependencies), adds tasks list/tasks stop, and rewires run to fall back to the new task runner when scripts are missing.

Written by Cursor Bugbot for commit 515be20. This will update automatically on new commits.


greptile-apps bot commented Mar 4, 2026

Confidence Score: 2/5

  • Not safe to merge — two correctness bugs (cancel_context memory leak and WaitingForSubtasks spurious cancellation) need to be resolved first.
  • Many issues from previous review rounds are now fixed and the overall architecture is sound, but two logic bugs remain: the cancel_context path bypasses the eviction queue (causing unbounded memory growth in production daemons), and find_tasks_to_fail can spuriously cancel tasks whose scripts have already run, orphaning their subtasks. The run.rs fallback change also silently breaks callers outside Yarn Switch context. These are not cosmetic issues — they will manifest in real usage.
  • packages/zpm/src/daemon/coordinator_state.rs (cancel_context), packages/zpm/src/daemon/scheduler/dependencies.rs (find_tasks_to_fail), packages/zpm/src/commands/run.rs (standalone fallback)

Important Files Changed

Filename Overview
packages/zpm/src/daemon/coordinator.rs New core daemon coordinator loop; many previously flagged issues are now fixed (graceful shutdown on inode change, warm-up-after-failure guard, mark_task_closed for all paths), but the CancelContext handler still omits mark_task_closed for cancelled tasks, causing a memory leak.
packages/zpm/src/daemon/scheduler/dependencies.rs New task-readiness scheduler; find_ready_tasks correctly guards is_script_finished but find_tasks_to_fail omits this check, allowing tasks already in WaitingForSubtasks state to be spuriously cancelled when a prerequisite fails after the script exits.
packages/zpm/src/daemon/server/connection.rs New WebSocket connection handler; subscription guards correctly clean up via Drop. poll_notifications iterates over all receivers including closed ones from removed subscriptions, which accumulate over the connection lifetime.
packages/zpm/src/daemon/executor/output.rs Stdout/stderr streaming now correctly tracks done flags and uses conditional select arms, eliminating the busy-loop-on-EOF issue from earlier versions.
packages/zpm/src/daemon/coordinator_state.rs Unified mutable state for the daemon; mark_task_closed now evicts from tasks/prepared/subtasks maps and process_warm_up_deadlines guards against firing for terminal tasks. cancel_context marks tasks as Failed but does not call mark_task_closed, bypassing the eviction queue.
packages/zpm/src/commands/run.rs Fallback path for yarn run now unconditionally creates a TaskRunSilentDependencies with standalone: false, which requires a Yarn Switch daemon context. Callers outside Yarn Switch receive a confusing error rather than inline execution as before.
packages/zpm/src/daemon/client.rs New DaemonClient; connect_standalone now reuses the first successful WebSocket stream instead of probing and discarding, fixing the previous connection-leak concern. Probe polling in wait_for_ready uses proper close frames.
packages/zpm-switch/src/daemons.rs New daemon registry helpers; kill_daemon_gracefully correctly uses SIGTERM-then-SIGKILL escalation. Windows branches reference winapi types that are not declared as a Cargo dependency (previously flagged). is_process_alive / kill_process have correct platform guards.
packages/zpm-switch/src/commands/switch/daemon_open.rs New command to open/register a daemon for a project; probe connections now send a close frame before dropping, fixing the previous connection-reset leak. kill_daemon_gracefully is correctly wrapped in spawn_blocking.
packages/zpm/src/daemon/executor/runner.rs TaskRunner now sends TaskStarted after spawn_script succeeds, correctly fixing the previously-flagged race where TaskStarted was enqueued before the process existed.
Prompt To Fix All With AI
This is a comment left during a code review.
Path: packages/zpm/src/daemon/coordinator_state.rs
Line: 595-612

Comment:
**`cancel_context` never calls `mark_task_closed` — tasks skip the eviction queue**

`cancel_context` marks every non-terminal task in the context as `Failed` and returns their IDs, but never pushes those IDs into the `closed_tasks` `VecDeque`. As a result:

1. Their entries in `output_buffer`, `tasks`, `prepared`, and `subtasks` are never eligible for eviction by `mark_task_closed`'s sliding-window logic.
2. No `TaskCancelled`/`TaskCompleted` notification is broadcast before the connection closes, so the state inside the daemon is left inconsistent (tasks appear non-terminal to any future subscriber that might care).

Every Ctrl-C on a non-trivial build cancels several tasks this way. Over many runs on a long-lived daemon this is a steady memory leak.

```rust
for task_id in tasks_to_cancel {
    self.mark_failed(&task_id);
    let task_id_str = format_contextual_task_id(&task_id);
    cancelled_ids.push(task_id_str.clone());
    // Ensure eviction window sees this task
    self.mark_task_closed(task_id_str);
}
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: packages/zpm/src/daemon/scheduler/dependencies.rs
Line: 83-89

Comment:
**`find_tasks_to_fail` missing `is_script_finished` guard — spurious cancellation of in-flight subtask parents**

`find_ready_tasks` skips tasks whose state is `is_script_finished()` (which includes `WaitingForSubtasks`), but `find_tasks_to_fail` only checks `is_terminal()`. `WaitingForSubtasks` is *not* terminal — it means the task's own script has already run to completion but it is still waiting for subtasks it spawned via `tasks push`.

**Concrete failure scenario:**
1. Task A depends on long-lived task B. A's script runs, succeeds, and pushes subtask S. A is now in `WaitingForSubtasks`.
2. B subsequently crashes.
3. `find_tasks_to_fail` finds A (B is a failed prerequisite, A is not terminal, A is not in `running`).
4. A is marked `Cancelled` and `TaskCancelled` is broadcast — even though A's script already succeeded.
5. Subtask S is now an orphan: it was spawned by A but A is cancelled, so when S completes it can never cause A to complete/fail properly.

Add the same guard that `find_ready_tasks` uses:

```rust
if task_state.is_terminal() || task_state.is_script_finished() || running.contains(&ctx_task_id) {
    continue;
}
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: packages/zpm/src/daemon/server/connection.rs
Line: 238-266

Comment:
**Dead subscription receivers accumulate in the polling vector**

When a subscription is removed (`SubscriptionGuard` is dropped), the coordinator drops the `UnboundedSender`, but the corresponding `UnboundedReceiver` remains in `notification_receivers`. `poll_notifications` iterates over the entire slice on every call, touching each dead receiver. A connection that issues many `PushTasks` requests (e.g., a CI runner pushing hundreds of tasks sequentially) will accumulate hundreds of closed receivers that are polled on every notification delivery cycle.

Consider compacting the vector when receivers are known-closed:

```rust
async fn poll_notifications(
    receivers: &mut Vec<mpsc::UnboundedReceiver<DaemonNotification>>,
) -> Option<DaemonNotification> {
    if receivers.is_empty() {
        return std::future::pending::<Option<DaemonNotification>>().await;
    }
    futures::future::poll_fn(|cx| {
        let mut i = 0;
        while i < receivers.len() {
            match receivers[i].poll_recv(cx) {
                std::task::Poll::Ready(Some(notif)) => return std::task::Poll::Ready(Some(notif)),
                std::task::Poll::Ready(None) => { receivers.swap_remove(i); } // remove dead receiver
                std::task::Poll::Pending => { i += 1; }
            }
        }
        std::task::Poll::Pending
    }).await
}
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: packages/zpm/src/commands/run.rs
Line: 162-171

Comment:
**`yarn run <task>` fallback now hard-requires Yarn Switch daemon context**

`TaskRunSilentDependencies::new` unconditionally sets `standalone: false`. When `yarn run my-task` reaches this fallback path, `DaemonClient::connect` is called, which requires either `YARN_DAEMON_SERVER` or `YARN_SWITCH_PATH_ENV` to be set. If neither is present (e.g., invoking the `zpm` binary directly in tests or CI without Yarn Switch in `PATH`), the call fails with:

> "This command can only be called within a Yarn Switch context."

Previously the fallback ran the task inline without any daemon requirement. The new behaviour is silent until the error surfaces at runtime. If calling without Yarn Switch is a supported path, either use `standalone: true` here or add a fallback that degrades gracefully:

```rust
let mut task_run_silent_dependencies =
    TaskRunSilentDependencies::new(&self.cli_environment, self.name.clone(), self.args.clone());
// standalone = true if we're not inside a Yarn Switch context
task_run_silent_dependencies.standalone = std::env::var(zpm::daemon::YARN_SWITCH_PATH_ENV).is_err()
    && std::env::var(zpm::daemon::DAEMON_SERVER_ENV).is_err();
return task_run_silent_dependencies.execute().await;
```

How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: a357d25

@@ -0,0 +1,344 @@
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::os::unix::fs::MetadataExt;
**Unix-only import used unconditionally** (packages/zpm/src/daemon/coordinator.rs, line 3)

`std::os::unix::fs::MetadataExt` is a Unix-only trait imported unconditionally. This will cause a compile error on Windows. The import and all code that calls `.ino()` (lines ~45–47) must be gated behind `#[cfg(unix)]`, with the inode-watching block either disabled or replaced with a no-op on non-Unix platforms.

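The gating asked for here can be sketched as follows. This is a minimal illustration, not the PR's actual code; `current_inode` is a hypothetical helper standing in for the daemon's inode-watching block:

```rust
use std::path::Path;

// Hypothetical helper standing in for the daemon's inode-watching block.
// On Unix it reads the file's inode; elsewhere it degrades to a no-op.
#[cfg(unix)]
fn current_inode(path: &Path) -> Option<u64> {
    use std::os::unix::fs::MetadataExt; // import gated so Windows still compiles
    std::fs::metadata(path).ok().map(|m| m.ino())
}

#[cfg(not(unix))]
fn current_inode(_path: &Path) -> Option<u64> {
    None // inode watching disabled on non-Unix platforms
}
```

With this shape, the caller treats `None` as "inode watching unavailable" rather than branching on the platform itself.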

Comment on lines +58 to +107
let output_buffer: OutputBuffer
    = Arc::new(RwLock::new(HashMap::new()));

let subscription_registry
    = Arc::new(SubscriptionRegistry::new());

let long_lived_registry
    = Arc::new(LongLivedRegistry::new());

let scheduler_for_loop
    = scheduler.clone();

let (loop_event_tx, mut loop_event_rx)
    = mpsc::unbounded_channel::<ExecutorEvent>();

let subscription_registry_for_loop
    = subscription_registry.clone();

let subscription_registry_for_events
    = subscription_registry.clone();

let output_buffer_for_events
    = output_buffer.clone();

let long_lived_registry_for_events
    = long_lived_registry.clone();

let scheduler_for_events
    = scheduler.clone();

tokio::spawn(async move {
    while let Some(event) = loop_event_rx.recv().await {
        if let ExecutorEvent::Output { task_id, line, stream } = &event {
            if let Ok(mut buffer) = output_buffer_for_events.write() {
                let lines: &mut Vec<BufferedOutputLine>
                    = buffer
                        .entry(task_id.to_string())
                        .or_insert_with(Vec::new);

                lines.push(BufferedOutputLine {
                    line: line.to_string(),
                    stream: stream.as_str().to_string(),
                });

                if lines.len() > OUTPUT_BUFFER_MAX_LINES {
                    let excess
                        = lines.len() - OUTPUT_BUFFER_MAX_LINES;

                    lines.drain(0..excess);
                }

**Unbounded memory growth in output buffer** (packages/zpm/src/daemon/coordinator.rs, lines 58–107)

The output_buffer HashMap (created at line 58–59) accumulates an entry for every task ID that ever runs in this daemon session. While the per-task line count is capped at OUTPUT_BUFFER_MAX_LINES (1000 lines), the number of task entries in the HashMap is never pruned. For a long-running daemon that processes thousands of short-lived tasks, this will steadily grow the resident memory of the daemon process. Entries for completed tasks (particularly non-long-lived ones whose output has already been retrieved by the client) should be removed once they are no longer needed.

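One way to address this is the same sliding-window eviction the PR already uses elsewhere via `mark_task_closed` — a sketch with simplified stand-ins for the daemon's types (the real buffer stores `BufferedOutputLine` values keyed by contextual task IDs):

```rust
use std::collections::{HashMap, VecDeque};

// Illustrative window size; the real constant would live beside
// OUTPUT_BUFFER_MAX_LINES.
const MAX_RETAINED_CLOSED_TASKS: usize = 64;

#[derive(Default)]
struct OutputBuffers {
    buffers: HashMap<String, Vec<String>>,
    closed: VecDeque<String>, // closed task IDs, oldest first
}

impl OutputBuffers {
    fn push_line(&mut self, task_id: &str, line: String) {
        self.buffers.entry(task_id.to_string()).or_default().push(line);
    }

    // Call this for every task reaching a terminal state; once the window is
    // full, the oldest closed task's buffered output is dropped.
    fn mark_task_closed(&mut self, task_id: String) {
        self.closed.push_back(task_id);
        while self.closed.len() > MAX_RETAINED_CLOSED_TASKS {
            if let Some(oldest) = self.closed.pop_front() {
                self.buffers.remove(&oldest);
            }
        }
    }
}
```

The key property is that the map's size is bounded by the number of live tasks plus the retention window, rather than by the daemon's lifetime.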

Comment on lines +41 to +49
pub fn kill(&self) {
    #[cfg(unix)]
    {
        let _ = std::process::Command::new("kill")
            .arg("-9")
            .arg(format!("-{}", self.pid))
            .status();
    }
}

**SIGKILL on entire process group prevents graceful cleanup** (packages/zpm/src/daemon/client.rs, lines 41–49)

kill -9 -{pid} sends SIGKILL to every process in the daemon's process group. Because SIGKILL cannot be caught or ignored, neither the daemon nor any of its running task children will have a chance to flush buffers, clean up temporary files, or release resources. In addition, if any task subprocess moves itself to a different process group, it will survive this kill.

For the standalone case it may be acceptable to be forceful, but using SIGTERM first (with a timeout and SIGKILL as a fallback) would be safer and more consistent with the SIGTERM used elsewhere in the codebase.

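A hedged sketch of the suggested escalation, using the external `kill` utility like the original snippet does. This is the single-PID variant for clarity; the process-group form would pass `-{pid}` instead:

```rust
use std::process::Command;
use std::thread;
use std::time::{Duration, Instant};

// Sketch of SIGTERM-then-SIGKILL escalation for a single PID (Unix only).
// The `kill -0` probe only asks whether the PID still exists; note that an
// un-reaped zombie child still counts as existing.
#[cfg(unix)]
fn kill_gracefully(pid: u32, grace: Duration) {
    let pid = pid.to_string();
    let _ = Command::new("kill").args(["-TERM", &pid]).status();

    let deadline = Instant::now() + grace;
    while Instant::now() < deadline {
        let alive = Command::new("kill")
            .args(["-0", &pid])
            .status()
            .map(|s| s.success())
            .unwrap_or(false);
        if !alive {
            return; // exited within the grace period
        }
        thread::sleep(Duration::from_millis(50));
    }

    // Still running after the grace period: fall back to SIGKILL.
    let _ = Command::new("kill").args(["-KILL", &pid]).status();
}
```

This keeps the forceful fallback while giving the daemon and its tasks a chance to flush and clean up first.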

Comment on lines +100 to +153
    #[cfg(windows)]
    {
        use std::ptr::null_mut;
        unsafe {
            let handle = winapi::um::processthreadsapi::OpenProcess(
                winapi::um::winnt::PROCESS_QUERY_LIMITED_INFORMATION,
                0,
                pid,
            );
            if handle.is_null() {
                false
            } else {
                winapi::um::handleapi::CloseHandle(handle);
                true
            }
        }
    }

    #[cfg(not(any(unix, windows)))]
    {
        true
    }
}

pub fn kill_process(pid: u32) -> bool {
    #[cfg(unix)]
    {
        unsafe { libc::kill(pid as i32, libc::SIGTERM) == 0 }
    }

    #[cfg(windows)]
    {
        use std::ptr::null_mut;
        unsafe {
            let handle = winapi::um::processthreadsapi::OpenProcess(
                winapi::um::winnt::PROCESS_TERMINATE,
                0,
                pid,
            );
            if handle.is_null() {
                false
            } else {
                let result = winapi::um::processthreadsapi::TerminateProcess(handle, 1) != 0;
                winapi::um::handleapi::CloseHandle(handle);
                result
            }
        }
    }

    #[cfg(not(any(unix, windows)))]
    {
        false
    }
}

**`winapi` crate referenced but not declared as a dependency** (packages/zpm-switch/src/daemons.rs, lines 100–153)

Both is_process_alive and kill_process have #[cfg(windows)] branches that reference winapi::um::processthreadsapi, winapi::um::winnt, and winapi::um::handleapi. However, winapi does not appear in the Cargo.toml for zpm-switch (nor in the workspace Cargo.toml changes in this PR). This will produce a compile error on Windows. You need to add:

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3", features = ["processthreadsapi", "winnt", "handleapi"] }

client.push_tasks(task_subscriptions, parent_task_id, None, None).await?;

Ok(ExitStatus::from_raw(0))
Ok(ExitStatus::from_raw(0 << 8))

**`0 << 8` is always `0` — confusing no-op** (packages/zpm/src/commands/tasks/push.rs, line 66)

ExitStatus::from_raw(0 << 8) evaluates identically to ExitStatus::from_raw(0). The << 8 shift pattern is used elsewhere to encode non-zero exit codes in the wait-status format, but shifting zero is a no-op and reads as if the author forgot to put a real value in.

Suggested change
Ok(ExitStatus::from_raw(0 << 8))
Ok(ExitStatus::from_raw(0))

taskfile Outdated
Comment on lines +1 to +22
bar:
  sleep 5

bar2:
  sleep 10

x:
  python3 -c "import time; print(f'ts:{int(time.time()*1000)}:line1')"
  sleep 1
  python3 -c "import time; print(f'ts:{int(time.time()*1000)}:line2')"
  sleep 1
  python3 -c "import time; print(f'ts:{int(time.time()*1000)}:line3')"

producer:
  for x in {1..10}; do
    echo "producer: $x"
    sleep 1
  done

foo: bar& bar2&
  echo "foo"


**Debug/test tasks left in the repository root `taskfile`** (taskfile, lines 1–22)

The tasks bar, bar2, x, producer, and foo appear to be development scratch entries added to test the new daemon functionality. They don't appear to serve any project-level purpose and should be removed before merging, or moved to a test fixture if they are needed for acceptance tests.


Comment on lines +21 to +54
pub async fn execute(&self) -> Result<(), Error> {
    let project_cwd = get_final_cwd()?;

    let find_result = find_closest_package_manager(&project_cwd)?;

    let detected_root = find_result
        .detected_root_path
        .ok_or(Error::NoProjectFound)?;

    let Some(daemon) = daemons::get_daemon(&detected_root)? else {
        println!(
            "{} No daemon registered for this project",
            DataType::Info.colorize("ℹ")
        );
        return Ok(());
    };

    if !daemons::is_process_alive(daemon.pid) {
        daemons::unregister_daemon(&detected_root)?;
        println!(
            "{} Daemon was not running (cleaned up stale entry)",
            DataType::Info.colorize("ℹ")
        );
        return Ok(());
    }

    if daemons::kill_process(daemon.pid) {
        daemons::unregister_daemon(&detected_root)?;
        println!(
            "{} Stopped daemon for {} (PID: {})",
            DataType::Success.colorize("✓"),
            detected_root.to_print_string(),
            daemon.pid
        );

**Killing the daemon does not terminate its running task children** (packages/zpm-switch/src/commands/switch/daemon_kill.rs, lines 21–54)

daemons::kill_process sends SIGTERM only to the daemon process itself (the yarn debug daemon binary). All task subprocesses that the daemon has spawned are in the same session but may be in their own process groups. When the daemon receives SIGTERM it will exit — but because nothing in the daemon's signal handling path terminates the child processes, those tasks continue running as orphans.

This means switch daemon --kill can leave long-running tasks (e.g. @long-lived dev servers) silently running in the background after the user believes they have been stopped. The daemon should either propagate the signal to its children on shutdown, or the kill command should enumerate and terminate task children before sending SIGTERM to the daemon.

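One possible shape for the first option (the daemon propagating shutdown to its children) — a sketch with illustrative names, not the PR's code:

```rust
use std::process::{Child, Command};
use std::sync::Mutex;

// Illustrative sketch: the daemon records every task child it spawns so its
// shutdown path can terminate them before exiting. Names are hypothetical.
#[derive(Default)]
struct ChildTracker {
    children: Mutex<Vec<Child>>,
}

impl ChildTracker {
    fn spawn_tracked(&self, program: &str, args: &[&str]) -> std::io::Result<u32> {
        let child = Command::new(program).args(args).spawn()?;
        let pid = child.id();
        self.children.lock().unwrap().push(child);
        Ok(pid)
    }

    // Invoked from the daemon's SIGTERM handler so no task outlives the daemon.
    fn terminate_all(&self) {
        for child in self.children.lock().unwrap().iter_mut() {
            let _ = child.kill(); // std sends SIGKILL; real code might SIGTERM first
            let _ = child.wait(); // reap so no zombies are left behind
        }
    }
}
```

The alternative (the kill command enumerating children itself) avoids trusting the daemon's signal handling, at the cost of process-tree walking in `zpm-switch`.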


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.


parent_task_id,
workspace,
output_subscription: _,
status_subscription: _,

Subscription scopes silently discarded in request dispatch

High Severity

The output_subscription and status_subscription fields in PushTasks are destructured but discarded (bound to _) in dispatch_request. The client sends different scopes per run mode — e.g., run_silent_dependencies sends SubscriptionScope::TargetOnly for output — but these are never forwarded to handle_push_tasks or the subscription registry. As a result, on_output_line in the silent-dependencies handler receives output for all tasks (not just targets), leaking dependency output that was intended to be suppressed.

Additional Locations (1)
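The intended behaviour can be illustrated with a minimal scope filter. The types here are illustrative; the PR's `SubscriptionScope` lives in the daemon protocol, and the actual fix is to thread the destructured fields from `dispatch_request` into `handle_push_tasks` instead of binding them to `_`:

```rust
// Minimal illustration of honouring the requested output scope.
#[derive(Clone, Copy, PartialEq, Debug)]
enum SubscriptionScope {
    All,        // forward output from every task in the context
    TargetOnly, // forward output only from explicitly requested targets
}

fn should_forward_output(scope: SubscriptionScope, is_target_task: bool) -> bool {
    match scope {
        SubscriptionScope::All => true,
        SubscriptionScope::TargetOnly => is_target_task,
    }
}
```

With `TargetOnly` forwarded correctly, dependency output never reaches the silent-dependencies handler in the first place.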

self.verbose_level,
).await;

x

Leftover debug variable pattern in execute method

Low Severity

The execute method assigns the result to a temporary variable x and then returns it, unlike the equivalent methods in run_buffered.rs and run_silent_dependencies.rs which return the run_task(...) call directly. This let x = ...; x pattern is a common debugging artifact.


project.active_workspace().ok()?.name.clone()
};

Some(TaskId { workspace, task_name })

Duplicated build_task_id function across handler modules

Low Severity

The build_task_id function is identically duplicated in both push_tasks.rs and stop_task.rs. Both resolve a task name and optional workspace string into a TaskId using the same logic. This could live in the parent handlers/mod.rs module to avoid inconsistent future changes.

Additional Locations (1)

Comment on lines +43 to +60

        long_lived_registry.remove(&task_id);

        DaemonResponse::TaskStopped {
            success: true,
            error: None,
        }
    } else {
        long_lived_registry.remove(&task_id);

        DaemonResponse::TaskStopped {
            success: true,
            error: Some("Task had no process ID, removed from registry".to_string()),
        }
    }
}

fn build_task_id(task_name: &str, workspace: Option<&str>, project: &Project) -> Option<TaskId> {

**`process_id` is never populated — stop_task never kills the process** (packages/zpm/src/daemon/handlers/stop_task.rs, lines 43–60)

LongLivedRegistry::set_process_id exists but is never called anywhere in the codebase. Every LongLivedEntry is created with process_id: None (see long_lived.rs register method), so entry.process_id will always be None here.

As a result, handle_stop_task always falls into the else branch — it removes the task from the registry (making it look stopped), but the underlying process keeps running indefinitely. The kill -TERM path is dead code.

The fix requires that TaskRunner::run (in executor/runner.rs) also records the spawned PID into LongLivedRegistry for long-lived tasks. The executor needs a reference to LongLivedRegistry and the associated TaskId to call long_lived_registry.set_process_id(&base_task_id, pid) after spawning.

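A sketch of the missing wiring, with simplified stand-ins for the PR's types (keys are task-ID strings here; the real `LongLivedRegistry` keys by `TaskId`):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Simplified stand-in for the PR's LongLivedRegistry.
#[derive(Default)]
struct LongLivedRegistry {
    process_ids: Mutex<HashMap<String, u32>>,
}

impl LongLivedRegistry {
    fn set_process_id(&self, task_id: &str, pid: u32) {
        self.process_ids.lock().unwrap().insert(task_id.to_string(), pid);
    }

    fn process_id(&self, task_id: &str) -> Option<u32> {
        self.process_ids.lock().unwrap().get(task_id).copied()
    }
}

// In TaskRunner::run, after spawn_script succeeds, something like:
//
//     if task.is_long_lived {
//         long_lived_registry.set_process_id(&base_task_id, child.id());
//     }
//
// gives handle_stop_task a real PID to signal instead of always falling
// into the `process_id == None` branch.
```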

Comment on lines +44 to +48
long_lived_registry.remove(&task_id);

DaemonResponse::TaskStopped {
success: true,
error: None,

**Unix-only `kill` command without platform guard** (packages/zpm/src/daemon/handlers/stop_task.rs, lines 44–48)

std::process::Command::new("kill") is a Unix shell utility; it does not exist on Windows. This code compiles but will fail at runtime on Windows. The block should be gated behind #[cfg(unix)], with a Windows equivalent (e.g., using the windows-sys or winapi crate, consistent with how daemons.rs handles cross-platform process signalling).

Suggested change
    if let Some(pid) = entry.process_id {
        #[cfg(unix)]
        let _ = std::process::Command::new("kill")
            .arg("-TERM")
            .arg(pid.to_string())
            .status();

        long_lived_registry.remove(&task_id);

Comment on lines +183 to +196
async fn check_daemon_ready(&self, port: u16) -> Result<(), Error> {
    let url
        = format!("ws://127.0.0.1:{}", port);

    // Just attempt to establish a WebSocket connection - if it succeeds, daemon is ready
    tokio_tungstenite::connect_async(&url)
        .await
        .map_err(|e| {
            Error::DaemonConnectionFailed(Arc::new(std::io::Error::new(
                std::io::ErrorKind::ConnectionRefused,
                e.to_string(),
            )))
        })?;


**`check_daemon_ready` leaks WebSocket connections during polling**

`tokio_tungstenite::connect_async` is called and the returned `(WsStream, Response)` tuple is immediately discarded (the `?` propagates errors, but on success the stream falls out of scope). Dropping a `WebSocketStream` without sending a `Close` frame tears down the TCP connection ungracefully. `wait_for_ready` calls this up to 100 times (every 50 ms), so during a slow startup the daemon may receive ~100 abrupt connection resets, each generating a server-side error log entry.

A `Ping`/`Pong` exchange or a proper close frame should be sent before dropping the stream:

```rust
async fn check_daemon_ready(&self, port: u16) -> Result<(), Error> {
    let url = format!("ws://127.0.0.1:{}", port);
    let (mut ws, _) = tokio_tungstenite::connect_async(&url)
        .await
        .map_err(|e| Error::DaemonConnectionFailed(...))?;
    ws.close(None).await.ok();
    Ok(())
}
```
Path: packages/zpm-switch/src/commands/switch/daemon_open.rs (lines 183-196)

Comment on lines +155 to +180

```rust
        })
    }
    ExecutorEvent::Finished { .. } => None,
    ExecutorEvent::Failed { task_id, error } => {
        Some(DaemonNotification::TaskFailed {
            task_id: task_id.clone(),
            error: error.clone(),
        })
    }
};

if let Some(n) = notification {
    subscription_registry_for_events.broadcast(n.clone());

    if let DaemonNotification::TaskStarted { task_id } = &n {
        if let Some(ctx_task_id) = scheduler_for_events.parse_contextual_task_id(task_id) {
            if scheduler_for_events.is_long_lived(&ctx_task_id) {
                let task_id_clone = task_id.clone();
                let ctx_task_id_clone = ctx_task_id.clone();
                let registry_clone = long_lived_registry_for_events.clone();
```

**Warm-up timer fires even after task failure, sending a spurious `TaskWarmUpComplete` notification**

The warm-up `tokio::spawn` fires unconditionally 500 ms after the `TaskStarted` event. If a long-lived task exits (or crashes) within those 500 ms, the main loop will have already sent `TaskCompleted`/`TaskFailed` to subscribers. The delayed spawn then calls `scheduler_clone.mark_warm_up_complete` and broadcasts `DaemonNotification::TaskWarmUpComplete` for a task that has already failed.

Any client or subscriber watching for `TaskWarmUpComplete` to unblock dependent work could be misled into thinking the dependency is healthy when it has actually exited. The spawn should check whether the task has already completed/failed before marking it warm and broadcasting:

```rust
tokio::spawn(async move {
    tokio::time::sleep(Duration::from_millis(LONG_LIVED_WARMUP_MS)).await;

    // Bail out if the task already failed/completed
    if scheduler_clone.is_failed_or_completed(&ctx_task_id_clone) {
        return;
    }
    // ... rest of warm-up logic
});
```
Path: packages/zpm/src/daemon/coordinator.rs (lines 155-180)

@github-actions

github-actions bot commented Mar 12, 2026

⏱️ Benchmark Results

gatsby install-full-cold

| Metric | Base | Head | Difference |
| --- | --- | --- | --- |
| Mean | 2.550s | 2.559s | +0.34% ⚠️ |
| Median | 2.567s | 2.562s | -0.20% ✅ |
| Min | 2.352s | 2.373s | |
| Max | 2.663s | 2.874s | |
| Std Dev | 0.064s | 0.082s | |
📊 Raw benchmark data (gatsby install-full-cold)

Base times: 2.663s, 2.528s, 2.492s, 2.548s, 2.545s, 2.468s, 2.569s, 2.570s, 2.561s, 2.571s, 2.505s, 2.472s, 2.570s, 2.548s, 2.573s, 2.352s, 2.605s, 2.597s, 2.600s, 2.583s, 2.621s, 2.571s, 2.563s, 2.533s, 2.557s, 2.597s, 2.390s, 2.574s, 2.607s, 2.565s

Head times: 2.608s, 2.578s, 2.582s, 2.560s, 2.588s, 2.485s, 2.874s, 2.543s, 2.513s, 2.593s, 2.495s, 2.535s, 2.611s, 2.541s, 2.577s, 2.534s, 2.521s, 2.622s, 2.415s, 2.588s, 2.588s, 2.521s, 2.617s, 2.553s, 2.591s, 2.492s, 2.585s, 2.563s, 2.510s, 2.373s


gatsby install-cache-and-lock (warm, with lockfile)

| Metric | Base | Head | Difference |
| --- | --- | --- | --- |
| Mean | 0.452s | 0.454s | +0.43% ⚠️ |
| Median | 0.452s | 0.446s | -1.20% ✅ |
| Min | 0.442s | 0.442s | |
| Max | 0.471s | 0.593s | |
| Std Dev | 0.006s | 0.028s | |
📊 Raw benchmark data (gatsby install-cache-and-lock (warm, with lockfile))

Base times: 0.450s, 0.445s, 0.445s, 0.442s, 0.443s, 0.444s, 0.442s, 0.448s, 0.448s, 0.453s, 0.453s, 0.451s, 0.452s, 0.455s, 0.450s, 0.457s, 0.458s, 0.451s, 0.450s, 0.458s, 0.471s, 0.452s, 0.447s, 0.455s, 0.460s, 0.458s, 0.457s, 0.453s, 0.456s, 0.460s

Head times: 0.446s, 0.446s, 0.442s, 0.445s, 0.448s, 0.499s, 0.593s, 0.456s, 0.442s, 0.443s, 0.447s, 0.450s, 0.452s, 0.451s, 0.452s, 0.448s, 0.446s, 0.455s, 0.446s, 0.445s, 0.444s, 0.446s, 0.447s, 0.444s, 0.443s, 0.460s, 0.443s, 0.451s, 0.444s, 0.448s

@cursor

cursor bot commented Mar 12, 2026

You have run out of free Bugbot PR reviews for this billing cycle. This will reset on April 6.

To receive reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.

Comment on lines +63 to +82

```rust
for _ in 0..MAX_RETRIES {
    match long_lived_registry.try_claim_registration(tid) {
        Some(existing) => {
            // Task already exists
            if !existing.contextual_task_id.is_empty() {
                // Registration is complete, attach to existing task
                result = RegistrationResult::AttachedToExisting(existing);
                break;
            }
            // contextual_task_id is empty - another caller is currently registering
            // Wait briefly and retry
            std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS));
        }
        None => {
            // We've claimed the registration, proceed to create the task
            result = RegistrationResult::WeClaimedRegistration;
            break;
        }
    }
}
```

**`std::thread::sleep` blocks the Tokio worker thread**

`std::thread::sleep` is called from a synchronous function (`handle_push_tasks`) that is invoked directly from the async `handle_connection` WebSocket handler via `dispatch_request`. This blocks whichever Tokio worker thread is driving the connection for the full `RETRY_DELAY_MS * MAX_RETRIES` duration (up to 5 seconds in the worst case — 50 retries × 100 ms each).

Because Tokio's default multi-threaded scheduler uses a fixed-size thread pool, a blocked thread means reduced throughput for all other concurrent connections and tasks. If multiple clients trigger this path simultaneously (e.g., several parallel `yarn run` calls that all start the same long-lived task at roughly the same time), multiple threads could be blocked simultaneously.

The fix requires making `dispatch_request` and `handle_push_tasks` async so that `tokio::time::sleep(...).await` can be used instead of `std::thread::sleep`, or moving the retry logic to a `tokio::task::spawn_blocking` call.

Path: packages/zpm/src/daemon/handlers/push_tasks.rs (lines 63-82)

```diff
@@ -0,0 +1,258 @@
+use std::collections::HashSet;
+use std::os::unix::process::ExitStatusExt;
```

**Unix-only import without platform guard causes Windows compile failure**

`std::os::unix::process::ExitStatusExt` is a Unix-specific trait that provides `ExitStatus::from_raw`. This import — and the use of `ExitStatus::from_raw(ctx.exit_code << 8)` on line 257 — will produce a compile error on Windows.

The import and the `from_raw` call should be gated:

Suggested change
```suggestion
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
```

And line 257 should have a Windows alternative (e.g. `ExitStatus::from_raw(ctx.exit_code)` using `std::os::windows::process::ExitStatusExt::from_raw`).

Path: packages/zpm/src/commands/tasks/runner.rs (line 2)

Comment on lines +112 to +122

```rust
for _ in 0..max_attempts {
    match tokio_tungstenite::connect_async(&url).await {
        Ok(_) => {
            let client = Self::connect_to_url(&url).await?;

            return Ok((client, StandaloneDaemonHandle { abort_handle }));
        }
        Err(_) => tokio::time::sleep(poll_interval).await,
    }
}
```

**First successful probe connection is leaked before the real connection is opened**

When the daemon becomes ready, the `Ok(_)` arm discards the successfully established `(WsStream, Response)` without sending a `Close` frame, then immediately opens a second connection via `Self::connect_to_url`. This means every call to `connect_standalone` silently tears down one WebSocket connection ungracefully (TCP RST), generating a server-side error log entry and leaving the daemon in a briefly inconsistent state.

The probe loop should send a proper close frame before dropping the stream, or the first successful connection should be reused rather than opened twice:

```rust
Ok((mut ws, _)) => {
    // Gracefully close the probe connection
    ws.close(None).await.ok();
    let client = Self::connect_to_url(&url).await?;
    return Ok((client, StandaloneDaemonHandle { abort_handle }));
}
```
Path: packages/zpm/src/daemon/client.rs (lines 112-122)

@cursor

cursor bot commented Mar 13, 2026

You have used all of your free Bugbot PR reviews.

To receive reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.

Comment on lines +306 to +316
CoordinatorCommand::TaskFailed { task_id, error } => {
state.broadcast(DaemonNotification::TaskFailed {
task_id: task_id.clone(),
error,
});

if let Some(ctx_task_id) = state.parse_contextual_task_id_simple(&task_id) {
handle_task_failure(&ctx_task_id, 1, state);
}

state.mark_task_closed(task_id);

**`CoordinatorCommand::TaskFailed` emits double notification for the same task**

When a process-level failure is reported (e.g., the executor couldn't spawn the child), the handler first broadcasts `DaemonNotification::TaskFailed`, then immediately calls `handle_task_failure` which broadcasts a second `DaemonNotification::TaskCompleted { exit_code: 1 }` for the same task ID. Every active subscriber receives both notifications.

For **non-target dependency** tasks this causes observable problems:

1. `runner.rs` calls `handler.on_task_failed(...)` → returns `None` (not a target), loop continues.
2. `DaemonNotification::TaskCompleted` arrives for the same task → `handler.on_task_completed(...)` is invoked, causing `SilentDependenciesHandler` and `BufferedHandler` to call `get_task_output` and print "Process started" / output lines for a task that never actually started.

For **target** tasks the second notification is harmlessly swallowed (the runner has already returned), but the design is fragile.

The fix is to not call `handle_task_failure` (and therefore not broadcast `TaskCompleted`) from the `TaskFailed` branch. Process-level failures should only emit `TaskFailed`; the cascade of failing dependents can be driven by the existing dependency resolution already triggered through the failure:

```rust
CoordinatorCommand::TaskFailed { task_id, error } => {
    state.broadcast(DaemonNotification::TaskFailed {
        task_id: task_id.clone(),
        error,
    });

    if let Some(ctx_task_id) = state.parse_contextual_task_id_simple(&task_id) {
        // Mark as failed for dependency tracking, but do NOT re-broadcast TaskCompleted
        state.mark_failed(&ctx_task_id);
    }

    state.mark_task_closed(task_id);
}
```

Dependent tasks that were waiting on this task will be caught by the next `find_tasks_to_fail` pass inside `process_ready_tasks`.

Path: packages/zpm/src/daemon/coordinator.rs (lines 306-316)

Comment on lines +754 to +762

```rust
async fn watch_project_root(project_root: Path, initial_inode: u64) {
    loop {
        tokio::time::sleep(Duration::from_secs(5)).await;

        let current_inode = project_root.fs_metadata().map(|m| m.ino()).ok();

        if current_inode != Some(initial_inode) {
            std::process::exit(0);
        }
```

**`watch_project_root` orphans running task processes on hard exit**

When the project root's inode changes, `std::process::exit(0)` is called directly. This bypasses the `graceful_shutdown` function used by all signal handlers, which sends `CoordinatorCommand::Shutdown`, collects all registered PIDs, sends SIGTERM to each process group, and waits for them to exit. Calling `std::process::exit` directly leaves every running task child as an orphan process.

`graceful_shutdown` requires a `CommandSender`; pass the sender in so this path can use the same shutdown procedure:

```rust
async fn watch_project_root(project_root: Path, initial_inode: u64, command_tx: CommandSender) {
    loop {
        tokio::time::sleep(Duration::from_secs(5)).await;

        let current_inode = project_root.fs_metadata().map(|m| m.ino()).ok();

        if current_inode != Some(initial_inode) {
            graceful_shutdown(command_tx).await;
            return;
        }
    }
}
```
Path: packages/zpm/src/daemon/coordinator.rs (lines 754-762)

Comment on lines +84 to +111

```rust
) {
    if !is_target {
        if let Some((_, ref progress_state)) = self.progress_handle {
            progress_state.remove_task(&format_task_id(task_id));
        }

        if exit_code != 0 {
            self.stop_progress();

            let lines = ctx.client.get_task_output(task_id).await.ok();

            let mut stdout = std::io::stdout().lock();

            writeln!(stdout, "[{}]: Process started", format_task_id(task_id)).ok();

            if let Some(lines) = lines {
                for output_line in lines {
                    writeln!(stdout, "[{}]: {}", format_task_id(task_id), output_line.line).ok();
                }
            }

            writeln!(stdout, "[{}]: Process exited (exit code {})", format_task_id(task_id), exit_code).ok();
        }
    }
}
```


**"Process started" printed for dependency tasks that never ran**

There are two distinct paths that reach this `on_task_completed` branch with a non-zero `exit_code` for a dependency task:

1. **Process-level failure + double notification** (see the companion comment in `coordinator.rs`): a task that failed to *start* emits both `TaskFailed` and `TaskCompleted`. The `on_task_failed` handler already printed an error; `on_task_completed` then fires again and prints "Process started" a second time for the same task.

2. **Cascade failure via `process_ready_tasks`**: when a dependency's own prerequisite fails, `find_tasks_to_fail` marks the dependency as failed and broadcasts only `TaskCompleted { exit_code: 1 }` — the task never executed at all, so "Process started" is factually wrong.

In both cases printing "Process started" is misleading. For case 2 you should distinguish "task was skipped because a prerequisite failed" from "task ran and exited with an error":

```rust
async fn on_task_completed(&mut self, ctx: &mut TaskRunContext, task_id: &str, exit_code: i32, is_target: bool) {
    if !is_target {
        if let Some((_, ref progress_state)) = self.progress_handle {
            progress_state.remove_task(&format_task_id(task_id));
        }

        if exit_code != 0 {
            self.stop_progress();
            // Only show output if there is any; a cascade-failed task has no output to show
            let lines = ctx.client.get_task_output(task_id).await.ok();
            if lines.as_ref().map_or(false, |l| !l.is_empty()) {
                let mut stdout = std::io::stdout().lock();
                for output_line in lines.unwrap() {
                    writeln!(stdout, "[{}]: {}", format_task_id(task_id), output_line.line).ok();
                }
                writeln!(stdout, "[{}]: Process exited (exit code {})", format_task_id(task_id), exit_code).ok();
            }
        }
    }
}
```
Path: packages/zpm/src/commands/tasks/run_silent_dependencies.rs (lines 84-111)


Comment on lines +484 to +549
```rust
fn handle_task_failure(
    task_id: &ContextualTaskId,
    exit_code: i32,
    state: &mut CoordinatorState,
) {
    state.mark_failed(task_id);

    let task_id_str = format_contextual_task_id(task_id);
    state.broadcast(DaemonNotification::TaskCompleted {
        task_id: task_id_str,
        exit_code,
    });

    // Propagate failure to parents that are waiting for subtasks
    let parents = state.find_parents(task_id);
    for parent in parents {
        if state.get_waiting_exit_code(&parent).is_some() {
            state.mark_failed(&parent);

            let parent_id_str = format_contextual_task_id(&parent);
            state.broadcast(DaemonNotification::TaskCompleted {
                task_id: parent_id_str,
                exit_code,
            });
        }
    }
}

fn handle_task_success(
    task_id: &ContextualTaskId,
    exit_code: i32,
    state: &mut CoordinatorState,
) {
    if state.try_complete_task(task_id) {
        let task_id_str = format_contextual_task_id(task_id);
        state.broadcast(DaemonNotification::TaskCompleted {
            task_id: task_id_str,
            exit_code,
        });

        // Try to complete parents that are waiting for subtasks
        let parents = state.find_parents(task_id);
        for parent in parents {
            if let Some(parent_exit_code) = state.get_waiting_exit_code(&parent) {
                if state.try_complete_task(&parent) {
                    let parent_id_str = format_contextual_task_id(&parent);
                    state.broadcast(DaemonNotification::TaskCompleted {
                        task_id: parent_id_str,
                        exit_code: parent_exit_code,
                    });
                }
            }
        }
    } else {
        // Check if any subtask has already failed
        if state.has_failed_subtask(task_id) {
            state.mark_failed(task_id);

            let task_id_str = format_contextual_task_id(task_id);
            state.broadcast(DaemonNotification::TaskCompleted {
                task_id: task_id_str,
                exit_code: 1,
            });
        }
        // Otherwise task stays in WaitingForSubtasks state until all subtasks complete
    }
```

**`mark_task_closed` not called for normal completions — output buffer grows without bound**

`CoordinatorState::mark_task_closed` is only called from the `CoordinatorCommand::TaskFailed` handler (line 317), which covers only the narrow case where the executor itself fails to spawn the process. Both `handle_task_failure` and `handle_task_success` — which handle the far more common paths of process exit with non-zero or zero exit codes — never call `mark_task_closed`.

As a result, the `output_buffer` entries for every normally-completed task accumulate in the `HashMap` forever. The `max_closed_tasks` eviction mechanism added to address the previous memory concern is effectively dead code for the common case, since the `closed_tasks` queue is never populated for regular task completions.

For a long-running daemon that processes many short-lived builds or test runs, this will cause steady resident-memory growth over the daemon's lifetime.

`mark_task_closed` should be called at the end of `handle_task_failure` and `handle_task_success` so the eviction queue is populated for all terminal task states:

```rust
fn handle_task_failure(task_id: &ContextualTaskId, exit_code: i32, state: &mut CoordinatorState) {
    // ... existing logic ...
    let task_id_str = format_contextual_task_id(task_id);
    state.broadcast(DaemonNotification::TaskCompleted { task_id: task_id_str.clone(), exit_code });
    state.mark_task_closed(task_id_str); // <-- add this
    // ...
}

fn handle_task_success(task_id: &ContextualTaskId, exit_code: i32, state: &mut CoordinatorState) {
    // ... existing logic ...
    if state.try_complete_task(task_id) {
        let task_id_str = format_contextual_task_id(task_id);
        state.broadcast(DaemonNotification::TaskCompleted { task_id: task_id_str.clone(), exit_code });
        state.mark_task_closed(task_id_str); // <-- add this
    }
    // ...
}
```
Path: packages/zpm/src/daemon/coordinator.rs (lines 484-549)

taskfile Outdated
```
doc:
cd documentation && yarn astro dev

@long-livewd
```

Typo in attribute: @long-livewd instead of @long-lived

The long task has a misspelled attribute. The attribute check in the codebase looks for attr.name == "long-lived" (e.g. coordinator.rs check_if_long_lived and coordinator_state.rs prepare_new_tasks), so this task will silently be treated as a regular short-lived task rather than a long-lived one. The warm-up period won't be scheduled, dependents won't wait for it to warm up, and it won't appear in tasks list.

Suggested change:
```diff
-@long-livewd
+@long-lived
```

@cursor

cursor bot commented Mar 13, 2026

You have used all of your free Bugbot PR reviews.

To receive reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.

Comment on lines +94 to +99
```rust
if let Some(sub_id) = subscription_id {
    let _ = command_tx.send(CoordinatorCommand::AddTasksToSubscription {
        subscription_id: sub_id,
        target_task_ids: result.task_ids.clone(),
        dependency_task_ids: result.dependency_ids.clone(),
    });
```

TaskStarted/TaskCompleted can be missed before subscription filter is populated

AddTasksToSubscription is sent only after the PushTasks response is received by the client and another round-trip completes. In the coordinator loop, process_ready_tasks runs at the top of every iteration — so after PushTasks is processed, the very next loop tick may call executor_pool.spawn(…), which synchronously sends CoordinatorCommand::TaskStarted to the command channel. If that TaskStarted command is processed before AddTasksToSubscription arrives, the subscription filter is still empty and the broadcast is silently dropped.

For a task with no unmet prerequisites (e.g., the very first task in a fresh daemon), the sequence is:

  1. Coordinator processes PushTasks → sends response_tx
  2. Coordinator loops → process_ready_tasks → spawn(task) → TaskStarted command enqueued
  3. Coordinator processes TaskStarted → broadcast(TaskStarted) → subscription filter is empty → notification lost
  4. Coordinator processes AddTasksToSubscription → filter now has the task IDs

This means on_task_started may never fire for fast-starting tasks, which in --silent-dependencies mode prevents the progress bar from registering the dependency, and in verbose modes omits "Process started" lines.

For the rare case of an extremely fast task, TaskCompleted could also be missed, causing the runner loop to hang indefinitely waiting for a completion that will never arrive.

Fix: Send AddTasksToSubscription before the TasksEnqueued response is returned to the client, so the filter is populated atomically with the task push:

```rust
// In handle_push_tasks, before returning the response:
if let Some(sub_id) = subscription_id {
    let _ = command_tx.send(CoordinatorCommand::AddTasksToSubscription {
        subscription_id: sub_id,
        target_task_ids: result.task_ids.clone(),
        dependency_task_ids: result.dependency_ids.clone(),
    });
}

let _ = response_tx.send(result);
```

Alternatively, merge PushTasks and AddTasksToSubscription into a single atomic coordinator command.


Comment on lines +422 to +431
```rust
} else {
    // No script - complete immediately
    state.mark_completed(&task_id);

    let task_id_str = format_contextual_task_id(&task_id);
    state.broadcast(DaemonNotification::TaskCompleted {
        task_id: task_id_str,
        exit_code: 0,
    });
}
```

mark_task_closed not called for no-script tasks

Tasks without a script are completed in-place here but mark_task_closed is never called. This means the task ID is never pushed into the closed_tasks VecDeque, so the eviction logic in mark_task_closed (while closed_tasks.len() > max_closed_tasks { … }) is never triggered on their behalf. For long-running daemons that process many no-script passthrough tasks, the tasks and output_buffer HashMaps accumulate entries that are never driven through the eviction window.

Suggested change:
```diff
 } else {
     // No script - complete immediately
     state.mark_completed(&task_id);

     let task_id_str = format_contextual_task_id(&task_id);
     state.broadcast(DaemonNotification::TaskCompleted {
-        task_id: task_id_str,
+        task_id: task_id_str.clone(),
         exit_code: 0,
     });
+    state.mark_task_closed(task_id_str);
 }
```

Comment on lines +22 to +25
```rust
// SAFETY: We're setting SIG_IGN which is always safe
let prev_handler = unsafe { libc::signal(libc::SIGINT, libc::SIG_IGN) };
Self { prev_handler }
}
```

SIG_ERR return from libc::signal() not checked

POSIX specifies that signal() returns SIG_ERR on failure. If that happens, self.prev_handler stores SIG_ERR, and on drop() the code calls libc::signal(SIGINT, SIG_ERR), which is explicitly undefined behaviour on some platforms and will silently fail on others, leaving SIGINT in an unknown disposition for the rest of the process lifetime.

While signal(SIGINT, SIG_IGN) almost never fails in practice, the safety comment says "setting SIG_IGN is always safe" — the failure case is worth handling explicitly:

Suggested change:
```diff
 // SAFETY: We're setting SIG_IGN which is always safe
 let prev_handler = unsafe { libc::signal(libc::SIGINT, libc::SIG_IGN) };
+// If signal() returns SIG_ERR, store SIG_DFL as the safe fallback so that
+// Drop restores a known-valid handler rather than attempting to set SIG_ERR.
+let prev_handler = if prev_handler == libc::SIG_ERR {
+    libc::SIG_DFL
+} else {
+    prev_handler
+};
 Self { prev_handler }
```


Comment on lines +78 to +83
```rust
        stream: Stream,
    },

    /// Task failed with an error.
    TaskFailed {
        task_id: String,
```

CoordinatorCommand::TaskFailed is defined but never sent

CoordinatorCommand::TaskFailed is declared here (line 80) and has a handler in coordinator.rs that broadcasts DaemonNotification::TaskFailed, but there is no call to command_tx.send(CoordinatorCommand::TaskFailed { … }) anywhere in the new executor code (pool.rs, runner.rs). Process-level failures (spawn errors, non-zero exit codes) all route through handle_task_failure, which broadcasts DaemonNotification::TaskCompleted { exit_code: non_zero } instead.

This has two consequences:

  1. DaemonNotification::TaskFailed is never broadcast, meaning the on_task_failed callbacks in every handler (run_buffered.rs, run_interlaced.rs, run_silent_dependencies.rs) are unreachable dead code.

  2. For the run_silent_dependencies mode, a failing target task reaches on_task_completed(is_target=true, exit_code != 0), which skips the if !is_target body and does nothing — the user sees no failure message at all before the process exits with a non-zero status.

If the TaskFailed path is intentionally removed in favour of always reporting via TaskCompleted, the handler in coordinator.rs and all three on_task_failed implementations should be deleted to avoid confusion. If it should still be used (e.g. for spawn errors), CoordinatorCommand::TaskFailed needs to be sent from the executor when runner.run() returns Err.


Comment on lines +44 to +55
```rust
        if self.check_daemon_ready(existing.port).await.is_ok() {
            println!("ws://127.0.0.1:{}", existing.port);
            return Ok(());
        }
    }

    daemons::unregister_daemon(&detected_root)?;
}

if let Some(link) = get_link(&detected_root)? {
    if let LinkTarget::Local { bin_path } = link.link_target {
        return self.start_with_binary(&detected_root, &bin_path, "local").await;
```

Alive daemon orphaned when port readiness check fails

When a stale registry entry is found, the code checks two things in sequence: is_process_alive(existing.pid) (true) and check_daemon_ready(existing.port) (fails). On check failure, unregister_daemon is called and execution falls through to spawn a brand-new daemon — but the original daemon process is still running. It will continue to accept connections on its port, hold file locks, and run any long-lived tasks indefinitely, with no way for the user (or the registry) to track or stop it.

This can happen legitimately: a momentary network hiccup, the daemon catching SIGSTOP, or a transient port-bind race during startup.

A safer approach: if the process is alive but not ready within the polling window, send SIGTERM to existing.pid before unregistering and spawning a replacement:

```rust
if daemons::is_process_alive(existing.pid) {
    if self.check_daemon_ready(existing.port).await.is_ok() {
        println!("ws://127.0.0.1:{}", existing.port);
        return Ok(());
    }
    // Process alive but not answering — terminate it before replacing
    daemons::kill_process(existing.pid);
}
daemons::unregister_daemon(&detected_root)?;
```

Comment on lines +800 to +815
```rust
    &mut self,
    subscription_id: SubscriptionId,
    target_task_ids: Vec<String>,
    dependency_task_ids: Vec<String>,
) {
    if let Some(sub) = self.subscriptions.get_mut(&subscription_id) {
        for task_id in target_task_ids {
            sub.filter.add_target_task(task_id);
        }
        for task_id in dependency_task_ids {
            sub.filter.add_dependency_task(task_id);
        }
    }
}

pub fn remove_subscription(&mut self, subscription_id: SubscriptionId) {
```

tasks, prepared, and subtasks maps grow without bound

mark_task_closed (called at the end of every completed, cancelled, or failed task) only evicts entries from output_buffer via the max_closed_tasks window. The three maps that hold per-task metadata are never trimmed:

  • tasks: HashMap<ContextualTaskId, TaskInfo> — one entry per contextual task execution
  • prepared: BTreeMap<ContextualTaskId, PreparedTask> — contains the script, env, args, and cwd for every task that was ever scheduled
  • subtasks: HashMap<ContextualTaskId, HashSet<ContextualTaskId>> — parent→children relationships

Because each run_task call generates a fresh UUID context (Uuid::new_v4()), every task run creates unique ContextualTaskId values that are never re-used and never cleaned up. For a long-lived daemon that processes thousands of CI builds or test runs, these maps will grow monotonically for the lifetime of the process.

mark_task_closed should also evict the corresponding entries from tasks, prepared, and subtasks (using the same closed_tasks eviction queue already in place for output_buffer):

```rust
pub fn mark_task_closed(&mut self, task_id: String) {
    self.closed_tasks.push_back(task_id);

    while self.closed_tasks.len() > self.max_closed_tasks {
        if let Some(oldest) = self.closed_tasks.pop_front() {
            self.output_buffer.remove(&oldest);
            // Also clean task metadata
            if let Ok(ctx_id) = parse_contextual_task_id_str(&oldest) {
                self.tasks.remove(&ctx_id);
                self.prepared.remove(&ctx_id);
                self.subtasks.remove(&ctx_id);
            }
        }
    }
}
```


Comment on lines +110 to +130

```rust
            let lines = ctx.client.get_task_output(task_id).await.ok();

            if lines.as_ref().map_or(false, |l| !l.is_empty()) {
                let mut stdout = std::io::stdout().lock();

                writeln!(stdout, "[{}]: Process started", format_task_id(task_id)).ok();

                for output_line in lines.unwrap() {
                    writeln!(stdout, "[{}]: {}", format_task_id(task_id), output_line.line).ok();
                }

                writeln!(stdout, "[{}]: Process exited (exit code {})", format_task_id(task_id), exit_code).ok();
            }
        }
    }

    async fn on_task_cancelled(
        &mut self,
        _ctx: &mut TaskRunContext,
        task_id: &str,
```

Target task output printed twice on failure

SilentDependenciesHandler sets output_subscription: SubscriptionScope::TargetOnly, which means the coordinator streams every TaskOutputLine notification for the target task live to on_output_line. Each line is printed immediately (without a prefix) to stdout.

When the same target task exits with a non-zero code, on_task_completed falls into the else if exit_code != 0 branch and calls get_task_output, then re-prints every buffered line again — this time with the [task-name]: prefix plus "Process started" and "Process exited" framing. Every line of output therefore appears twice: once without a prefix (live) and once with a prefix (buffered replay), producing confusing duplicate output.

The non-target case (lines 92–108) is correct because output_subscription: TargetOnly means dependency output is not streamed live; get_task_output is the first and only print. The target case needs different handling.

Suggested change:
```rust
    } else if exit_code != 0 {
        // Target task failed.
        // Output was already printed live via on_output_line (TargetOnly subscription),
        // so do NOT replay it from the buffer — that would duplicate every line.
        self.stop_progress();
    }
```

If you also need the "Process started / Process exited" framing for failing targets, consider switching the target output subscription to SubscriptionScope::None and then printing entirely from the buffer here, mirroring what the non-target path does.


```rust
if daemons::kill_daemon_gracefully(daemon.pid) {
    daemons::unregister_daemon(&detected_root)?;
    println!(
        "{} Stopped daemon for {} (PID: {})",
```

Blocking kill_daemon_gracefully inside async executor

kill_daemon_gracefully internally calls std::thread::sleep in two polling loops (up to 60 × 100 ms + 10 × 100 ms = 7 seconds of wall time) and is invoked here from an async fn execute() body running on a Tokio worker thread. Blocking a Tokio worker thread stalls all tasks scheduled on that thread for the full duration.

For daemon_kill this may only be one daemon, but daemon_kill_all calls this function sequentially in a loop for every registered daemon — each one potentially blocking for another 7 seconds.

The same pattern occurs in daemon_kill_all.rs (line 40) and daemon_open.rs (line 50).

Use tokio::task::spawn_blocking to run the blocking work on a dedicated thread pool:

```rust
let success = tokio::task::spawn_blocking(move || {
    daemons::kill_daemon_gracefully(daemon.pid)
})
.await
.unwrap_or(false);

if success {
    ...
}
```

Or, better, rewrite kill_daemon_gracefully to be async using tokio::time::sleep and tokio::process APIs so it can be awaited directly without blocking any thread.


Comment on lines +60 to +62
```rust
    let _ = std::io::stdout().flush();
}
```


TaskStarted sent before the executor future is polled

executor_pool.spawn(task_id, prepared) immediately enqueues CoordinatorCommand::TaskStarted to the command channel before pushing the future onto FuturesUnordered. The coordinator will broadcast TaskStarted to all subscribers in the very next command-processing iteration. However, the actual process is only spawned when executor_pool.poll_next() drives the future forward — which doesn't happen until the following select! arm fires.

For long-lived tasks this creates a subtle window: TaskStarted is broadcast (and the 500 ms warm-up timer is armed) before runner.run() has even called spawn_script. If the OS is slow to schedule the executor future, or if many tasks are spawned in the same process_ready_tasks call, the warm-up timer could fire before the process is actually alive, sending TaskWarmUpComplete to dependents who then launch before their long-lived prerequisite is truly up.

Consider only sending TaskStarted from inside the executor future, after the process has been successfully spawned (i.e. after env.spawn_script(...) returns Ok) — consistent with how RegisterPid is sent today in runner.rs.


Comment on lines +595 to +612
```rust
            })
            .cloned()
            .collect();

        let mut cancelled_ids = Vec::new();

        for task_id in tasks_to_cancel {
            self.mark_failed(&task_id);
            cancelled_ids.push(format_contextual_task_id(&task_id));
        }

        cancelled_ids
    }

    fn parse_contextual_task_id(&self, project: &Project, task_id_str: &str) -> Option<ContextualTaskId> {
        let (task_part, context_id) = task_id_str.rsplit_once('@')?;
        let (workspace_str, task_name_str) = task_part.split_once(':')?;
```

**`cancel_context` never calls `mark_task_closed` — tasks skip the eviction queue**

`cancel_context` marks every non-terminal task in the context as `Failed` and returns their IDs, but never pushes those IDs into the `closed_tasks` `VecDeque`. As a result:

1. Their entries in `output_buffer`, `tasks`, `prepared`, and `subtasks` are never eligible for eviction by `mark_task_closed`'s sliding-window logic.
2. No `TaskCancelled`/`TaskCompleted` notification is broadcast before the connection closes, so the state inside the daemon is left inconsistent (tasks appear non-terminal to any future subscriber that might care).

Every Ctrl-C on a non-trivial build cancels several tasks this way. Over many runs on a long-lived daemon this is a steady memory leak.

```rust
for task_id in tasks_to_cancel {
    self.mark_failed(&task_id);
    let task_id_str = format_contextual_task_id(&task_id);
    cancelled_ids.push(task_id_str.clone());
    // Ensure eviction window sees this task
    self.mark_task_closed(task_id_str);
}
```
Path: packages/zpm/src/daemon/coordinator_state.rs · Lines 595-612

Comment on lines +83 to +89
```rust
                }
            }
        }

        ready
    }
```

**`find_tasks_to_fail` missing `is_script_finished` guard — spurious cancellation of in-flight subtask parents**

`find_ready_tasks` skips tasks whose state is `is_script_finished()` (which includes `WaitingForSubtasks`), but `find_tasks_to_fail` only checks `is_terminal()`. `WaitingForSubtasks` is *not* terminal — it means the task's own script has already run to completion but it is still waiting for subtasks it spawned via `tasks push`.

**Concrete failure scenario:**

1. Task A depends on long-lived task B. A's script runs, succeeds, and pushes subtask S. A is now in `WaitingForSubtasks`.
2. B subsequently crashes.
3. `find_tasks_to_fail` finds A (B is a failed prerequisite, A is not terminal, A is not in `running`).
4. A is marked `Cancelled` and `TaskCancelled` is broadcast — even though A's script already succeeded.
5. Subtask S is now an orphan: it was spawned by A but A is cancelled, so when S completes it can never cause A to complete/fail properly.

Add the same guard that `find_ready_tasks` uses:

```rust
if task_state.is_terminal() || task_state.is_script_finished() || running.contains(&ctx_task_id) {
    continue;
}
```
Path: packages/zpm/src/daemon/scheduler/dependencies.rs · Lines 83-89

Comment on lines +238 to +266
```rust
        }
    }
}

// ============================================================================
// Notification Polling
// ============================================================================

async fn poll_notifications(
    receivers: &mut [mpsc::UnboundedReceiver<DaemonNotification>],
) -> Option<DaemonNotification> {
    if receivers.is_empty() {
        std::future::pending::<Option<DaemonNotification>>().await
    } else {
        futures::future::poll_fn(|cx| {
            for rx in receivers.iter_mut() {
                match rx.poll_recv(cx) {
                    std::task::Poll::Ready(Some(notif)) => {
                        return std::task::Poll::Ready(Some(notif));
                    }
                    std::task::Poll::Ready(None) => {}
                    std::task::Poll::Pending => {}
                }
            }
            std::task::Poll::Pending
        })
        .await
    }
}
```
**Dead subscription receivers accumulate in the polling vector**

When a subscription is removed (`SubscriptionGuard` is dropped), the coordinator drops the `UnboundedSender`, but the corresponding `UnboundedReceiver` remains in `notification_receivers`. `poll_notifications` iterates over the entire slice on every call, touching each dead receiver. A connection that issues many `PushTasks` requests (e.g., a CI runner pushing hundreds of tasks sequentially) will accumulate hundreds of closed receivers that are polled on every notification delivery cycle.

Consider compacting the vector when receivers are known-closed:

```rust
async fn poll_notifications(
    receivers: &mut Vec<mpsc::UnboundedReceiver<DaemonNotification>>,
) -> Option<DaemonNotification> {
    if receivers.is_empty() {
        return std::future::pending::<Option<DaemonNotification>>().await;
    }
    futures::future::poll_fn(|cx| {
        let mut i = 0;
        while i < receivers.len() {
            match receivers[i].poll_recv(cx) {
                std::task::Poll::Ready(Some(notif)) => return std::task::Poll::Ready(Some(notif)),
                std::task::Poll::Ready(None) => { receivers.swap_remove(i); } // remove dead receiver
                std::task::Poll::Pending => { i += 1; }
            }
        }
        std::task::Poll::Pending
    }).await
}
```
Path: packages/zpm/src/daemon/server/connection.rs · Lines 238-266

Comment on lines 162 to 171

```diff
 Err(Error::ScriptNotFound(_)) | Err(Error::GlobalScriptNotFound(_)) => {
     // Try task files as a fallback before looking for binaries
     if task_run::task_exists(&project, &self.name) {
-        return task_run::run_task(
-            &project,
-            &self.name,
-            &self.args,
-            0,    // verbose_level
-            true, // silent_dependencies
-            true, // interlaced
-            project.config.settings.enable_timers.value,
-        ).await;
+        let task_run_silent_dependencies
+            = TaskRunSilentDependencies::new(&self.cli_environment, self.name.clone(), self.args.clone());
+
+        return task_run_silent_dependencies.execute().await;
     }

     // Fall back to binary lookup
     execute_binary(true).await
```
**`yarn run <task>` fallback now hard-requires Yarn Switch daemon context**

`TaskRunSilentDependencies::new` unconditionally sets `standalone: false`. When `yarn run my-task` reaches this fallback path, `DaemonClient::connect` is called, which requires either `YARN_DAEMON_SERVER` or `YARN_SWITCH_PATH_ENV` to be set. If neither is present (e.g., invoking the `zpm` binary directly in tests or CI without Yarn Switch in `PATH`), the call fails with:

> "This command can only be called within a Yarn Switch context."

Previously the fallback ran the task inline without any daemon requirement. The new behaviour is silent until the error surfaces at runtime. If calling without Yarn Switch is a supported path, either use `standalone: true` here or add a fallback that degrades gracefully:

```rust
let mut task_run_silent_dependencies =
    TaskRunSilentDependencies::new(&self.cli_environment, self.name.clone(), self.args.clone());
// standalone = true if we're not inside a Yarn Switch context
task_run_silent_dependencies.standalone = std::env::var(zpm::daemon::YARN_SWITCH_PATH_ENV).is_err()
    && std::env::var(zpm::daemon::DAEMON_SERVER_ENV).is_err();
return task_run_silent_dependencies.execute().await;
```
Path: packages/zpm/src/commands/run.rs · Lines 162-171
