Skip to content

Commit 0b287dd

Browse files
feat: add spawn named thread util
1 parent 734af1a commit 0b287dd

File tree

12 files changed

+191
-200
lines changed

12 files changed

+191
-200
lines changed

agent-control/src/agent_control/http_server/async_bridge.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::event::channel::EventConsumer;
22
use crate::event::{AgentControlEvent, SubAgentEvent};
3+
use crate::utils::threads::spawn_named_thread;
34
use crossbeam::select;
4-
use std::thread;
55
use std::thread::JoinHandle;
66
use tokio::sync::mpsc::UnboundedSender;
77
use tracing::{debug, error};
@@ -14,7 +14,7 @@ pub fn run_async_sync_bridge(
1414
agent_control_consumer: EventConsumer<AgentControlEvent>,
1515
sub_agent_consumer: EventConsumer<SubAgentEvent>,
1616
) -> JoinHandle<()> {
17-
thread::Builder::new().name("AC sync events to/from Status Http server async events bridge".to_string()).spawn(move || loop {
17+
spawn_named_thread("Async-Sync bridge", move || loop {
1818
select! {
1919
recv(&agent_control_consumer.as_ref()) -> sa_event_res => {
2020
match sa_event_res {
@@ -55,5 +55,5 @@ pub fn run_async_sync_bridge(
5555
}
5656
}
5757
}
58-
}).expect("thread config should be valid")
58+
})
5959
}

agent-control/src/agent_control/http_server/runner.rs

Lines changed: 56 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use crate::agent_control::http_server::async_bridge::run_async_sync_bridge;
33
use crate::agent_control::http_server::config::ServerConfig;
44
use crate::event::channel::EventConsumer;
55
use crate::event::{AgentControlEvent, SubAgentEvent};
6+
use crate::utils::threads::spawn_named_thread;
67
use crossbeam::select;
78
use std::sync::Arc;
8-
use std::thread;
99
use std::thread::JoinHandle;
1010
use tokio::runtime::Runtime;
1111
use tokio::sync::mpsc;
@@ -55,79 +55,73 @@ impl Runner {
5555
sub_agent_consumer: EventConsumer<SubAgentEvent>,
5656
maybe_opamp_client_config: Option<OpAMPClientConfig>,
5757
) -> JoinHandle<()> {
58-
thread::Builder::new()
59-
.name("Http server".to_string())
60-
.spawn(move || {
61-
// Create 2 unbounded channel to send the Agent Control and Sub Agent Sync events
62-
// to the Async Status Server
63-
let (async_agent_control_event_publisher, async_agent_control_event_consumer) =
64-
mpsc::unbounded_channel::<AgentControlEvent>();
65-
let (async_sub_agent_event_publisher, async_sub_agent_event_consumer) =
66-
mpsc::unbounded_channel::<SubAgentEvent>();
67-
// Run an OS Thread that listens to sync channel and forwards the events
68-
// to an async channel
69-
let bridge_join_handle = run_async_sync_bridge(
70-
async_agent_control_event_publisher,
71-
async_sub_agent_event_publisher,
72-
agent_control_consumer,
73-
sub_agent_consumer,
74-
);
58+
spawn_named_thread("Http server", move || {
59+
// Create 2 unbounded channel to send the Agent Control and Sub Agent Sync events
60+
// to the Async Status Server
61+
let (async_agent_control_event_publisher, async_agent_control_event_consumer) =
62+
mpsc::unbounded_channel::<AgentControlEvent>();
63+
let (async_sub_agent_event_publisher, async_sub_agent_event_consumer) =
64+
mpsc::unbounded_channel::<SubAgentEvent>();
65+
// Run an OS Thread that listens to sync channel and forwards the events
66+
// to an async channel
67+
let bridge_join_handle = run_async_sync_bridge(
68+
async_agent_control_event_publisher,
69+
async_sub_agent_event_publisher,
70+
agent_control_consumer,
71+
sub_agent_consumer,
72+
);
7573

76-
// Run the async status server
77-
let _ = runtime
78-
.block_on(
79-
crate::agent_control::http_server::server::run_status_server(
80-
config.clone(),
81-
async_agent_control_event_consumer,
82-
async_sub_agent_event_consumer,
83-
maybe_opamp_client_config,
84-
),
85-
)
86-
.inspect_err(|err| {
87-
error!(error_msg = %err, "error running status server");
88-
});
74+
// Run the async status server
75+
let _ = runtime
76+
.block_on(
77+
crate::agent_control::http_server::server::run_status_server(
78+
config.clone(),
79+
async_agent_control_event_consumer,
80+
async_sub_agent_event_consumer,
81+
maybe_opamp_client_config,
82+
),
83+
)
84+
.inspect_err(|err| {
85+
error!(error_msg = %err, "error running status server");
86+
});
8987

90-
// Wait until the bridge is closed
91-
bridge_join_handle.join().unwrap();
92-
})
93-
.expect("thread config should be valid")
88+
// Wait until the bridge is closed
89+
bridge_join_handle.join().unwrap();
90+
})
9491
}
9592

9693
fn spawn_noop_consumer(
9794
agent_control_consumer: EventConsumer<AgentControlEvent>,
9895
sub_agent_consumer: EventConsumer<SubAgentEvent>,
9996
) -> JoinHandle<()> {
100-
thread::Builder::new()
101-
.name("No-action consumer".to_string())
102-
.spawn(move || loop {
103-
select! {
104-
recv(agent_control_consumer.as_ref()) -> agent_control_consumer_res => {
105-
match agent_control_consumer_res {
106-
Ok(_) => {}
107-
Err(err) => {
108-
debug!(
109-
error_msg = %err,
110-
"http server event drain processor closed"
111-
);
112-
break;
113-
}
97+
spawn_named_thread("No-action consumer", move || loop {
98+
select! {
99+
recv(agent_control_consumer.as_ref()) -> agent_control_consumer_res => {
100+
match agent_control_consumer_res {
101+
Ok(_) => {}
102+
Err(err) => {
103+
debug!(
104+
error_msg = %err,
105+
"http server event drain processor closed"
106+
);
107+
break;
114108
}
115-
},
116-
recv(sub_agent_consumer.as_ref()) -> sub_agent_consumer_res => {
117-
match sub_agent_consumer_res {
118-
Ok(_) => {}
119-
Err(err) => {
120-
debug!(
121-
error_msg = %err,
122-
"http server event drain processor closed"
123-
);
124-
break;
125-
}
109+
}
110+
},
111+
recv(sub_agent_consumer.as_ref()) -> sub_agent_consumer_res => {
112+
match sub_agent_consumer_res {
113+
Ok(_) => {}
114+
Err(err) => {
115+
debug!(
116+
error_msg = %err,
117+
"http server event drain processor closed"
118+
);
119+
break;
126120
}
127121
}
128122
}
129-
})
130-
.expect("thread config should be valid")
123+
}
124+
})
131125
}
132126
}
133127

agent-control/src/k8s/garbage_collector.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::k8s::error::GarbageCollectorK8sError::{
1212
use crate::k8s::error::{GarbageCollectorK8sError, K8sError};
1313
use crate::k8s::Error::MissingName;
1414
use crate::k8s::{annotations, labels};
15+
use crate::utils::threads::spawn_named_thread;
1516
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
1617
use kube::api::TypeMeta;
1718
use std::{sync::Arc, thread, time::Duration};
@@ -85,20 +86,17 @@ where
8586
let (stop_tx, stop_rx) = pub_sub();
8687
let interval = self.interval;
8788

88-
let handle = thread::Builder::new()
89-
.name("Garbage collector".to_string())
90-
.spawn(move || {
91-
loop {
92-
let _ = self
93-
.collect()
94-
.inspect_err(|err| warn!("executing garbage collection: {err}"));
95-
if stop_rx.is_cancelled(interval) {
96-
break;
97-
}
89+
let handle = spawn_named_thread("Garbage collector", move || {
90+
loop {
91+
let _ = self
92+
.collect()
93+
.inspect_err(|err| warn!("executing garbage collection: {err}"));
94+
if stop_rx.is_cancelled(interval) {
95+
break;
9896
}
99-
info!("k8s garbage collector stopped");
100-
})
101-
.expect("thread config should be valid");
97+
}
98+
info!("k8s garbage collector stopped");
99+
});
102100

103101
K8sGarbageCollectorStarted { stop_tx, handle }
104102
}

agent-control/src/sub_agent/health/health_checker.rs

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::event::SubAgentInternalEvent;
88
use crate::k8s;
99
use crate::sub_agent::health::with_start_time::HealthWithStartTime;
1010
use crate::sub_agent::supervisor::starter::SupervisorStarterError;
11-
use std::thread;
11+
use crate::utils::threads::spawn_named_thread;
1212
use std::time::{SystemTime, SystemTimeError};
1313
use tracing::{debug, error};
1414

@@ -220,27 +220,24 @@ pub(crate) fn spawn_health_checker<H>(
220220
) where
221221
H: HealthChecker + Send + 'static,
222222
{
223-
thread::Builder::new()
224-
.name("Health checker".to_string())
225-
.spawn(move || loop {
226-
debug!(%agent_id, "starting to check health with the configured checker");
227-
228-
let health = health_checker.check_health().unwrap_or_else(|err| {
229-
debug!(%agent_id, last_error = %err, "the configured health check failed");
230-
HealthWithStartTime::from_unhealthy(Unhealthy::from(err), sub_agent_start_time)
231-
});
223+
spawn_named_thread("Health checker", move || loop {
224+
debug!(%agent_id, "starting to check health with the configured checker");
232225

233-
publish_health_event(
234-
&sub_agent_internal_publisher,
235-
SubAgentInternalEvent::AgentHealthInfo(health),
236-
);
226+
let health = health_checker.check_health().unwrap_or_else(|err| {
227+
debug!(%agent_id, last_error = %err, "the configured health check failed");
228+
HealthWithStartTime::from_unhealthy(Unhealthy::from(err), sub_agent_start_time)
229+
});
237230

238-
// Check the cancellation signal
239-
if cancel_signal.is_cancelled(interval.into()) {
240-
break;
241-
}
242-
})
243-
.expect("thread config should be valid");
231+
publish_health_event(
232+
&sub_agent_internal_publisher,
233+
SubAgentInternalEvent::AgentHealthInfo(health),
234+
);
235+
236+
// Check the cancellation signal
237+
if cancel_signal.is_cancelled(interval.into()) {
238+
break;
239+
}
240+
});
244241
}
245242

246243
pub(crate) fn publish_health_event(

agent-control/src/sub_agent/k8s/supervisor.rs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ use crate::sub_agent::supervisor::starter::{SupervisorStarter, SupervisorStarter
1515
use crate::sub_agent::supervisor::stopper::SupervisorStopper;
1616
use crate::sub_agent::version::k8s::checkers::K8sAgentVersionChecker;
1717
use crate::sub_agent::version::version_checker::spawn_version_checker;
18+
use crate::utils::threads::spawn_named_thread;
1819
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
1920
use k8s_openapi::serde_json;
2021
use kube::{api::DynamicObject, core::TypeMeta};
2122
use std::sync::Arc;
22-
use std::thread::{self, JoinHandle};
23+
use std::thread::JoinHandle;
2324
use std::time::Duration;
2425
use tracing::{debug, error, info, warn};
2526

@@ -131,22 +132,18 @@ impl NotStartedSupervisorK8s {
131132
let k8s_client = self.k8s_client.clone();
132133

133134
info!(%agent_id, "k8s objects supervisor started");
134-
let join_handle = thread::Builder::new()
135-
.name("K8s objects supervisor".to_string())
136-
.spawn(move || loop {
137-
// Check and apply k8s objects
138-
if let Err(err) =
139-
Self::apply_resources(&agent_id, resources.iter(), k8s_client.clone())
140-
{
141-
error!(%agent_id, %err, "k8s resources apply failed");
142-
}
143-
// Check the cancellation signal
144-
if stop_consumer.is_cancelled(interval) {
145-
info!(%agent_id, "k8s objects supervisor stopped");
146-
break;
147-
}
148-
})
149-
.expect("thread config should be valid");
135+
let join_handle = spawn_named_thread("K8s objects supervisor", move || loop {
136+
// Check and apply k8s objects
137+
if let Err(err) = Self::apply_resources(&agent_id, resources.iter(), k8s_client.clone())
138+
{
139+
error!(%agent_id, %err, "k8s resources apply failed");
140+
}
141+
// Check the cancellation signal
142+
if stop_consumer.is_cancelled(interval) {
143+
info!(%agent_id, "k8s objects supervisor stopped");
144+
break;
145+
}
146+
});
150147

151148
(stop_publisher, join_handle)
152149
}
Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::file_logger::FileLogger;
22
use crate::agent_control::config::AgentID;
3+
use crate::utils::threads::spawn_named_thread;
34
use std::{sync::mpsc::Receiver, thread::JoinHandle};
45
use tracing::{debug, info};
56

@@ -14,27 +15,24 @@ impl Logger {
1415
where
1516
S: ToString + Send + 'static,
1617
{
17-
std::thread::Builder::new()
18-
.name("OnHost logger".to_string())
19-
.spawn(move || {
20-
match self {
21-
Self::File(file_logger, agent_id) => {
22-
// If the logger is a FileLogger, set this file logging as the default.
23-
// `_guard` needs to exist in scope to keep persisting the logs in the file
24-
let _guard = file_logger.set_file_logging();
25-
rx.iter()
26-
.for_each(|line| info!(%agent_id, "{}", line.to_string()));
27-
}
28-
Self::Stderr(agent_id) => {
29-
rx.iter()
30-
.for_each(|line| debug!(%agent_id, "{}", line.to_string()));
31-
}
32-
Self::Stdout(agent_id) => {
33-
rx.iter()
34-
.for_each(|line| debug!(%agent_id, "{}", line.to_string()));
35-
}
18+
spawn_named_thread("OnHost logger", move || {
19+
match self {
20+
Self::File(file_logger, agent_id) => {
21+
// If the logger is a FileLogger, set this file logging as the default.
22+
// `_guard` needs to exist in scope to keep persisting the logs in the file
23+
let _guard = file_logger.set_file_logging();
24+
rx.iter()
25+
.for_each(|line| info!(%agent_id, "{}", line.to_string()));
3626
}
37-
})
38-
.expect("thread config should be valid")
27+
Self::Stderr(agent_id) => {
28+
rx.iter()
29+
.for_each(|line| debug!(%agent_id, "{}", line.to_string()));
30+
}
31+
Self::Stdout(agent_id) => {
32+
rx.iter()
33+
.for_each(|line| debug!(%agent_id, "{}", line.to_string()));
34+
}
35+
}
36+
})
3937
}
4038
}

0 commit comments

Comments (0)