1- use std:: sync:: {
2- Arc ,
3- RwLock ,
1+ use std:: {
2+ net:: SocketAddr ,
3+ sync:: {
4+ Arc ,
5+ RwLock ,
6+ } ,
47} ;
58
69use sled:: Db ;
710
811use crate :: {
9- admin:: accept:: accept_admin_request,
12+ admin:: {
13+ accept:: accept_admin_request,
14+ liveready:: {
15+ liveness_monitor,
16+ LiveReadyRequestSnd ,
17+ LiveReadyUpdateRecv ,
18+ } ,
19+ } ,
1020 log_info,
1121 Rpc ,
1222 Settings ,
@@ -17,7 +27,10 @@ use hyper::{
1727 service:: service_fn,
1828} ;
1929use hyper_util_blutgang:: rt:: TokioIo ;
20- use tokio:: net:: TcpListener ;
30+ use tokio:: {
31+ net:: TcpListener ,
32+ sync:: mpsc,
33+ } ;
2134
2235macro_rules! accept_admin {
2336 (
@@ -26,6 +39,7 @@ macro_rules! accept_admin {
2639 $poverty_list_rwlock: expr,
2740 $cache: expr,
2841 $config: expr,
42+ $liveness_request_tx: expr,
2943 ) => {
3044 // Bind the incoming connection to our service
3145 if let Err ( err) = http1:: Builder :: new( )
@@ -39,6 +53,7 @@ macro_rules! accept_admin {
3953 Arc :: clone( $poverty_list_rwlock) ,
4054 Arc :: clone( $cache) ,
4155 Arc :: clone( $config) ,
56+ $liveness_request_tx. clone( ) ,
4257 ) ;
4358 response
4459 } ) ,
@@ -50,24 +65,17 @@ macro_rules! accept_admin {
5065 } ;
5166}
5267
53- // Used for listening to admin requests as its own tokio task.
54- //
55- // Similar to what you'd find in main/balancer
56- pub async fn listen_for_admin_requests (
68+ async fn admin_api_server (
5769 rpc_list_rwlock : Arc < RwLock < Vec < Rpc > > > ,
5870 poverty_list_rwlock : Arc < RwLock < Vec < Rpc > > > ,
5971 cache : Arc < Db > ,
6072 config : Arc < RwLock < Settings > > ,
73+ address : SocketAddr ,
74+ liveness_request_tx : LiveReadyRequestSnd ,
6175) -> Result < ( ) , Box < dyn std:: error:: Error > > {
62- let address;
63- {
64- let config_guard = config. read ( ) . unwrap ( ) ;
65- address = config_guard. admin . address ;
66- }
67-
6876 // Create a listener and bind to it
6977 let listener = TcpListener :: bind ( address) . await ?;
70- log_info ! ( "Bound admin to: {}" , address) ;
78+ log_info ! ( "Bound admin API to: {}" , address) ;
7179
7280 loop {
7381 let ( stream, socketaddr) = listener. accept ( ) . await ?;
@@ -81,6 +89,7 @@ pub async fn listen_for_admin_requests(
8189 let poverty_list_rwlock_clone = Arc :: clone ( & poverty_list_rwlock) ;
8290 let cache_clone = Arc :: clone ( & cache) ;
8391 let config_clone = Arc :: clone ( & config) ;
92+ let liveness_request_tx_clone = liveness_request_tx. clone ( ) ;
8493
8594 // Spawn a tokio task to serve multiple connections concurrently
8695 tokio:: task:: spawn ( async move {
@@ -90,7 +99,40 @@ pub async fn listen_for_admin_requests(
9099 & poverty_list_rwlock_clone,
91100 & cache_clone,
92101 & config_clone,
102+ & liveness_request_tx_clone,
93103 ) ;
94104 } ) ;
95105 }
96106}
107+
108+ // Used for listening to admin requests as its own tokio task.
109+ // Also used for k8s liveness/readiness probes.
110+ //
111+ // Similar to what you'd find in main/balancer
112+ pub async fn listen_for_admin_requests (
113+ rpc_list_rwlock : Arc < RwLock < Vec < Rpc > > > ,
114+ poverty_list_rwlock : Arc < RwLock < Vec < Rpc > > > ,
115+ cache : Arc < Db > ,
116+ config : Arc < RwLock < Settings > > ,
117+ liveness_receiver : LiveReadyUpdateRecv ,
118+ ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
119+ let address;
120+ {
121+ let config_guard = config. read ( ) . unwrap ( ) ;
122+ address = config_guard. admin . address ;
123+ }
124+
125+ // Spawn thread for monitoring the current liveness status of Blutgang
126+ let ( liveness_request_tx, liveness_request_rx) = mpsc:: channel ( 16 ) ;
127+ tokio:: spawn ( liveness_monitor ( liveness_receiver, liveness_request_rx) ) ;
128+
129+ admin_api_server (
130+ rpc_list_rwlock,
131+ poverty_list_rwlock,
132+ cache,
133+ config,
134+ address,
135+ liveness_request_tx,
136+ )
137+ . await
138+ }
0 commit comments