Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/shell/commands/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,28 @@ impl ShellCommand for ExecutableCommand {

context.state.track_child_process(&child);

// Notify about the spawned child process
let process_signaler = context.state.process_signaler().clone();
if let Some(pid) = child.id() {
process_signaler.notify_spawn(pid);
}

// avoid deadlock since this is holding onto the pipes
drop(sub_command);

loop {
tokio::select! {
result = child.wait() => match result {
Ok(status) => return ExecuteResult::Continue(
status.code().unwrap_or(1),
Vec::new(),
Vec::new(),
),
Ok(status) => {
process_signaler.notify_exit();
return ExecuteResult::Continue(
status.code().unwrap_or(1),
Vec::new(),
Vec::new(),
);
}
Err(err) => {
process_signaler.notify_exit();
let _ = stderr.write_line(&format!("{}", err));
return ExecuteResult::from_exit_code(1);
}
Expand All @@ -83,6 +93,7 @@ impl ShellCommand for ExecutableCommand {
if cfg!(not(unix)) && signal.causes_abort() {
let _ = child.start_kill();
let status = child.wait().await.ok();
process_signaler.notify_exit();
return ExecuteResult::from_exit_code(
status.and_then(|s| s.code()).unwrap_or(signal.aborted_code()),
);
Expand Down
63 changes: 63 additions & 0 deletions src/shell/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::shell::types::EnvChange;
use crate::shell::types::ExecuteResult;
use crate::shell::types::FutureExecuteResult;
use crate::shell::types::KillSignal;
use crate::shell::types::ProcessSignaler;
use crate::shell::types::ShellPipeReader;
use crate::shell::types::ShellPipeWriter;
use crate::shell::types::ShellState;
Expand Down Expand Up @@ -80,6 +81,68 @@ pub async fn execute(
.await
}

/// Executes a command list and returns the ProcessSignaler for monitoring child processes.
///
/// This is useful when you need to track spawned child PIDs for signal forwarding.
/// The returned `ProcessSignaler` can be used to:
/// - Get the current foreground process PID via `current_pid()`
/// - Subscribe to process spawn notifications via `subscribe()`
///
/// # Example
///
/// ```ignore
/// use deno_task_shell::{execute_with_signaler, KillSignal, SignalKind};
///
/// let kill_signal = KillSignal::default();
/// let (signaler, execute_future) = execute_with_signaler(
/// list,
/// env_vars,
/// cwd,
/// custom_commands,
/// kill_signal.clone(),
/// );
///
/// // Check the current child process
/// if let Some(child_pid) = signaler.current_pid() {
/// // Decide whether to forward signals based on process group
/// let child_pgid = unsafe { libc::getpgid(child_pid as i32) };
/// let our_pgid = unsafe { libc::getpgid(0) };
///
/// if child_pgid != our_pgid {
/// kill_signal.send(SignalKind::SIGINT);
/// }
/// }
///
/// let exit_code = execute_future.await;
/// ```
pub fn execute_with_signaler(
list: SequentialList,
env_vars: HashMap<OsString, OsString>,
cwd: PathBuf,
custom_commands: HashMap<String, Rc<dyn ShellCommand>>,
kill_signal: KillSignal,
) -> (ProcessSignaler, impl std::future::Future<Output = i32>) {
let signaler = ProcessSignaler::new();
let state = ShellState::new_with_process_signaler(
env_vars,
cwd,
custom_commands,
kill_signal,
signaler.clone(),
);
let future = async move {
execute_with_pipes(
list,
state,
ShellPipeReader::stdin(),
ShellPipeWriter::stdout(),
ShellPipeWriter::stderr(),
)
.await
};
(signaler, future)
}

/// Executes a `SequentialList` of commands with specified input and output pipes.
///
/// This function accepts a list of commands, a shell state, and pipes for standard input, output, and error.
Expand Down
2 changes: 2 additions & 0 deletions src/shell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ pub use commands::ShellCommand;
pub use commands::ShellCommandContext;
pub use execute::execute;
pub use execute::execute_with_pipes;
pub use execute::execute_with_signaler;
pub use types::EnvChange;
pub use types::ExecuteResult;
pub use types::FutureExecuteResult;
pub use types::KillSignal;
pub use types::KillSignalDropGuard;
pub use types::ProcessSignaler;
pub use types::ShellPipeReader;
pub use types::ShellPipeWriter;
pub use types::ShellState;
Expand Down
177 changes: 177 additions & 0 deletions src/shell/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct ShellState {
kill_signal: KillSignal,
process_tracker: ChildProcessTracker,
tree_exit_code_cell: TreeExitCodeCell,
process_signaler: ProcessSignaler,
}

impl ShellState {
Expand All @@ -65,6 +66,25 @@ impl ShellState {
cwd: PathBuf,
custom_commands: HashMap<String, Rc<dyn ShellCommand>>,
kill_signal: KillSignal,
) -> Self {
Self::new_with_process_signaler(
env_vars,
cwd,
custom_commands,
kill_signal,
ProcessSignaler::new(),
)
}

/// Creates a new ShellState with a custom ProcessSignaler.
///
/// Use this when you need to track child process PIDs for signal forwarding.
pub fn new_with_process_signaler(
env_vars: HashMap<OsString, OsString>,
cwd: PathBuf,
custom_commands: HashMap<String, Rc<dyn ShellCommand>>,
kill_signal: KillSignal,
process_signaler: ProcessSignaler,
) -> Self {
assert!(cwd.is_absolute());
let mut commands = builtin_commands();
Expand All @@ -77,6 +97,7 @@ impl ShellState {
kill_signal,
process_tracker: ChildProcessTracker::new(),
tree_exit_code_cell: Default::default(),
process_signaler,
};
// ensure the data is normalized
for (name, value) in env_vars {
Expand Down Expand Up @@ -169,6 +190,11 @@ impl ShellState {
&self.kill_signal
}

/// Returns the process signaler for tracking child process PIDs.
pub fn process_signaler(&self) -> &ProcessSignaler {
&self.process_signaler
}

pub fn track_child_process(&self, child: &tokio::process::Child) {
self.process_tracker.track(child);
}
Expand Down Expand Up @@ -632,6 +658,81 @@ impl KillSignalDropGuard {
}
}

#[derive(Debug, Default)]
struct ProcessSignalerInner {
/// The PID of the current foreground process, if any.
current_pid: Cell<Option<u32>>,
/// Sender for process spawn notifications.
/// Lazily initialized on first subscribe.
sender: RefCell<Option<broadcast::Sender<u32>>>,
}

/// Provides access to the currently running foreground child process.
///
/// This is useful for signal forwarding scenarios where you need to check
/// if the child process is in the same process group as the parent.
///
/// # Example
///
/// ```ignore
/// let signaler = ProcessSignaler::new();
/// let mut receiver = signaler.subscribe();
///
/// // In a signal handler, check if we should forward the signal
/// if let Some(child_pid) = signaler.current_pid() {
/// // Check if child is in same process group
/// let child_pgid = unsafe { libc::getpgid(child_pid as i32) };
/// let our_pgid = unsafe { libc::getpgid(0) };
///
/// if child_pgid != our_pgid {
/// // Child in different process group, forward signal
/// kill_signal.send(SignalKind::SIGINT);
/// }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this would work because there could be multiple child processes running in different groups?

/// }
/// ```
#[derive(Debug, Clone, Default)]
pub struct ProcessSignaler(Rc<ProcessSignalerInner>);

impl ProcessSignaler {
/// Creates a new ProcessSignaler.
pub fn new() -> Self {
Self::default()
}

/// Returns the PID of the current foreground child process, if any.
///
/// Returns `None` if no child process is currently running.
pub fn current_pid(&self) -> Option<u32> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be multiple child processes running at the same time:

deno eval 'console.log(1)' ; deno eval 'console.log(2)' &

It seems this would only hold one of them?

self.0.current_pid.get()
}

/// Subscribe to receive notifications when child processes spawn.
///
/// Returns a receiver that yields PIDs of spawned processes.
/// The channel is lazily created on first subscription.
pub fn subscribe(&self) -> broadcast::Receiver<u32> {
let mut sender_ref = self.0.sender.borrow_mut();
if sender_ref.is_none() {
let (sender, _) = broadcast::channel(16);
*sender_ref = Some(sender);
}
sender_ref.as_ref().unwrap().subscribe()
}

/// Called internally when a child process is spawned.
pub(crate) fn notify_spawn(&self, pid: u32) {
self.0.current_pid.set(Some(pid));
if let Some(sender) = self.0.sender.borrow().as_ref() {
let _ = sender.send(pid);
}
}

/// Called internally when a child process exits.
pub(crate) fn notify_exit(&self) {
self.0.current_pid.set(None);
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SignalKind {
SIGTERM,
Expand Down Expand Up @@ -715,6 +816,7 @@ impl From<SignalKind> for i32 {

#[cfg(test)]
mod test {
use super::ProcessSignaler;
use crate::KillSignal;
use crate::SignalKind;

Expand Down Expand Up @@ -860,4 +962,79 @@ mod test {
Some(SignalKind::SIGTERM.aborted_code())
);
}

#[test]
fn test_process_signaler_current_pid() {
let signaler = ProcessSignaler::new();
assert_eq!(signaler.current_pid(), None);

signaler.notify_spawn(1234);
assert_eq!(signaler.current_pid(), Some(1234));

signaler.notify_spawn(5678);
assert_eq!(signaler.current_pid(), Some(5678));

signaler.notify_exit();
assert_eq!(signaler.current_pid(), None);
}

#[test]
fn test_process_signaler_clone() {
let signaler = ProcessSignaler::new();
let signaler_clone = signaler.clone();

signaler.notify_spawn(1234);

// Both should see the same PID since they share the inner state
assert_eq!(signaler.current_pid(), Some(1234));
assert_eq!(signaler_clone.current_pid(), Some(1234));

signaler_clone.notify_exit();
assert_eq!(signaler.current_pid(), None);
assert_eq!(signaler_clone.current_pid(), None);
}

#[tokio::test]
async fn test_process_signaler_subscribe() {
let signaler = ProcessSignaler::new();
let mut receiver = signaler.subscribe();

// Spawn notification should be received
signaler.notify_spawn(1234);

let pid = receiver.recv().await.unwrap();
assert_eq!(pid, 1234);

// Multiple spawns should be received
signaler.notify_spawn(5678);
let pid2 = receiver.recv().await.unwrap();
assert_eq!(pid2, 5678);
}

#[tokio::test]
async fn test_process_signaler_multiple_subscribers() {
let signaler = ProcessSignaler::new();
let mut receiver1 = signaler.subscribe();
let mut receiver2 = signaler.subscribe();

signaler.notify_spawn(1234);

// Both receivers should get the notification
let pid1 = receiver1.recv().await.unwrap();
let pid2 = receiver2.recv().await.unwrap();
assert_eq!(pid1, 1234);
assert_eq!(pid2, 1234);
}

#[test]
fn test_process_signaler_no_subscribers() {
// Should not panic when there are no subscribers
let signaler = ProcessSignaler::new();
signaler.notify_spawn(1234);
signaler.notify_exit();

// PID should still be tracked even without subscribers
signaler.notify_spawn(5678);
assert_eq!(signaler.current_pid(), Some(5678));
}
}
Loading
Loading