Skip to content

Commit c635b1d

Browse files
fix: Add stop signal to status server (#1126)
1 parent f44e291 commit c635b1d

File tree

5 files changed

+244
-80
lines changed

5 files changed

+244
-80
lines changed

agent-control/src/agent_control/http_server/async_bridge.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub fn run_async_sync_bridge(
1313
async_suba_publisher: UnboundedSender<SubAgentEvent>,
1414
agent_control_consumer: EventConsumer<AgentControlEvent>,
1515
sub_agent_consumer: EventConsumer<SubAgentEvent>,
16+
stop_rx: EventConsumer<()>,
1617
) -> JoinHandle<()> {
1718
spawn_named_thread("Async-Sync bridge", move || loop {
1819
select! {
@@ -36,24 +37,28 @@ pub fn run_async_sync_bridge(
3637
}
3738
},
3839
recv(&sub_agent_consumer.as_ref()) -> suba_event_res => {
39-
match suba_event_res {
40-
Ok(sub_agent_event) => {
41-
let _ = async_suba_publisher.send(sub_agent_event).inspect_err(|err| {
42-
error!(
43-
error_msg = %err,
44-
"cannot forward agent control event"
45-
);
46-
});
47-
}
48-
Err(err) => {
49-
debug!(
40+
match suba_event_res {
41+
Ok(sub_agent_event) => {
42+
let _ = async_suba_publisher.send(sub_agent_event).inspect_err(|err| {
43+
error!(
5044
error_msg = %err,
51-
"status server bridge channel closed"
45+
"cannot forward agent control event"
5246
);
53-
break;
54-
}
47+
});
48+
}
49+
Err(err) => {
50+
debug!(
51+
error_msg = %err,
52+
"status server bridge channel closed"
53+
);
54+
break;
5555
}
5656
}
57+
},
58+
recv(&stop_rx.as_ref()) -> _ => {
59+
debug!("status server bridge stopping");
60+
break;
61+
}
5762
}
5863
})
5964
}

agent-control/src/agent_control/http_server/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ mod tests {
6363
server_config: ServerConfig,
6464
}
6565

66+
impl From<u16> for Port {
67+
fn from(value: u16) -> Self {
68+
Port(value)
69+
}
70+
}
71+
6672
#[test]
6773
fn test_deserialize_default() {
6874
struct Test {
Lines changed: 193 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
use crate::agent_control::config::OpAMPClientConfig;
22
use crate::agent_control::http_server::async_bridge::run_async_sync_bridge;
33
use crate::agent_control::http_server::config::ServerConfig;
4+
use crate::event::cancellation::CancellationMessage;
45
use crate::event::channel::EventConsumer;
56
use crate::event::{AgentControlEvent, SubAgentEvent};
6-
use crate::utils::threads::spawn_named_thread;
7+
use crate::utils::thread_context::{NotStartedThreadContext, StartedThreadContext};
78
use crossbeam::select;
89
use std::sync::Arc;
9-
use std::thread::JoinHandle;
1010
use tokio::runtime::Runtime;
1111
use tokio::sync::mpsc;
1212
use tracing::{debug, error, info};
1313

1414
/// Runner will be responsible for spawning the OS Thread for the HTTP Server
1515
/// and owning the JoinHandle. It controls the server stop implementing drop
1616
pub struct Runner {
17-
join_handle: Option<JoinHandle<()>>,
17+
thread_context: Option<StartedThreadContext>,
1818
}
1919

2020
impl Runner {
@@ -30,21 +30,29 @@ impl Runner {
3030
sub_agent_consumer: EventConsumer<SubAgentEvent>,
3131
maybe_opamp_client_config: Option<OpAMPClientConfig>,
3232
) -> Self {
33-
let join_handle = if config.enabled {
34-
Self::spawn_server(
35-
config,
36-
runtime,
37-
agent_control_consumer,
38-
sub_agent_consumer,
39-
maybe_opamp_client_config,
40-
)
33+
let thread_context = if config.enabled {
34+
let callback = move |stop_consumer: EventConsumer<CancellationMessage>| {
35+
Self::spawn_server(
36+
config,
37+
runtime,
38+
agent_control_consumer,
39+
sub_agent_consumer,
40+
maybe_opamp_client_config,
41+
stop_consumer,
42+
)
43+
};
44+
NotStartedThreadContext::new("Http server", callback).start()
4145
} else {
46+
let callback = move |stop_consumer: EventConsumer<CancellationMessage>| {
47+
Self::noop_consumer_loop(agent_control_consumer, sub_agent_consumer, stop_consumer)
48+
};
4249
// Spawn a thread with a no-action consumer to drain the channel and
4350
// avoid memory leaks
44-
Self::spawn_noop_consumer(agent_control_consumer, sub_agent_consumer)
51+
NotStartedThreadContext::new("No-action consumer", callback).start()
4552
};
53+
4654
Runner {
47-
join_handle: Some(join_handle),
55+
thread_context: Some(thread_context),
4856
}
4957
}
5058

@@ -54,47 +62,49 @@ impl Runner {
5462
agent_control_consumer: EventConsumer<AgentControlEvent>,
5563
sub_agent_consumer: EventConsumer<SubAgentEvent>,
5664
maybe_opamp_client_config: Option<OpAMPClientConfig>,
57-
) -> JoinHandle<()> {
58-
spawn_named_thread("Http server", move || {
59-
// Create 2 unbounded channel to send the Agent Control and Sub Agent Sync events
60-
// to the Async Status Server
61-
let (async_agent_control_event_publisher, async_agent_control_event_consumer) =
62-
mpsc::unbounded_channel::<AgentControlEvent>();
63-
let (async_sub_agent_event_publisher, async_sub_agent_event_consumer) =
64-
mpsc::unbounded_channel::<SubAgentEvent>();
65-
// Run an OS Thread that listens to sync channel and forwards the events
66-
// to an async channel
67-
let bridge_join_handle = run_async_sync_bridge(
68-
async_agent_control_event_publisher,
69-
async_sub_agent_event_publisher,
70-
agent_control_consumer,
71-
sub_agent_consumer,
72-
);
73-
74-
// Run the async status server
75-
let _ = runtime
76-
.block_on(
77-
crate::agent_control::http_server::server::run_status_server(
78-
config.clone(),
79-
async_agent_control_event_consumer,
80-
async_sub_agent_event_consumer,
81-
maybe_opamp_client_config,
82-
),
83-
)
84-
.inspect_err(|err| {
85-
error!(error_msg = %err, "error running status server");
86-
});
65+
stop_rx: EventConsumer<CancellationMessage>,
66+
) {
67+
// Create 2 unbounded channel to send the Agent Control and Sub Agent Sync events
68+
// to the Async Status Server
69+
let (async_agent_control_event_publisher, async_agent_control_event_consumer) =
70+
mpsc::unbounded_channel::<AgentControlEvent>();
71+
let (async_sub_agent_event_publisher, async_sub_agent_event_consumer) =
72+
mpsc::unbounded_channel::<SubAgentEvent>();
8773

88-
// Wait until the bridge is closed
89-
bridge_join_handle.join().unwrap();
90-
})
74+
// Run an OS Thread that listens to sync channel and forwards the events
75+
// to an async channel
76+
let bridge_join_handle = run_async_sync_bridge(
77+
async_agent_control_event_publisher,
78+
async_sub_agent_event_publisher,
79+
agent_control_consumer,
80+
sub_agent_consumer,
81+
stop_rx,
82+
);
83+
84+
// Run the async status server
85+
let _ = runtime
86+
.block_on(
87+
crate::agent_control::http_server::server::run_status_server(
88+
config.clone(),
89+
async_agent_control_event_consumer,
90+
async_sub_agent_event_consumer,
91+
maybe_opamp_client_config,
92+
),
93+
)
94+
.inspect_err(|err| {
95+
error!(error_msg = %err, "error running status server");
96+
});
97+
98+
// Wait until the bridge is closed
99+
bridge_join_handle.join().unwrap();
91100
}
92101

93-
fn spawn_noop_consumer(
102+
fn noop_consumer_loop(
94103
agent_control_consumer: EventConsumer<AgentControlEvent>,
95104
sub_agent_consumer: EventConsumer<SubAgentEvent>,
96-
) -> JoinHandle<()> {
97-
spawn_named_thread("No-action consumer", move || loop {
105+
stop_rx: EventConsumer<CancellationMessage>,
106+
) {
107+
loop {
98108
select! {
99109
recv(agent_control_consumer.as_ref()) -> agent_control_consumer_res => {
100110
match agent_control_consumer_res {
@@ -119,19 +129,145 @@ impl Runner {
119129
break;
120130
}
121131
}
122-
}
132+
},
133+
recv(stop_rx.as_ref()) -> _ => {
134+
debug!("http server event drain processor stopped");
135+
break;
136+
},
123137
}
124-
})
138+
}
125139
}
126140
}
127141

128142
impl Drop for Runner {
129143
fn drop(&mut self) {
130-
if let Some(join_handle) = self.join_handle.take() {
131-
info!("waiting for status server to stop gracefully...");
132-
join_handle
133-
.join()
134-
.expect("error waiting for server join handle")
135-
}
144+
info!("waiting for status server to stop gracefully...");
145+
146+
let Some(thread_context) = self.thread_context.take() else {
147+
return;
148+
};
149+
150+
let _ = thread_context
151+
.stop()
152+
.inspect(|_| debug!("status server runner thread stopped"))
153+
.inspect_err(|error_msg| {
154+
error!(
155+
%error_msg,
156+
"Error stopping Status Server"
157+
)
158+
});
159+
}
160+
}
161+
162+
#[cfg(test)]
163+
mod tests {
164+
use std::sync::Arc;
165+
use std::thread::sleep;
166+
use std::time::Duration;
167+
168+
use tracing_test::internal::logs_with_scope_contain;
169+
use tracing_test::traced_test;
170+
171+
use crate::agent_control::http_server::config::ServerConfig;
172+
use crate::event::channel::pub_sub;
173+
use crate::event::AgentControlEvent;
174+
175+
use super::Runner;
176+
177+
#[test]
178+
#[traced_test]
179+
fn test_noop_consumer_stops_gracefully_when_dropped() {
180+
let runtime = Arc::new(
181+
tokio::runtime::Builder::new_multi_thread()
182+
.enable_all()
183+
.build()
184+
.unwrap(),
185+
);
186+
let (_agent_control_publisher, agent_control_consumer) = pub_sub::<AgentControlEvent>();
187+
let (_sub_agent_publisher, sub_agent_consumer) = pub_sub();
188+
let _http_server_runner = Runner::start(
189+
ServerConfig::default(),
190+
runtime,
191+
agent_control_consumer,
192+
sub_agent_consumer,
193+
None,
194+
);
195+
drop(_http_server_runner);
196+
197+
// wait for logs to be flushed
198+
sleep(Duration::from_millis(100));
199+
assert!(logs_with_scope_contain(
200+
"newrelic_agent_control::agent_control::http_server::runner",
201+
"http server event drain processor stopped",
202+
));
203+
}
204+
#[test]
205+
#[traced_test]
206+
fn test_server_stops_gracefully_when_dropped() {
207+
let runtime = Arc::new(
208+
tokio::runtime::Builder::new_multi_thread()
209+
.enable_all()
210+
.build()
211+
.unwrap(),
212+
);
213+
let (_agent_control_publisher, agent_control_consumer) = pub_sub::<AgentControlEvent>();
214+
let (_sub_agent_publisher, sub_agent_consumer) = pub_sub();
215+
let _http_server_runner = Runner::start(
216+
ServerConfig {
217+
enabled: true,
218+
port: 0.into(),
219+
..Default::default()
220+
},
221+
runtime,
222+
agent_control_consumer,
223+
sub_agent_consumer,
224+
None,
225+
);
226+
// server warm up
227+
sleep(Duration::from_millis(100));
228+
229+
drop(_http_server_runner);
230+
231+
// wait for logs to be flushed
232+
sleep(Duration::from_millis(100));
233+
assert!(logs_with_scope_contain(
234+
"newrelic_agent_control::agent_control::http_server::server",
235+
"status server gracefully stopped",
236+
));
237+
}
238+
#[test]
239+
#[traced_test]
240+
fn test_server_stops_gracefully_when_external_channels_close() {
241+
let runtime = Arc::new(
242+
tokio::runtime::Builder::new_multi_thread()
243+
.enable_all()
244+
.build()
245+
.unwrap(),
246+
);
247+
let (_agent_control_publisher, agent_control_consumer) = pub_sub::<AgentControlEvent>();
248+
let (_sub_agent_publisher, sub_agent_consumer) = pub_sub();
249+
let _http_server_runner = Runner::start(
250+
ServerConfig {
251+
enabled: true,
252+
port: 0.into(),
253+
..Default::default()
254+
},
255+
runtime,
256+
agent_control_consumer,
257+
sub_agent_consumer,
258+
None,
259+
);
260+
// server warm up
261+
sleep(Duration::from_millis(100));
262+
263+
drop(_agent_control_publisher);
264+
drop(_sub_agent_publisher);
265+
266+
// wait for logs to be flushed
267+
sleep(Duration::from_millis(100));
268+
assert!(logs_with_scope_contain(
269+
"newrelic_agent_control::agent_control::http_server::server",
270+
"status server gracefully stopped",
271+
));
136272
}
137273
}

agent-control/src/agent_control/http_server/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ pub async fn run_status_server(
7070
debug!("waiting for status server join handle");
7171
server_join_handle.await?;
7272

73+
debug!("status server gracefully stopped");
74+
7375
Ok(())
7476
}
7577

0 commit comments

Comments
 (0)