Skip to content

Commit 552f98b

Browse files
fix: stop finished thread (#1158)
1 parent c863b6c commit 552f98b

File tree

2 files changed

+18
-25
lines changed

2 files changed

+18
-25
lines changed

agent-control/src/event/channel.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ impl<E> EventPublisher<E> {
3333
.send(event)
3434
.map_err(|err| EventPublisherError::SendError(err.to_string()))
3535
}
36+
pub fn try_publish(&self, event: E) -> Result<(), EventPublisherError> {
37+
self.0
38+
.try_send(event)
39+
.map_err(|err| EventPublisherError::SendError(err.to_string()))
40+
}
3641
}
3742

3843
impl<E> Clone for EventPublisher<E> {

agent-control/src/utils/thread_context.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{
22
thread::{sleep, JoinHandle},
33
time::Duration,
44
};
5-
use tracing::trace;
5+
use tracing::{debug, trace};
66

77
const GRACEFUL_STOP_RETRY: u16 = 10;
88
const GRACEFUL_STOP_RETRY_INTERVAL: Duration = Duration::from_millis(100);
@@ -112,18 +112,13 @@ where
112112
/// It sends a stop signal and periodically checks if the thread has finished until
113113
/// it timeout defined by `GRACEFUL_STOP_RETRY` * `GRACEFUL_STOP_RETRY_INTERVAL`.
114114
pub fn stop(self) -> Result<T, ThreadContextStopperError> {
115-
trace!(thread = self.thread_name, "stopping");
116-
if self.join_handle.is_finished() {
117-
trace!(thread = self.thread_name, "finished already, joining");
118-
return self.join_thread();
119-
}
120115
trace!(thread = self.thread_name, "publishing stop");
121-
self.stop_publisher.publish(()).map_err(|err| {
122-
ThreadContextStopperError::EventPublisherError {
123-
thread: self.thread_name.clone(),
124-
error: err.to_string(),
125-
}
126-
})?;
116+
// Stop consumer could be disconnected if the thread has finished already.
117+
// Either the stop is full or disconnected that shouldn't prevent to join the thread.
118+
let _ = self
119+
.stop_publisher
120+
.try_publish(())
121+
.inspect_err(|err| debug!(thread = self.thread_name, "Fail publishing stop: {}", err));
127122
for _ in 0..GRACEFUL_STOP_RETRY {
128123
if self.join_handle.is_finished() {
129124
trace!(thread = self.thread_name, "finished, joining");
@@ -139,18 +134,13 @@ where
139134

140135
/// It sends a stop signal and waits until the thread handle is joined.
141136
pub fn stop_blocking(self) -> Result<T, ThreadContextStopperError> {
142-
trace!(thread = self.thread_name, "stopping");
143-
if self.join_handle.is_finished() {
144-
trace!(thread = self.thread_name, "finished already, joining");
145-
return self.join_thread();
146-
}
147137
trace!(thread = self.thread_name, "publishing stop");
148-
self.stop_publisher.publish(()).map_err(|err| {
149-
ThreadContextStopperError::EventPublisherError {
150-
thread: self.thread_name.clone(),
151-
error: err.to_string(),
152-
}
153-
})?;
138+
// Stop consumer could be disconnected if the thread has finished already.
139+
// Either the stop is full or disconnected that shouldn't prevent to join the thread.
140+
let _ = self
141+
.stop_publisher
142+
.try_publish(())
143+
.inspect_err(|err| debug!(thread = self.thread_name, "Fail publishing stop: {}", err));
154144
trace!(thread = self.thread_name, "joining");
155145
self.join_thread()
156146
}
@@ -199,13 +189,11 @@ pub mod tests {
199189
};
200190

201191
let started_thread_context = NotStartedThreadContext::new(thread_name, callback).start();
202-
assert!(!started_thread_context.is_thread_finished());
203192
// wait for the thread to finish
204193
sleep(Duration::from_millis(100));
205194
started_thread_context.stop_blocking().unwrap();
206195

207196
let started_thread_context = NotStartedThreadContext::new(thread_name, callback).start();
208-
assert!(!started_thread_context.is_thread_finished());
209197
// wait for the thread to finish
210198
sleep(Duration::from_millis(100));
211199
started_thread_context.stop().unwrap();

0 commit comments

Comments
 (0)