Skip to content

Commit 597914a

Browse files
committed
remove the hack sender channel holder
Signed-off-by: Zhonghu Xu <xuzhonghu@huawei.com>
1 parent 35bca98 commit 597914a

File tree

9 files changed

+96
-147
lines changed

9 files changed

+96
-147
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

orion-lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ typed-builder = "0.18.2"
7171
url.workspace = true
7272
uuid = { version = "1.17.0", features = ["v4"] }
7373
x509-parser = { version = "0.17", features = ["default"] }
74+
tokio-util = "0.7.16"
7475

7576

7677
[dev-dependencies]

orion-lib/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,16 @@ pub fn new_configuration_channel(capacity: usize) -> (ConfigurationSenders, Conf
131131

132132
/// Start the listeners manager directly without spawning a background task.
133133
/// Caller must be inside a Tokio runtime and await this async function.
134-
pub async fn start_listener_manager(configuration_receivers: ConfigurationReceivers) -> Result<()> {
134+
pub async fn start_listener_manager(
135+
configuration_receivers: ConfigurationReceivers,
136+
ct: tokio_util::sync::CancellationToken,
137+
) -> Result<()> {
135138
let ConfigurationReceivers { listener_configuration_receiver, route_configuration_receiver } =
136139
configuration_receivers;
137140

138141
tracing::debug!("listeners manager starting");
139142
let mgr = ListenersManager::new(listener_configuration_receiver, route_configuration_receiver);
140-
mgr.start().await.map_err(|err| {
143+
mgr.start(ct).await.map_err(|err| {
141144
tracing::warn!(error = %err, "listeners manager exited with error");
142145
err
143146
})?;

orion-lib/src/listeners/listeners_manager.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,30 @@ impl ListenerInfo {
5555
}
5656

5757
pub struct ListenersManager {
58-
configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
58+
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
5959
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>,
6060
listener_handles: BTreeMap<&'static str, ListenerInfo>,
6161
}
6262

6363
impl ListenersManager {
6464
pub fn new(
65-
configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
65+
listener_configuration_channel: mpsc::Receiver<ListenerConfigurationChange>,
6666
route_configuration_channel: mpsc::Receiver<RouteConfigurationChange>,
6767
) -> Self {
68-
ListenersManager { configuration_channel, route_configuration_channel, listener_handles: BTreeMap::new() }
68+
ListenersManager {
69+
listener_configuration_channel,
70+
route_configuration_channel,
71+
listener_handles: BTreeMap::new(),
72+
}
6973
}
7074

71-
pub async fn start(mut self) -> Result<()> {
75+
pub async fn start(mut self, ct: tokio_util::sync::CancellationToken) -> Result<()> {
7276
let (tx_secret_updates, _) = broadcast::channel(16);
7377
let (tx_route_updates, _) = broadcast::channel(16);
74-
78+
// TODO: create child token for each listener?
7579
loop {
7680
tokio::select! {
77-
Some(listener_configuration_change) = self.configuration_channel.recv() => {
81+
Some(listener_configuration_change) = self.listener_configuration_channel.recv() => {
7882
match listener_configuration_change {
7983
ListenerConfigurationChange::Added(boxed) => {
8084
let (factory, listener_conf) = *boxed;
@@ -110,9 +114,9 @@ impl ListenersManager {
110114
warn!("Internal problem when updating a route: {e}");
111115
}
112116
},
113-
else => {
114-
warn!("All listener manager channels are closed...exiting");
115-
return Err("All listener manager channels are closed...exiting".into());
117+
_ = ct.cancelled() => {
118+
warn!("Listener manager exiting");
119+
return Ok(());
116120
}
117121
}
118122
}

orion-proxy/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ tracing-subscriber = { workspace = true, features = [
5454
"registry",
5555
"std",
5656
] }
57+
tokio-util = "0.7.16"
5758

5859
[target.'cfg(not(target_env = "msvc"))'.dependencies]
5960
tikv-jemallocator = { version = "0.6", optional = true }
@@ -67,5 +68,8 @@ axum-test = "17.2.0"
6768
orion-data-plane-api.workspace = true
6869
tracing-test.workspace = true
6970

71+
[target.'cfg(unix)'.dev-dependencies]
72+
libc = "0.2"
73+
7074
[lints]
7175
workspace = true

orion-proxy/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ static GLOBAL: Jemalloc = Jemalloc;
2626
#[global_allocator]
2727
static ALLOC: dhat::Alloc = dhat::Alloc;
2828

29-
fn main() -> orion_error::Result<()> {
29+
#[tokio::main]
30+
async fn main() -> orion_error::Result<()> {
3031
#[cfg(all(feature = "dhat-heap", not(feature = "jemalloc")))]
3132
let _profiler = dhat::Profiler::new_heap();
3233
orion_proxy::run()

orion-proxy/src/proxy.rs

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
admin::start_admin_server,
2020
core_affinity,
2121
runtime::{self, RuntimeId},
22-
signal::{spawn_signal_handler, ShutdownSignal},
22+
signal::wait_signal,
2323
xds_configurator::XdsConfigurationHandler,
2424
};
2525
use compact_str::ToCompactString;
@@ -51,20 +51,20 @@ use tracing::{debug, info, warn};
5151
pub fn run_orion(bootstrap: Bootstrap, access_log_config: Option<AccessLogConfig>) {
5252
debug!("Starting on thread {:?}", std::thread::current().name());
5353

54-
// Set up signal handling and shutdown notification channel
55-
let (shutdown_tx, signal_handle) = spawn_signal_handler();
54+
let ct = tokio_util::sync::CancellationToken::new();
55+
let ct_clone = ct.clone();
56+
tokio::spawn(async move {
57+
// Set up signal handling and shutdown notification channel
58+
wait_signal().await;
59+
// Trigger cancellation
60+
ct_clone.cancel();
61+
});
5662

5763
// launch the runtimes...
58-
let sender_guards =
59-
launch_runtimes(bootstrap, access_log_config, &shutdown_tx).with_context_msg("failed to launch runtimes");
60-
61-
// Wait for signal handler to complete
62-
if let Err(err) = signal_handle.join() {
63-
warn!("Signal handler thread panicked: {:?}", err);
64+
let res = launch_runtimes(bootstrap, access_log_config, ct).with_context_msg("failed to launch runtimes");
65+
if let Err(err) = res {
66+
warn!("Error running orion: {err}");
6467
}
65-
66-
// Return the sender guards (if successful) to keep channels alive
67-
_ = sender_guards;
6868
}
6969

7070
fn calculate_num_threads_per_runtime(num_cpus: usize, num_runtimes: usize) -> Result<usize> {
@@ -106,13 +106,11 @@ struct ProxyConfiguration {
106106
metrics: Vec<Metrics>,
107107
}
108108

109-
type SenderGuards = Vec<ConfigurationSenders>;
110-
111109
fn launch_runtimes(
112110
bootstrap: Bootstrap,
113111
access_log_config: Option<AccessLogConfig>,
114-
shutdown_tx: &tokio::sync::broadcast::Sender<ShutdownSignal>,
115-
) -> Result<SenderGuards> {
112+
ct: tokio_util::sync::CancellationToken,
113+
) -> Result<()> {
116114
let rt_config = runtime_config();
117115
let num_runtimes = rt_config.num_runtimes();
118116
let num_cpus = rt_config.num_cpus();
@@ -123,11 +121,6 @@ fn launch_runtimes(
123121
let (config_senders, config_receivers): (Vec<ConfigurationSenders>, Vec<ConfigurationReceivers>) =
124122
(0..num_runtimes).map(|_| new_configuration_channel(100)).collect::<Vec<_>>().into_iter().unzip();
125123

126-
// keep a copy of the senders to avoid them being dropped if no services are configured...
127-
//
128-
129-
let sender_guards = config_senders.clone();
130-
131124
// launch services runtime...
132125
//
133126

@@ -180,7 +173,7 @@ fn launch_runtimes(
180173
rt_config.num_service_threads.get() as usize,
181174
None,
182175
config,
183-
shutdown_tx.subscribe(),
176+
ct.clone(),
184177
)?;
185178

186179
if !are_metrics_empty {
@@ -211,7 +204,7 @@ fn launch_runtimes(
211204
metrics.clone(),
212205
rt_config.affinity_strategy.clone().map(|affinity| (RuntimeId(id), affinity)),
213206
config_receivers,
214-
shutdown_tx.subscribe(),
207+
ct.clone(),
215208
)
216209
})
217210
.collect::<Result<Vec<_>>>()?
@@ -224,7 +217,7 @@ fn launch_runtimes(
224217
warn!("Closing handler with error {err:?}");
225218
}
226219
}
227-
Ok(sender_guards)
220+
Ok(())
228221
}
229222

230223
fn spawn_proxy_runtime_from_thread(
@@ -233,22 +226,19 @@ fn spawn_proxy_runtime_from_thread(
233226
metrics: Vec<Metrics>,
234227
affinity_info: Option<(RuntimeId, Affinity)>,
235228
configuration_receivers: ConfigurationReceivers,
236-
mut shutdown_rx: tokio::sync::broadcast::Receiver<ShutdownSignal>,
229+
ct: tokio_util::sync::CancellationToken,
237230
) -> Result<JoinHandle<()>> {
238231
let thread_name = build_thread_name(thread_name, affinity_info.as_ref());
239232

240233
let handle = thread::Builder::new().name(thread_name.clone()).spawn(move || {
241234
let rt = runtime::build_tokio_runtime(&thread_name, num_threads, affinity_info, Some(metrics));
242235
rt.block_on(async {
243236
tokio::select! {
244-
_ = start_proxy(configuration_receivers) => {
237+
_ = start_proxy(configuration_receivers, ct.clone()) => {
245238
info!("Proxy Runtime terminated!");
246239
}
247-
signal = shutdown_rx.recv() => {
248-
match signal {
249-
Ok(signal) => info!("Received {} signal, shutting down Proxy runtime!", signal),
250-
Err(_) => info!("Shutdown channel closed, shutting down Proxy runtime!"),
251-
}
240+
_ = ct.cancelled() => {
241+
info!("Shutdown channel closed, shutting down Proxy runtime!");
252242
}
253243
}
254244
});
@@ -261,7 +251,7 @@ fn spawn_services_runtime_from_thread(
261251
threads_num: usize,
262252
affinity_info: Option<(RuntimeId, Affinity)>,
263253
config: ProxyConfiguration,
264-
mut shutdown_rx: tokio::sync::broadcast::Receiver<ShutdownSignal>,
254+
ct: tokio_util::sync::CancellationToken,
265255
) -> Result<JoinHandle<()>> {
266256
let thread_name = build_thread_name(thread_name, affinity_info.as_ref());
267257
let rt_handle = thread::Builder::new().name(thread_name.clone()).spawn(move || {
@@ -274,11 +264,8 @@ fn spawn_services_runtime_from_thread(
274264
}
275265
info!("Services Runtime terminated!");
276266
}
277-
signal = shutdown_rx.recv() => {
278-
match signal {
279-
Ok(signal) => info!("Received {} signal, shutting down Services runtime!", signal),
280-
Err(_) => info!("Shutdown channel closed, shutting down Services runtime!"),
281-
}
267+
_ = ct.cancelled() => {
268+
info!("Shutdown channel closed, shutting down Services runtime!");
282269
}
283270
}
284271
});
@@ -435,7 +422,10 @@ async fn configure_initial_resources(
435422
clusters.into_iter().map(orion_lib::clusters::add_cluster).collect::<Result<_>>()
436423
}
437424

438-
async fn start_proxy(configuration_receivers: ConfigurationReceivers) -> Result<()> {
439-
orion_lib::start_listener_manager(configuration_receivers).await?;
425+
async fn start_proxy(
426+
configuration_receivers: ConfigurationReceivers,
427+
ct: tokio_util::sync::CancellationToken,
428+
) -> Result<()> {
429+
orion_lib::start_listener_manager(configuration_receivers, ct).await?;
440430
Ok(())
441431
}

0 commit comments

Comments (0)