Skip to content

Commit efb7e32

Browse files
fix: windows process termination using Job Objects (NR-490832) (#2052)
* fix: Windows process termination using Job Objects * nit
1 parent 8c80faf commit efb7e32

File tree

10 files changed

+223
-77
lines changed

10 files changed

+223
-77
lines changed

agent-control/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ nix = { workspace = true, features = ["signal", "user", "hostname"] }
8585
windows = { workspace = true, features = [
8686
"Win32_System_Console",
8787
"Win32_System_Threading",
88+
"Win32_System_JobObjects",
89+
"Win32_Security",
8890
] }
8991
windows-service = "0.8.0"
9092

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
pub mod command_os;
22
pub mod error;
33
pub mod executable_data;
4+
#[cfg(target_family = "windows")]
5+
pub mod job_object;
46
pub mod logging;
57
pub mod restart_policy;

agent-control/src/sub_agent/on_host/command/command_os.rs

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use crate::agent_control::defaults::{STDERR_LOG_PREFIX, STDOUT_LOG_PREFIX};
55
use crate::sub_agent::on_host::command::executable_data::ExecutableData;
66
use std::time::{Duration, Instant};
77
use std::{
8-
io,
98
path::PathBuf,
109
process::{Child, Command, ExitStatus, Stdio},
1110
};
@@ -18,6 +17,8 @@ use super::{
1817
logger::Logger,
1918
},
2019
};
20+
#[cfg(target_family = "windows")]
21+
use crate::sub_agent::on_host::command::job_object::JobObject;
2122

2223
const POLL_INTERVAL: Duration = Duration::from_millis(100);
2324

@@ -36,6 +37,9 @@ pub struct CommandOSStarted {
3637
process: Child,
3738
loggers: Option<FileSystemLoggers>,
3839
shutdown_timeout: Duration,
40+
41+
#[cfg(target_family = "windows")]
42+
job_object: Option<JobObject>,
3943
}
4044

4145
////////////////////////////////////////////////////////////////////////////////////
@@ -54,9 +58,6 @@ impl CommandOSNotStarted {
5458
.stdout(Stdio::piped())
5559
.stderr(Stdio::piped());
5660

57-
#[cfg(target_family = "windows")]
58-
Self::create_process_group(&mut cmd);
59-
6061
Self {
6162
agent_id,
6263
cmd,
@@ -74,32 +75,31 @@ impl CommandOSNotStarted {
7475
file_logger(&agent_id, self.logging_path, STDERR_LOG_PREFIX),
7576
)
7677
});
77-
Ok(CommandOSStarted {
78-
agent_id,
79-
process: self.cmd.spawn()?,
80-
loggers,
81-
shutdown_timeout: self.shutdown_timeout,
82-
})
83-
}
78+
let child = self.cmd.spawn()?;
8479

85-
#[cfg(target_family = "windows")]
86-
/// Sets the process creation flags to create a new process group for Windows processes.
87-
///
88-
/// This enables sending CTRL+BREAK events to it via the [`GenerateConsoleCtrlEvent`](windows::Win32::System::Console::GenerateConsoleCtrlEvent) function,
89-
/// which is the mechanism we use to gracefully shut down the process. Otherwise, the Agent Control process needs to attach itself to the
90-
/// console of the process to send a CTRL+C event which would need synchronization (many sub-agents making AC attach and reattach concurrently).
91-
///
92-
/// For details, see the [task termination mechanism for GitLab runners](https://gitlab.com/gitlab-org/gitlab-runner/-/blob/397ba5dc2685e7b13feaccbfed4c242646955334/helpers/process/killer_windows.go#L75-108), which can use either mechanism dependent on a flag to use the legacy method (attach and reattach the parent process).
93-
///
94-
/// Additional reading:
95-
/// - [`GenerateConsoleCtrlEvent` function](https://learn.microsoft.com/en-us/windows/console/generateconsolectrlevent), see second parameter `dwProcessGroupId`.
96-
/// - [Process Creation Flags](https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags)
97-
fn create_process_group(cmd: &mut Command) {
98-
use std::os::windows::process::CommandExt;
99-
use windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
100-
101-
// Create new process group so we can send CTRL+BREAK events to it
102-
cmd.creation_flags(CREATE_NEW_PROCESS_GROUP.0);
80+
#[cfg(target_family = "unix")]
81+
{
82+
Ok(CommandOSStarted {
83+
agent_id,
84+
process: child,
85+
loggers,
86+
shutdown_timeout: self.shutdown_timeout,
87+
})
88+
}
89+
#[cfg(target_family = "windows")]
90+
{
91+
// Each started process gets its own JobObject. All sub-processes that the process spawns
92+
// will be assigned to the same JobObject, allowing for a graceful shutdown of the entire process tree.
93+
let job_object = JobObject::new()?;
94+
job_object.assign_process(&child)?;
95+
Ok(CommandOSStarted {
96+
agent_id,
97+
process: child,
98+
job_object: Some(job_object),
99+
loggers,
100+
shutdown_timeout: self.shutdown_timeout,
101+
})
102+
}
103103
}
104104
}
105105

@@ -164,12 +164,11 @@ impl CommandOSStarted {
164164
}
165165

166166
pub fn shutdown(&mut self) -> Result<(), CommandError> {
167-
let pid = self.get_pid();
168167
// Attempt a graceful shutdown (platform-dependent).
169-
let graceful_shutdown_result = Self::graceful_shutdown(pid);
168+
let graceful_shutdown_result = self.graceful_shutdown();
170169

171170
if let Err(e) = &graceful_shutdown_result {
172-
warn!(agent_id = %self.agent_id, "Graceful shutdown failed for process {pid}: {e}");
171+
warn!(agent_id = %self.agent_id, "Graceful shutdown failed for process {}: {e}",self.get_pid());
173172
}
174173

175174
if graceful_shutdown_result.is_err() || self.is_running_after_timeout(self.shutdown_timeout)
@@ -181,21 +180,25 @@ impl CommandOSStarted {
181180
}
182181

183182
#[cfg(target_family = "unix")]
184-
fn graceful_shutdown(pid: u32) -> Result<(), CommandError> {
183+
fn graceful_shutdown(&self) -> Result<(), CommandError> {
185184
use nix::{sys::signal, unistd::Pid};
185+
let pid = self.get_pid();
186186

187187
signal::kill(Pid::from_raw(pid as i32), signal::SIGTERM)
188-
.map_err(|e| CommandError::from(io::Error::from(e)))
188+
.map_err(|e| CommandError::from(std::io::Error::from(e)))
189189
}
190190

191191
#[cfg(target_family = "windows")]
192-
fn graceful_shutdown(pid: u32) -> Result<(), CommandError> {
193-
use windows::Win32::System::Console::{CTRL_BREAK_EVENT, GenerateConsoleCtrlEvent};
194-
// Graceful shutdown for console applications
195-
// <https://stackoverflow.com/a/12899284>
196-
// <https://gitlab.com/gitlab-org/gitlab-runner/-/blob/397ba5dc2685e7b13feaccbfed4c242646955334/helpers/process/killer_windows.go#L75-108>
197-
unsafe { GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid) }
198-
.map_err(|e| CommandError::from(io::Error::from(e)))
192+
/// On Windows there is no direct equivalent to sending SIGTERM. Applications that runs as
193+
/// services handles stops signals via Service Control Manager (SCM), and console applications
194+
/// can handle Ctrl-C or Ctrl-Break events via attached consoles.
195+
/// The current implementation uses Job Objects to manage process groups, and there is no graceful
196+
/// shutdown signal sent to the processes. The Job Object will terminate all associated processes.
197+
fn graceful_shutdown(&mut self) -> Result<(), CommandError> {
198+
if let Some(job_object) = self.job_object.take() {
199+
job_object.kill()?;
200+
}
201+
Ok(())
199202
}
200203
}
201204

agent-control/src/sub_agent/on_host/command/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ pub enum CommandError {
99
#[error("{0}")]
1010
IOError(#[from] std::io::Error),
1111

12-
#[error("Nix Error: {0}")]
13-
NixError(String),
12+
#[error("{0}")]
13+
WinError(String),
1414
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use crate::sub_agent::on_host::command::error::CommandError;
2+
use std::os::windows::io::AsRawHandle;
3+
use std::process::Child;
4+
use tracing::error;
5+
use windows::Win32::Foundation::{CloseHandle, HANDLE};
6+
use windows::Win32::System::JobObjects::{
7+
AssignProcessToJobObject, CreateJobObjectW, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
8+
JOBOBJECT_EXTENDED_LIMIT_INFORMATION, JobObjectExtendedLimitInformation,
9+
SetInformationJobObject,
10+
};
11+
12+
/// Represents a Windows Job Object used to manage and control a group of processes.
13+
/// When the Job Object is killed or dropped, all associated processes are terminated.
14+
pub struct JobObject {
15+
handle: HANDLE,
16+
}
17+
impl JobObject {
18+
/// Creates a new JobObject with the "kill on job close" configuration.
19+
pub fn new() -> Result<Self, CommandError> {
20+
unsafe {
21+
let handle = CreateJobObjectW(None, None)
22+
.map_err(|e| CommandError::WinError(format!("creating JobObject: {e}")))?;
23+
24+
// Set the JobObject to kill all associated processes when the JobObject is closed.
25+
let mut limits = JOBOBJECT_EXTENDED_LIMIT_INFORMATION::default();
26+
limits.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
27+
SetInformationJobObject(
28+
handle,
29+
JobObjectExtendedLimitInformation,
30+
&limits as *const _ as *const _,
31+
std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
32+
)
33+
.map_err(|e| CommandError::WinError(format!("setting JobObject information: {e}")))?;
34+
35+
Ok(Self { handle })
36+
}
37+
}
38+
39+
/// Assigns the given process to this JobObject. The process will be terminated when the JobObject is closed.
40+
pub fn assign_process(&self, process: &Child) -> Result<(), CommandError> {
41+
unsafe {
42+
let process_handle = HANDLE(process.as_raw_handle());
43+
AssignProcessToJobObject(self.handle, process_handle).map_err(|e| {
44+
CommandError::WinError(format!("assigning process to JobObject: {e}"))
45+
})?;
46+
}
47+
Ok(())
48+
}
49+
50+
/// Kills the JobObject, terminating all associated processes.
51+
pub fn kill(self) -> Result<(), CommandError> {
52+
unsafe {
53+
CloseHandle(self.handle)
54+
.map_err(|e| CommandError::WinError(format!("closing JobObject handle: {e}")))?;
55+
}
56+
Ok(())
57+
}
58+
}
59+
60+
/// Ensure the JobObject is killed when dropped.
61+
impl Drop for JobObject {
62+
fn drop(&mut self) {
63+
unsafe {
64+
let _ =
65+
CloseHandle(self.handle).inspect_err(|err| error!(%err,"Fail to kill a JobObject"));
66+
}
67+
}
68+
}

agent-control/tests/on_host/command/shutdown.rs

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ fn non_blocking_runner() {
4444
}
4545

4646
#[test]
47+
#[cfg(target_family = "unix")]
4748
fn command_shutdown_when_sigterm_is_ignored() {
4849
let agent_id = "test".to_string().try_into().unwrap();
4950
let mut cmd = CommandOSNotStarted::new(
5051
agent_id,
51-
#[cfg(target_family = "unix")]
5252
&ExecutableData {
5353
id: "test".to_string(),
5454
bin: "tests/on_host/data/ignore_sigterm.sh".to_string(),
@@ -57,21 +57,6 @@ fn command_shutdown_when_sigterm_is_ignored() {
5757
restart_policy: Default::default(),
5858
shutdown_timeout: Duration::from_millis(100),
5959
},
60-
#[cfg(target_family = "windows")]
61-
&ExecutableData {
62-
id: "test".to_string(),
63-
bin: "powershell".to_string(),
64-
args: vec![
65-
"-NoProfile".to_string(),
66-
"-ExecutionPolicy".to_string(),
67-
"Bypass".to_string(),
68-
"-File".to_string(),
69-
"tests\\on_host\\data\\ignore_sigbreak.ps1".to_string(),
70-
],
71-
env: Default::default(),
72-
restart_policy: Default::default(),
73-
shutdown_timeout: Duration::from_millis(100),
74-
},
7560
false,
7661
Default::default(),
7762
)
@@ -95,3 +80,56 @@ fn command_shutdown_when_sigterm_is_ignored() {
9580
assert!(!cmd.is_running());
9681
assert!(terminated.is_ok());
9782
}
83+
84+
#[test]
85+
// This test ensure the Job Object is properly terminating orphan processes on Windows
86+
// On Unix, this is handled by systemd.
87+
#[cfg(target_family = "windows")]
88+
fn command_shutdown_kill_orphan_process() {
89+
use crate::common::retry::retry;
90+
use crate::on_host::tools::windows_process::{is_process_orphan, is_process_running};
91+
92+
let agent_id = "test".to_string().try_into().unwrap();
93+
let id = chrono::Local::now().format("%Y%m%d_%H%M%S").to_string();
94+
let mut cmd = CommandOSNotStarted::new(
95+
agent_id,
96+
&ExecutableData {
97+
id: "test".to_string(),
98+
bin: "powershell".to_string(),
99+
args: vec![
100+
"-NoProfile".to_string(),
101+
"-ExecutionPolicy".to_string(),
102+
"Bypass".to_string(),
103+
"-File".to_string(),
104+
"tests\\on_host\\data\\leak_sub_process.ps1".to_string(),
105+
id.clone(),
106+
],
107+
env: Default::default(),
108+
restart_policy: Default::default(),
109+
shutdown_timeout: Duration::from_millis(100),
110+
},
111+
false,
112+
Default::default(),
113+
)
114+
.start()
115+
.unwrap();
116+
117+
// Check that we have a leaked process
118+
retry(30, Duration::from_secs(1), || {
119+
if is_process_running(&id) && is_process_orphan(&id) {
120+
Ok(())
121+
} else {
122+
Err("Process not running or not orphaned yet".into())
123+
}
124+
});
125+
126+
cmd.shutdown().unwrap();
127+
128+
retry(30, Duration::from_secs(1), || {
129+
if !is_process_running(&id) {
130+
Ok(())
131+
} else {
132+
Err("Orphan process leaked".into())
133+
}
134+
});
135+
}

agent-control/tests/on_host/data/ignore_sigbreak.ps1

Lines changed: 0 additions & 18 deletions
This file was deleted.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
param(
2+
# Just a unique identifier to make simple to find if running from tests
3+
# with a command like: `gwmi Win32_Process | ? CommandLine -match "$Id" | select ProcessId, CommandLine`
4+
[string]$Id = "default"
5+
6+
)
7+
8+
# Windows-only simple sub-process that sleeps
9+
$proc = Start-Process -FilePath 'powershell.exe' -ArgumentList @(
10+
'-NoProfile'
11+
'-NonInteractive'
12+
'-Command'
13+
"# test-id: $Id
14+
Start-Sleep -Seconds 30"
15+
) -PassThru -WindowStyle Hidden

agent-control/tests/on_host/tools.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ pub mod custom_agent_type;
33
pub mod instance_id;
44
pub mod oci_artifact;
55
pub mod oci_package_manager;
6+
#[cfg(target_family = "windows")]
7+
pub mod windows_process;

0 commit comments

Comments
 (0)