@@ -12,6 +12,7 @@ use tokio::{
12
12
sync:: oneshot:: { self , Sender } ,
13
13
time:: { interval_at, Duration , Instant } ,
14
14
} ;
15
+ use tokio_util:: sync:: CancellationToken ;
15
16
16
17
use crate :: core:: { config:: Config , prelude:: * } ;
17
18
use crate :: crypto:: * ;
@@ -220,19 +221,21 @@ impl Server {
220
221
/// Calling this function consumes the server.
221
222
pub fn run ( self ) {
222
223
let server = Arc :: new ( RwLock :: new ( self ) ) ;
223
- Self :: run_server ( server) ;
224
+ //FIXME; What API do we we want? for now just creating a dummy token
225
+ let cancel_token = CancellationToken :: new ( ) ;
226
+ Self :: run_server ( server, cancel_token) ;
224
227
}
225
228
226
229
/// Runs the supplied server and blocks until it completes either by aborting or
227
230
/// by error.
228
- pub fn run_server ( server : Arc < RwLock < Server > > ) {
231
+ pub fn run_server ( server : Arc < RwLock < Server > > , cancel_token : CancellationToken ) {
229
232
let single_threaded_executor = {
230
233
let server = trace_read_lock ! ( server) ;
231
234
let server_state = trace_read_lock ! ( server. server_state) ;
232
235
let config = trace_read_lock ! ( server_state. config) ;
233
236
config. performance . single_threaded_executor
234
237
} ;
235
- let server_task = Self :: new_server_task ( server) ;
238
+ let server_task = Self :: new_server_task ( server, cancel_token ) ;
236
239
// Launch
237
240
let mut builder = if !single_threaded_executor {
238
241
tokio:: runtime:: Builder :: new_multi_thread ( )
@@ -266,7 +269,7 @@ impl Server {
266
269
}
267
270
268
271
/// Returns the main server task - the loop that waits for connections and processes them.
269
- pub async fn new_server_task ( server : Arc < RwLock < Server > > ) {
272
+ pub async fn new_server_task ( server : Arc < RwLock < Server > > , cancel_token : CancellationToken ) {
270
273
// Get the address and discovery url
271
274
let ( sock_addr, discovery_server_url) = {
272
275
let server = trace_read_lock ! ( server) ;
@@ -296,14 +299,17 @@ impl Server {
296
299
None => {
297
300
error ! ( "Cannot resolve server address, check configuration of server" ) ;
298
301
}
299
- Some ( sock_addr) => Self :: server_task ( server, sock_addr, discovery_server_url) . await ,
302
+ Some ( sock_addr) => {
303
+ Self :: server_task ( server, sock_addr, discovery_server_url, cancel_token) . await
304
+ }
300
305
}
301
306
}
302
307
303
308
async fn server_task < A : ToSocketAddrs > (
304
309
server : Arc < RwLock < Server > > ,
305
310
sock_addr : A ,
306
311
discovery_server_url : Option < String > ,
312
+ cancel_token : CancellationToken ,
307
313
) {
308
314
// This is returned as the main server task
309
315
info ! ( "Waiting for Connection" ) ;
@@ -375,6 +381,9 @@ impl Server {
375
381
_ = rx_abort => {
376
382
info!( "abort received" ) ;
377
383
}
384
+ _ = cancel_token. cancelled( ) => {
385
+ info!( "Cancellation token triggered, shutting down server" ) ;
386
+ }
378
387
}
379
388
info ! ( "main server task is finished" ) ;
380
389
}
0 commit comments