File tree Expand file tree Collapse file tree 2 files changed +21
-3
lines changed
Expand file tree Collapse file tree 2 files changed +21
-3
lines changed Original file line number Diff line number Diff line change @@ -86,9 +86,13 @@ impl SupervisorStopper for StartedSupervisorOnHost {
8686 fn stop ( self ) -> Result < ( ) , ThreadContextStopperError > {
8787 let mut stop_result = Ok ( ( ) ) ;
8888
89- for thread_context in self . thread_contexts . into_iter ( ) {
89+ for thread_context in self . thread_contexts . iter ( ) {
90+ thread_context. notify_stop ( ) ;
91+ }
92+
93+ for thread_context in self . thread_contexts {
9094 let thread_name = thread_context. thread_name ( ) . to_string ( ) ;
91- match thread_context. stop_blocking ( ) {
95+ match thread_context. wait_stop ( ) {
9296 Ok ( _) => info ! ( "{} stopped" , thread_name) ,
9397 Err ( error_msg) => {
9498 error ! ( "Stopping '{thread_name}': {error_msg}" ) ;
@@ -355,8 +359,8 @@ impl NotStartedSupervisorOnHost {
355359 } ;
356360
357361 vec ! [
358- NotStartedThreadContext :: new( executable_data. bin. clone( ) , terminator_callback) . start( ) ,
359362 NotStartedThreadContext :: new( executable_data. bin. clone( ) , executor_callback) . start( ) ,
363+ NotStartedThreadContext :: new( executable_data. bin. clone( ) , terminator_callback) . start( ) ,
360364 ]
361365 }
362366}
Original file line number Diff line number Diff line change @@ -143,6 +143,20 @@ where
143143 trace ! ( thread = self . thread_name, "Joining" ) ;
144144 self . join_thread ( )
145145 }
146+
147+ pub fn notify_stop ( & self ) {
148+ trace ! ( thread = self . thread_name, "Publishing stop" ) ;
149+ // Stop consumer could be disconnected if the thread has finished already.
150+ // Either the stop is full or disconnected that shouldn't prevent to join the thread.
151+ let _ = self . stop_publisher . try_publish ( ( ) ) . inspect_err ( |err| {
152+ debug ! ( thread = self . thread_name, "Publishing stop failed: {}" , err)
153+ } ) ;
154+ }
155+
156+ pub fn wait_stop ( self ) -> Result < T , ThreadContextStopperError > {
157+ trace ! ( thread = self . thread_name, "Joining" ) ;
158+ self . join_thread ( )
159+ }
146160}
147161
148162#[ cfg( test) ]
You can’t perform that action at this time.
0 commit comments