@@ -8,7 +8,7 @@ use core::net::SocketAddr;
8
8
use std:: collections:: HashMap ;
9
9
use std:: sync:: Arc ;
10
10
use std:: time:: Duration ;
11
- use std:: time:: Instant ;
11
+ use std:: time:: { Instant , SystemTime } ;
12
12
13
13
use anyhow:: { anyhow, Result } ;
14
14
use askama:: Template ;
@@ -97,7 +97,8 @@ impl Lighthouse {
97
97
} )
98
98
}
99
99
100
- async fn quorum_valid ( & self ) -> bool {
100
+ // Checks whether the quorum is valid and an explanation for the state.
101
+ async fn quorum_valid ( & self ) -> ( bool , String ) {
101
102
let state = self . state . lock ( ) . await ;
102
103
103
104
let mut first_joined = Instant :: now ( ) ;
@@ -119,38 +120,43 @@ impl Lighthouse {
119
120
}
120
121
121
122
if is_fast_quorum {
122
- info ! ( "Fast quorum found!" ) ;
123
- return is_fast_quorum;
123
+ return ( is_fast_quorum, format ! ( "Fast quorum found!" ) ) ;
124
124
}
125
125
}
126
126
127
127
if state. participants . len ( ) < self . opt . min_replicas as usize {
128
- info ! (
129
- "No quorum, only have {} participants, need {}" ,
130
- state. participants. len( ) ,
131
- self . opt. min_replicas
128
+ return (
129
+ false ,
130
+ format ! (
131
+ "No quorum, only have {} participants, need {}" ,
132
+ state. participants. len( ) ,
133
+ self . opt. min_replicas
134
+ ) ,
132
135
) ;
133
- return false ;
134
136
}
135
137
136
138
// Quorum is valid at this point but lets wait for stragglers.
137
139
138
140
if Instant :: now ( ) . duration_since ( first_joined)
139
141
< Duration :: from_millis ( self . opt . join_timeout_ms )
140
142
{
141
- info ! (
142
- "Valid quorum with {} participants, waiting for stragglers due to join timeout" ,
143
- state. participants. len( )
143
+ return (
144
+ false ,
145
+ format ! (
146
+ "Valid quorum with {} participants, waiting for stragglers due to join timeout" ,
147
+ state. participants. len( )
148
+ ) ,
144
149
) ;
145
- return false ;
146
150
}
147
151
148
- true
152
+ ( true , format ! ( "Valid quorum found" ) )
149
153
}
150
154
151
155
async fn _quorum_tick ( self : Arc < Self > ) -> Result < ( ) > {
152
156
// TODO: these should probably run under the same lock
153
- let quorum_met = self . quorum_valid ( ) . await ;
157
+ let ( quorum_met, reason) = self . quorum_valid ( ) . await ;
158
+ info ! ( "{}" , reason) ;
159
+
154
160
if quorum_met {
155
161
let mut state = self . state . lock ( ) . await ;
156
162
let mut participants: Vec < QuorumMember > = state
@@ -180,6 +186,7 @@ impl Lighthouse {
180
186
let quorum = Quorum {
181
187
quorum_id : state. quorum_id ,
182
188
participants : participants,
189
+ created : Some ( SystemTime :: now ( ) . into ( ) ) ,
183
190
} ;
184
191
185
192
info ! ( "Quorum! {:?}" , quorum) ;
@@ -257,16 +264,33 @@ impl Lighthouse {
257
264
}
258
265
259
266
async fn get_status ( self : Arc < Self > ) -> Html < String > {
267
+ let ( _, quorum_status) = self . quorum_valid ( ) . await ;
268
+
260
269
let template = {
261
270
let state = self . state . lock ( ) . await ;
262
271
272
+ let max_step = {
273
+ if let Some ( quorum) = state. prev_quorum . clone ( ) {
274
+ quorum
275
+ . participants
276
+ . iter ( )
277
+ . map ( |p| p. step )
278
+ . max ( )
279
+ . unwrap_or ( -1 )
280
+ } else {
281
+ -1
282
+ }
283
+ } ;
284
+
263
285
StatusTemplate {
264
286
quorum_id : state. quorum_id ,
265
287
prev_quorum : state. prev_quorum . clone ( ) ,
266
288
heartbeats : state. heartbeats . clone ( ) ,
289
+ quorum_status : quorum_status,
267
290
old_age_threshold : Instant :: now ( )
268
291
. checked_sub ( Duration :: from_secs ( 1 ) )
269
292
. unwrap_or ( Instant :: now ( ) ) ,
293
+ max_step : max_step,
270
294
}
271
295
} ;
272
296
Html ( template. render ( ) . unwrap ( ) )
@@ -361,10 +385,14 @@ struct IndexTemplate {}
361
385
#[ derive( Template ) ]
362
386
#[ template( path = "status.html" ) ]
363
387
struct StatusTemplate {
364
- old_age_threshold : Instant ,
365
388
prev_quorum : Option < Quorum > ,
366
389
quorum_id : i64 ,
367
390
heartbeats : HashMap < String , Instant > ,
391
+ quorum_status : String ,
392
+
393
+ // visualization thresholds
394
+ old_age_threshold : Instant ,
395
+ max_step : i64 ,
368
396
}
369
397
370
398
// Make our own error that wraps `anyhow::Error`.
@@ -422,7 +450,7 @@ mod tests {
422
450
#[ tokio:: test]
423
451
async fn test_quorum_join_timeout ( ) {
424
452
let lighthouse = lighthouse_test_new ( ) ;
425
- assert ! ( !lighthouse. quorum_valid( ) . await ) ;
453
+ assert ! ( !lighthouse. quorum_valid( ) . await . 0 ) ;
426
454
427
455
{
428
456
let mut state = lighthouse. state . lock ( ) . await ;
@@ -440,21 +468,21 @@ mod tests {
440
468
) ;
441
469
}
442
470
443
- assert ! ( !lighthouse. quorum_valid( ) . await ) ;
471
+ assert ! ( !lighthouse. quorum_valid( ) . await . 0 ) ;
444
472
445
473
{
446
474
let mut state = lighthouse. state . lock ( ) . await ;
447
475
state. participants . get_mut ( "a" ) . unwrap ( ) . joined =
448
476
Instant :: now ( ) . sub ( Duration :: from_secs ( 10 * 60 * 60 ) ) ;
449
477
}
450
478
451
- assert ! ( lighthouse. quorum_valid( ) . await ) ;
479
+ assert ! ( lighthouse. quorum_valid( ) . await . 0 ) ;
452
480
}
453
481
454
482
#[ tokio:: test]
455
483
async fn test_quorum_fast_prev_quorum ( ) {
456
484
let lighthouse = lighthouse_test_new ( ) ;
457
- assert ! ( !lighthouse. quorum_valid( ) . await ) ;
485
+ assert ! ( !lighthouse. quorum_valid( ) . await . 0 ) ;
458
486
459
487
{
460
488
let mut state = lighthouse. state . lock ( ) . await ;
@@ -472,7 +500,7 @@ mod tests {
472
500
) ;
473
501
}
474
502
475
- assert ! ( !lighthouse. quorum_valid( ) . await ) ;
503
+ assert ! ( !lighthouse. quorum_valid( ) . await . 0 ) ;
476
504
477
505
{
478
506
let mut state = lighthouse. state . lock ( ) . await ;
@@ -484,10 +512,11 @@ mod tests {
484
512
store_address: "" . to_string( ) ,
485
513
step: 1 ,
486
514
} ] ,
515
+ created : Some ( SystemTime :: now ( ) . into ( ) ) ,
487
516
} ) ;
488
517
}
489
518
490
- assert ! ( lighthouse. quorum_valid( ) . await ) ;
519
+ assert ! ( lighthouse. quorum_valid( ) . await . 0 ) ;
491
520
}
492
521
493
522
#[ tokio:: test]
0 commit comments