Skip to content

Commit b9cff21

Browse files
refactor: remove unnecessary structs
1 parent 5af9393 commit b9cff21

File tree

1 file changed

+101
-133
lines changed

1 file changed

+101
-133
lines changed

agent-control/src/sub_agent/on_host/supervisor.rs

Lines changed: 101 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -219,36 +219,29 @@ impl NotStartedSupervisorOnHost {
219219
break;
220220
}
221221

222+
// It's important to create a new health handler for each process instance
223+
// Otherwise, the published time won't be updated.
222224
let health_handler =
223225
HealthHandler::new(exec_data.id.clone(), health_publisher.clone());
224-
let not_started_executable = NotStartedExecutable::new(
226+
227+
info!("Starting executable");
228+
let command = CommandOSNotStarted::new(
225229
agent_id.clone(),
226-
exec_data.clone(),
230+
&exec_data,
227231
log_to_file,
228232
logging_path.clone(),
229-
health_handler.clone(),
230233
);
231234

232-
let started = not_started_executable.launch();
233-
let executable_result = started
234-
.and_then(|executable| executable.wait_for_exit(&stop_consumer, HEALTHY_DELAY));
235+
let started = command.start().and_then(|cmd| cmd.stream());
236+
237+
let bin = &exec_data.bin;
238+
let executable_result = started.and_then(|cmd| {
239+
wait_exit(cmd, bin, &stop_consumer, HEALTHY_DELAY, &health_handler)
240+
});
241+
235242
match executable_result {
236243
Ok((exit_status, was_cancelled)) => {
237-
if !exit_status.success() {
238-
let ExecutableData { bin, args, .. } = &exec_data;
239-
error!(%agent_id,supervisor = bin,exit_code = ?exit_status.code(),"Executable exited unsuccessfully");
240-
debug!(%exit_status, "Error executing executable, marking as unhealthy");
241-
242-
let args = args.join(" ");
243-
let error = format!(
244-
"path '{bin}' with args '{args}' failed with '{exit_status}'",
245-
);
246-
let status = format!(
247-
"process exited with code: {:?}",
248-
exit_status.code().unwrap_or_default()
249-
);
250-
health_handler.publish_unhealthy_with_status(error, status);
251-
}
244+
handle_exit(&agent_id, &exec_data, &exit_status, &health_handler);
252245

253246
if was_cancelled {
254247
span.exit();
@@ -286,6 +279,55 @@ impl NotStartedSupervisorOnHost {
286279
}
287280
}
288281

282+
/// Waits for the command to complete or be cancelled
283+
fn wait_exit(
284+
mut command: CommandOSStarted,
285+
bin: &str,
286+
stop_consumer: &EventConsumer<CancellationMessage>,
287+
healthy_publish_delay: Duration,
288+
health_handler: &HealthHandler,
289+
) -> Result<(ExitStatus, bool), CommandError> {
290+
info!("Waiting for executable to complete or be cancelled");
291+
let mut was_cancelled = false;
292+
let deadline = Instant::now() + healthy_publish_delay;
293+
let mut already_published = false;
294+
295+
// Busy waiting is avoided with `is_cancelled_with_timeout`
296+
while command.is_running() {
297+
// Shutdown the spawned process when the cancel signal is received.
298+
// This ensures the thread stops in time.
299+
if stop_consumer.is_cancelled_with_timeout(WAIT_FOR_EXIT_TIMEOUT) {
300+
info!(supervisor = bin, "Stopping executable");
301+
if let Err(err) = command.shutdown() {
302+
error!(supervisor = bin, "Failed to stop executable: {err}");
303+
}
304+
info!(supervisor = bin, msg = "Executable terminated");
305+
was_cancelled = true;
306+
}
307+
308+
// Publish healthy status once after the process has been running
309+
// for an arbitrary long time without issues.
310+
if !already_published && Instant::now() > deadline {
311+
debug!("Informing executable as healthy");
312+
health_handler.publish_healthy();
313+
already_published = true;
314+
}
315+
}
316+
317+
// At this point, the command is already dead. However, we call `wait` to
318+
// release resources.
319+
// Reference - https://doc.rust-lang.org/std/process/struct.Child.html#warning
320+
command
321+
.wait()
322+
.inspect(|exit_status| {
323+
if !already_published && exit_status.success() {
324+
debug!("Informing executable as healthy");
325+
health_handler.publish_healthy();
326+
}
327+
})
328+
.map(|exit_status| (exit_status, was_cancelled))
329+
}
330+
289331
/// Waits for the restart policy backoff timeout to complete
290332
///
291333
/// If the [`CancellationMessage`] while waiting, the restart will be aborted.
@@ -308,100 +350,28 @@ fn restart_process_thread(
308350
restart
309351
}
310352

311-
struct NotStartedExecutable {
312-
agent_id: AgentID,
313-
exec_data: ExecutableData,
314-
log_to_file: bool,
315-
logging_path: PathBuf,
316-
health_handler: HealthHandler,
317-
}
318-
319-
impl NotStartedExecutable {
320-
fn new(
321-
agent_id: AgentID,
322-
exec_data: ExecutableData,
323-
log_to_file: bool,
324-
logging_path: PathBuf,
325-
health_handler: HealthHandler,
326-
) -> Self {
327-
Self {
328-
agent_id,
329-
exec_data,
330-
log_to_file,
331-
logging_path,
332-
health_handler,
333-
}
334-
}
335-
336-
fn launch(&self) -> Result<StartedExecutable, CommandError> {
337-
info!("Starting executable");
338-
let command = CommandOSNotStarted::new(
339-
self.agent_id.clone(),
340-
&self.exec_data,
341-
self.log_to_file,
342-
self.logging_path.clone(),
343-
);
344-
345-
Ok(StartedExecutable {
346-
bin: self.exec_data.bin.clone(),
347-
command: command.start().and_then(|cmd| cmd.stream())?,
348-
health_handler: self.health_handler.clone(),
349-
})
353+
/// Executes operations based on the exit status of the command
354+
fn handle_exit(
355+
agent_id: &AgentID,
356+
exec_data: &ExecutableData,
357+
exit_status: &ExitStatus,
358+
health_handler: &HealthHandler,
359+
) {
360+
if exit_status.success() {
361+
return;
350362
}
351-
}
352363

353-
struct StartedExecutable {
354-
bin: String,
355-
command: CommandOSStarted,
356-
health_handler: HealthHandler,
357-
}
358-
359-
impl StartedExecutable {
360-
fn wait_for_exit(
361-
mut self,
362-
stop_consumer: &EventConsumer<CancellationMessage>,
363-
healthy_publish_delay: Duration,
364-
) -> Result<(ExitStatus, bool), CommandError> {
365-
info!("Waiting for executable to complete or be cancelled");
366-
let mut was_cancelled = false;
367-
let deadline = Instant::now() + healthy_publish_delay;
368-
let mut already_published = false;
369-
370-
// Busy waiting is avoided with `is_cancelled_with_timeout`
371-
while self.command.is_running() {
372-
// Shutdown the spawned process when the cancel signal is received.
373-
// This ensures the thread stops in time.
374-
if stop_consumer.is_cancelled_with_timeout(WAIT_FOR_EXIT_TIMEOUT) {
375-
info!(supervisor = self.bin, "Stopping executable");
376-
if let Err(err) = self.command.shutdown() {
377-
error!(supervisor = self.bin, "Failed to stop executable: {err}");
378-
}
379-
info!(supervisor = self.bin, msg = "Executable terminated");
380-
was_cancelled = true;
381-
}
382-
383-
// Publish healthy status once after the process has been running
384-
// for an arbitrary long time without issues.
385-
if !already_published && Instant::now() > deadline {
386-
debug!("Informing executable as healthy");
387-
self.health_handler.publish_healthy();
388-
already_published = true;
389-
}
390-
}
391-
392-
// At this point, the command is already dead. However, we call `wait` to
393-
// release resources.
394-
// Reference - https://doc.rust-lang.org/std/process/struct.Child.html#warning
395-
self.command
396-
.wait()
397-
.inspect(|exit_status| {
398-
if !already_published && exit_status.success() {
399-
debug!("Informing executable as healthy");
400-
self.health_handler.publish_healthy();
401-
}
402-
})
403-
.map(|exit_status| (exit_status, was_cancelled))
404-
}
364+
let ExecutableData { bin, args, .. } = &exec_data;
365+
error!(%agent_id,supervisor = bin,exit_code = ?exit_status.code(),"Executable exited unsuccessfully");
366+
debug!(%exit_status, "Error executing executable, marking as unhealthy");
367+
368+
let args = args.join(" ");
369+
let error = format!("path '{bin}' with args '{args}' failed with '{exit_status}'",);
370+
let status = format!(
371+
"process exited with code: {:?}",
372+
exit_status.code().unwrap_or_default()
373+
);
374+
health_handler.publish_unhealthy_with_status(error, status);
405375
}
406376

407377
#[derive(Clone)]
@@ -830,24 +800,20 @@ pub mod tests {
830800
let exec_data = ExecutableData::new("sleep".to_owned(), "sleep".to_owned())
831801
.with_args(vec!["3".to_owned()]);
832802

803+
let command =
804+
CommandOSNotStarted::new(AgentID::AgentControl, &exec_data, false, PathBuf::new())
805+
.start()
806+
.unwrap();
807+
833808
let (health_publisher, health_consumer) = pub_sub();
834809
let health_handler = HealthHandler::new(exec_data.id.clone(), health_publisher);
835-
let executable = NotStartedExecutable::new(
836-
AgentID::AgentControl,
837-
exec_data,
838-
false,
839-
PathBuf::new(),
840-
health_handler,
841-
);
842-
843-
let started = executable.launch().unwrap();
844810

845811
// Don't use the "_" expression for the publisher.
846812
// Renaming it to "_" drops the channel. Hence, it will be disconnected.
847813
// `wait_for_exit` then gets out on the first iteration and this test will
848814
// always pass even when it shouldn't.
849815
let (_stop_publisher, stop_consumer) = pub_sub::<CancellationMessage>();
850-
let _ = started.wait_for_exit(&stop_consumer, Duration::ZERO);
816+
let _ = wait_exit(command, "", &stop_consumer, Duration::ZERO, &health_handler);
851817

852818
let start_time = SystemTime::now();
853819
let expected_ordered_events = vec![(
@@ -875,20 +841,22 @@ pub mod tests {
875841
let exec_data = ExecutableData::new("ls".to_owned(), "ls".to_owned())
876842
.with_args(vec!["non-existent-path".to_owned()]);
877843

844+
let command =
845+
CommandOSNotStarted::new(AgentID::AgentControl, &exec_data, false, PathBuf::new())
846+
.start()
847+
.unwrap();
848+
878849
let (health_publisher, health_consumer) = pub_sub();
879850
let health_handler = HealthHandler::new(exec_data.id.clone(), health_publisher);
880-
let executable = NotStartedExecutable::new(
881-
AgentID::AgentControl,
882-
exec_data,
883-
false,
884-
PathBuf::new(),
885-
health_handler,
886-
);
887-
888-
let started = executable.launch().unwrap();
889851

890852
let (_stop_publisher, stop_consumer) = pub_sub::<CancellationMessage>();
891-
let _ = started.wait_for_exit(&stop_consumer, Duration::from_secs(10));
853+
let _ = wait_exit(
854+
command,
855+
"",
856+
&stop_consumer,
857+
Duration::from_secs(10),
858+
&health_handler,
859+
);
892860

893861
assert!(health_consumer.as_ref().is_empty())
894862
}

0 commit comments

Comments
 (0)