@@ -221,34 +221,25 @@ impl NotStartedSupervisorOnHost {
221221
222222 let health_handler =
223223 HealthHandler :: new ( exec_data. id . clone ( ) , health_publisher. clone ( ) ) ;
224- let not_started_executable = NotStartedExecutable :: new (
224+
225+ info ! ( "Starting executable" ) ;
226+ let command = CommandOSNotStarted :: new (
225227 agent_id. clone ( ) ,
226- exec_data. clone ( ) ,
228+ & exec_data,
227229 log_to_file,
228230 logging_path. clone ( ) ,
229- health_handler. clone ( ) ,
230231 ) ;
231232
232- let started = not_started_executable. launch ( ) ;
233- let executable_result = started
234- . and_then ( |executable| executable. wait_for_exit ( & stop_consumer, HEALTHY_DELAY ) ) ;
233+ let started = command. start ( ) . and_then ( |cmd| cmd. stream ( ) ) ;
234+
235+ let bin = & exec_data. bin ;
236+ let executable_result = started. and_then ( |cmd| {
237+ wait_exit ( cmd, bin, & stop_consumer, HEALTHY_DELAY , & health_handler)
238+ } ) ;
239+
235240 match executable_result {
236241 Ok ( ( exit_status, was_cancelled) ) => {
237- if !exit_status. success ( ) {
238- let ExecutableData { bin, args, .. } = & exec_data;
239- error ! ( %agent_id, supervisor = bin, exit_code = ?exit_status. code( ) , "Executable exited unsuccessfully" ) ;
240- debug ! ( %exit_status, "Error executing executable, marking as unhealthy" ) ;
241-
242- let args = args. join ( " " ) ;
243- let error = format ! (
244- "path '{bin}' with args '{args}' failed with '{exit_status}'" ,
245- ) ;
246- let status = format ! (
247- "process exited with code: {:?}" ,
248- exit_status. code( ) . unwrap_or_default( )
249- ) ;
250- health_handler. publish_unhealthy_with_status ( error, status) ;
251- }
242+ handle_exit ( & agent_id, & exec_data, & exit_status, & health_handler) ;
252243
253244 if was_cancelled {
254245 span. exit ( ) ;
@@ -286,6 +277,54 @@ impl NotStartedSupervisorOnHost {
286277 }
287278}
288279
280+ fn wait_exit (
281+ mut command : CommandOSStarted ,
282+ bin : & str ,
283+ stop_consumer : & EventConsumer < CancellationMessage > ,
284+ healthy_publish_delay : Duration ,
285+ health_handler : & HealthHandler ,
286+ ) -> Result < ( ExitStatus , bool ) , CommandError > {
287+ info ! ( "Waiting for executable to complete or be cancelled" ) ;
288+ let mut was_cancelled = false ;
289+ let deadline = Instant :: now ( ) + healthy_publish_delay;
290+ let mut already_published = false ;
291+
292+ // Busy waiting is avoided with `is_cancelled_with_timeout`
293+ while command. is_running ( ) {
294+ // Shutdown the spawned process when the cancel signal is received.
295+ // This ensures the thread stops in time.
296+ if stop_consumer. is_cancelled_with_timeout ( WAIT_FOR_EXIT_TIMEOUT ) {
297+ info ! ( supervisor = bin, "Stopping executable" ) ;
298+ if let Err ( err) = command. shutdown ( ) {
299+ error ! ( supervisor = bin, "Failed to stop executable: {err}" ) ;
300+ }
301+ info ! ( supervisor = bin, msg = "Executable terminated" ) ;
302+ was_cancelled = true ;
303+ }
304+
305+ // Publish healthy status once after the process has been running
306+ // for an arbitrary long time without issues.
307+ if !already_published && Instant :: now ( ) > deadline {
308+ debug ! ( "Informing executable as healthy" ) ;
309+ health_handler. publish_healthy ( ) ;
310+ already_published = true ;
311+ }
312+ }
313+
314+ // At this point, the command is already dead. However, we call `wait` to
315+ // release resources.
316+ // Reference - https://doc.rust-lang.org/std/process/struct.Child.html#warning
317+ command
318+ . wait ( )
319+ . inspect ( |exit_status| {
320+ if !already_published && exit_status. success ( ) {
321+ debug ! ( "Informing executable as healthy" ) ;
322+ health_handler. publish_healthy ( ) ;
323+ }
324+ } )
325+ . map ( |exit_status| ( exit_status, was_cancelled) )
326+ }
327+
289328/// Waits for the restart policy backoff timeout to complete
290329///
291330/// If the [`CancellationMessage`] is received while waiting, the restart will be aborted.
@@ -308,100 +347,27 @@ fn restart_process_thread(
308347 restart
309348}
310349
311- struct NotStartedExecutable {
312- agent_id : AgentID ,
313- exec_data : ExecutableData ,
314- log_to_file : bool ,
315- logging_path : PathBuf ,
316- health_handler : HealthHandler ,
317- }
318-
319- impl NotStartedExecutable {
320- fn new (
321- agent_id : AgentID ,
322- exec_data : ExecutableData ,
323- log_to_file : bool ,
324- logging_path : PathBuf ,
325- health_handler : HealthHandler ,
326- ) -> Self {
327- Self {
328- agent_id,
329- exec_data,
330- log_to_file,
331- logging_path,
332- health_handler,
333- }
334- }
335-
336- fn launch ( & self ) -> Result < StartedExecutable , CommandError > {
337- info ! ( "Starting executable" ) ;
338- let command = CommandOSNotStarted :: new (
339- self . agent_id . clone ( ) ,
340- & self . exec_data ,
341- self . log_to_file ,
342- self . logging_path . clone ( ) ,
343- ) ;
344-
345- Ok ( StartedExecutable {
346- bin : self . exec_data . bin . clone ( ) ,
347- command : command. start ( ) . and_then ( |cmd| cmd. stream ( ) ) ?,
348- health_handler : self . health_handler . clone ( ) ,
349- } )
350+ fn handle_exit (
351+ agent_id : & AgentID ,
352+ exec_data : & ExecutableData ,
353+ exit_status : & ExitStatus ,
354+ health_handler : & HealthHandler ,
355+ ) {
356+ if exit_status. success ( ) {
357+ return ;
350358 }
351- }
352359
353- struct StartedExecutable {
354- bin : String ,
355- command : CommandOSStarted ,
356- health_handler : HealthHandler ,
357- }
358-
359- impl StartedExecutable {
360- fn wait_for_exit (
361- mut self ,
362- stop_consumer : & EventConsumer < CancellationMessage > ,
363- healthy_publish_delay : Duration ,
364- ) -> Result < ( ExitStatus , bool ) , CommandError > {
365- info ! ( "Waiting for executable to complete or be cancelled" ) ;
366- let mut was_cancelled = false ;
367- let deadline = Instant :: now ( ) + healthy_publish_delay;
368- let mut already_published = false ;
369-
370- // Busy waiting is avoided with `is_cancelled_with_timeout`
371- while self . command . is_running ( ) {
372- // Shutdown the spawned process when the cancel signal is received.
373- // This ensures the thread stops in time.
374- if stop_consumer. is_cancelled_with_timeout ( WAIT_FOR_EXIT_TIMEOUT ) {
375- info ! ( supervisor = self . bin, "Stopping executable" ) ;
376- if let Err ( err) = self . command . shutdown ( ) {
377- error ! ( supervisor = self . bin, "Failed to stop executable: {err}" ) ;
378- }
379- info ! ( supervisor = self . bin, msg = "Executable terminated" ) ;
380- was_cancelled = true ;
381- }
382-
383- // Publish healthy status once after the process has been running
384- // for an arbitrary long time without issues.
385- if !already_published && Instant :: now ( ) > deadline {
386- debug ! ( "Informing executable as healthy" ) ;
387- self . health_handler . publish_healthy ( ) ;
388- already_published = true ;
389- }
390- }
391-
392- // At this point, the command is already dead. However, we call `wait` to
393- // release resources.
394- // Reference - https://doc.rust-lang.org/std/process/struct.Child.html#warning
395- self . command
396- . wait ( )
397- . inspect ( |exit_status| {
398- if !already_published && exit_status. success ( ) {
399- debug ! ( "Informing executable as healthy" ) ;
400- self . health_handler . publish_healthy ( ) ;
401- }
402- } )
403- . map ( |exit_status| ( exit_status, was_cancelled) )
404- }
360+ let ExecutableData { bin, args, .. } = & exec_data;
361+ error ! ( %agent_id, supervisor = bin, exit_code = ?exit_status. code( ) , "Executable exited unsuccessfully" ) ;
362+ debug ! ( %exit_status, "Error executing executable, marking as unhealthy" ) ;
363+
364+ let args = args. join ( " " ) ;
365+ let error = format ! ( "path '{bin}' with args '{args}' failed with '{exit_status}'" , ) ;
366+ let status = format ! (
367+ "process exited with code: {:?}" ,
368+ exit_status. code( ) . unwrap_or_default( )
369+ ) ;
370+ health_handler. publish_unhealthy_with_status ( error, status) ;
405371}
406372
407373#[ derive( Clone ) ]
@@ -830,24 +796,20 @@ pub mod tests {
830796 let exec_data = ExecutableData :: new ( "sleep" . to_owned ( ) , "sleep" . to_owned ( ) )
831797 . with_args ( vec ! [ "3" . to_owned( ) ] ) ;
832798
799+ let command =
800+ CommandOSNotStarted :: new ( AgentID :: AgentControl , & exec_data, false , PathBuf :: new ( ) )
801+ . start ( )
802+ . unwrap ( ) ;
803+
833804 let ( health_publisher, health_consumer) = pub_sub ( ) ;
834805 let health_handler = HealthHandler :: new ( exec_data. id . clone ( ) , health_publisher) ;
835- let executable = NotStartedExecutable :: new (
836- AgentID :: AgentControl ,
837- exec_data,
838- false ,
839- PathBuf :: new ( ) ,
840- health_handler,
841- ) ;
842-
843- let started = executable. launch ( ) . unwrap ( ) ;
844806
845807 // Don't use the "_" expression for the publisher.
846808 // Renaming it to "_" drops the channel. Hence, it will be disconnected.
848810 // `wait_exit` then gets out on the first iteration and this test will
848810 // always pass even when it shouldn't.
849811 let ( _stop_publisher, stop_consumer) = pub_sub :: < CancellationMessage > ( ) ;
850- let _ = started . wait_for_exit ( & stop_consumer, Duration :: ZERO ) ;
812+ let _ = wait_exit ( command , "" , & stop_consumer, Duration :: ZERO , & health_handler ) ;
851813
852814 let start_time = SystemTime :: now ( ) ;
853815 let expected_ordered_events = vec ! [ (
@@ -875,20 +837,22 @@ pub mod tests {
875837 let exec_data = ExecutableData :: new ( "ls" . to_owned ( ) , "ls" . to_owned ( ) )
876838 . with_args ( vec ! [ "non-existent-path" . to_owned( ) ] ) ;
877839
840+ let command =
841+ CommandOSNotStarted :: new ( AgentID :: AgentControl , & exec_data, false , PathBuf :: new ( ) )
842+ . start ( )
843+ . unwrap ( ) ;
844+
878845 let ( health_publisher, health_consumer) = pub_sub ( ) ;
879846 let health_handler = HealthHandler :: new ( exec_data. id . clone ( ) , health_publisher) ;
880- let executable = NotStartedExecutable :: new (
881- AgentID :: AgentControl ,
882- exec_data,
883- false ,
884- PathBuf :: new ( ) ,
885- health_handler,
886- ) ;
887-
888- let started = executable. launch ( ) . unwrap ( ) ;
889847
890848 let ( _stop_publisher, stop_consumer) = pub_sub :: < CancellationMessage > ( ) ;
891- let _ = started. wait_for_exit ( & stop_consumer, Duration :: from_secs ( 10 ) ) ;
849+ let _ = wait_exit (
850+ command,
851+ "" ,
852+ & stop_consumer,
853+ Duration :: from_secs ( 10 ) ,
854+ & health_handler,
855+ ) ;
892856
893857 assert ! ( health_consumer. as_ref( ) . is_empty( ) )
894858 }
0 commit comments