Skip to content

Commit 1cfeccf

Browse files
author
Olivier
committed
Add CancellationToken as argument to server methods
This is the advice tokio way to stop a task
1 parent 0c98f9b commit 1cfeccf

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

lib/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ chrono = { version = "0.4", features = ["serde"] }
4848
parking_lot = { version = "0.12", features = ["send_guard"] }
4949
futures = "0.3"
5050
tokio = { version = "1", features = ["full"] }
51-
tokio-util = { version = "0.6", features = ["codec"] }
51+
tokio-util = { version = "0.7", features = ["codec"] }
5252
lazy_static = "1.4.0"
5353
regex = "1.7"
5454
serde = "1.0"

lib/src/server/server.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio::{
1212
sync::oneshot::{self, Sender},
1313
time::{interval_at, Duration, Instant},
1414
};
15+
use tokio_util::sync::CancellationToken;
1516

1617
use crate::core::{config::Config, prelude::*};
1718
use crate::crypto::*;
@@ -220,19 +221,21 @@ impl Server {
220221
/// Calling this function consumes the server.
221222
pub fn run(self) {
222223
let server = Arc::new(RwLock::new(self));
223-
Self::run_server(server);
224+
//FIXME; What API do we we want? for now just creating a dummy token
225+
let cancel_token = CancellationToken::new();
226+
Self::run_server(server, cancel_token);
224227
}
225228

226229
/// Runs the supplied server and blocks until it completes either by aborting or
227230
/// by error.
228-
pub fn run_server(server: Arc<RwLock<Server>>) {
231+
pub fn run_server(server: Arc<RwLock<Server>>, cancel_token: CancellationToken) {
229232
let single_threaded_executor = {
230233
let server = trace_read_lock!(server);
231234
let server_state = trace_read_lock!(server.server_state);
232235
let config = trace_read_lock!(server_state.config);
233236
config.performance.single_threaded_executor
234237
};
235-
let server_task = Self::new_server_task(server);
238+
let server_task = Self::new_server_task(server, cancel_token);
236239
// Launch
237240
let mut builder = if !single_threaded_executor {
238241
tokio::runtime::Builder::new_multi_thread()
@@ -266,7 +269,7 @@ impl Server {
266269
}
267270

268271
/// Returns the main server task - the loop that waits for connections and processes them.
269-
pub async fn new_server_task(server: Arc<RwLock<Server>>) {
272+
pub async fn new_server_task(server: Arc<RwLock<Server>>, cancel_token: CancellationToken) {
270273
// Get the address and discovery url
271274
let (sock_addr, discovery_server_url) = {
272275
let server = trace_read_lock!(server);
@@ -296,14 +299,17 @@ impl Server {
296299
None => {
297300
error!("Cannot resolve server address, check configuration of server");
298301
}
299-
Some(sock_addr) => Self::server_task(server, sock_addr, discovery_server_url).await,
302+
Some(sock_addr) => {
303+
Self::server_task(server, sock_addr, discovery_server_url, cancel_token).await
304+
}
300305
}
301306
}
302307

303308
async fn server_task<A: ToSocketAddrs>(
304309
server: Arc<RwLock<Server>>,
305310
sock_addr: A,
306311
discovery_server_url: Option<String>,
312+
cancel_token: CancellationToken,
307313
) {
308314
// This is returned as the main server task
309315
info!("Waiting for Connection");
@@ -375,6 +381,9 @@ impl Server {
375381
_ = rx_abort => {
376382
info!("abort received");
377383
}
384+
_ = cancel_token.cancelled() => {
385+
info!("Cancellation token triggered, shutting down server");
386+
}
378387
}
379388
info!("main server task is finished");
380389
}

0 commit comments

Comments
 (0)