@@ -27,8 +27,10 @@ use tokio::sync::mpsc::error::TrySendError;
27
27
use crate :: subscribers:: subscriber:: EventSubscriber ;
28
28
29
29
const EVENT_CHANNEL_SIZE : usize = 100 ;
30
+ const MAXIMUM_HEALTH_CHECK_EVENT_COUNT : u32 = 3600 ; // 1 hour of build events at expected rate of ~1/s.
30
31
31
32
struct HealthCheckEventStats {
33
+ total_event_count : u32 ,
32
34
excess_cache_miss_reported : bool ,
33
35
}
34
36
@@ -68,6 +70,7 @@ impl HealthCheckSubscriber {
68
70
health_check_client,
69
71
event_sender : Some ( events_tx) ,
70
72
event_stats : HealthCheckEventStats {
73
+ total_event_count : 0 ,
71
74
excess_cache_miss_reported : false ,
72
75
} ,
73
76
} )
@@ -161,6 +164,15 @@ impl HealthCheckSubscriber {
161
164
) ;
162
165
}
163
166
}
167
+ self . event_stats . total_event_count += 1 ;
168
+
169
+ if self . event_stats . total_event_count > MAXIMUM_HEALTH_CHECK_EVENT_COUNT {
170
+ self . close_client_connection_with_error_report ( & format ! (
171
+ "Too many health check events sent. Disabling health checks. Events sent: {}. Maximum allowed: {}" ,
172
+ self . event_stats. total_event_count,
173
+ MAXIMUM_HEALTH_CHECK_EVENT_COUNT
174
+ ) ) ;
175
+ }
164
176
}
165
177
Ok ( ( ) )
166
178
}
@@ -333,4 +345,104 @@ mod tests {
333
345
let mut buffer: Vec < HealthCheckEvent > = Vec :: with_capacity ( 2 ) ;
334
346
assert_eq ! ( 1 , events_rx. recv_many( & mut buffer, 2 ) . await ) ;
335
347
}
348
+
349
+ #[ tokio:: test]
350
+ async fn test_event_limit_exceeded ( ) -> buck2_error:: Result < ( ) > {
351
+ let ( events_tx, mut events_rx) = mpsc:: channel :: < HealthCheckEvent > ( EVENT_CHANNEL_SIZE ) ;
352
+
353
+ let mut subscriber = HealthCheckSubscriber :: new_with_client (
354
+ Some ( Box :: new ( NoOpHealthCheckClient { } ) ) ,
355
+ events_tx,
356
+ ) ;
357
+
358
+ let event = test_event ( buck2_data:: buck_event:: Data :: Instant (
359
+ buck2_data:: InstantEvent {
360
+ data : Some ( Box :: new ( buck2_data:: Snapshot :: default ( ) ) . into ( ) ) ,
361
+ } ,
362
+ ) ) ;
363
+
364
+ for _ in 0 ..MAXIMUM_HEALTH_CHECK_EVENT_COUNT {
365
+ subscriber. handle_event ( & event) . await ?;
366
+ let _ignored = events_rx. recv ( ) . await ; // Consume from channel.
367
+ }
368
+
369
+ // Verify that the client and channel are still active
370
+ assert ! ( subscriber. event_sender. is_some( ) ) ;
371
+ assert ! ( subscriber. health_check_client. is_some( ) ) ;
372
+
373
+ // Send the 3601st event
374
+ subscriber. handle_event ( & event) . await ?;
375
+
376
+ // The event is sent on the channel and then the sender/client are dropped.
377
+ let _ignored = events_rx. recv ( ) . await ;
378
+ assert ! ( subscriber. event_sender. is_none( ) ) ;
379
+ assert ! ( subscriber. health_check_client. is_none( ) ) ;
380
+
381
+ // Verify that no more events are sent
382
+ let mut buffer: Vec < HealthCheckEvent > = Vec :: with_capacity ( 1 ) ;
383
+ assert_eq ! ( 0 , events_rx. recv_many( & mut buffer, 1 ) . await ) ;
384
+ Ok ( ( ) )
385
+ }
386
+
387
+ #[ tokio:: test]
388
+ async fn test_slow_client ( ) -> buck2_error:: Result < ( ) > {
389
+ let ( events_tx, events_rx_guard) = mpsc:: channel :: < HealthCheckEvent > ( EVENT_CHANNEL_SIZE ) ;
390
+
391
+ let mut subscriber = HealthCheckSubscriber :: new_with_client (
392
+ Some ( Box :: new ( NoOpHealthCheckClient { } ) ) ,
393
+ events_tx,
394
+ ) ;
395
+
396
+ let event = test_event ( buck2_data:: buck_event:: Data :: Instant (
397
+ buck2_data:: InstantEvent {
398
+ data : Some ( Box :: new ( buck2_data:: Snapshot :: default ( ) ) . into ( ) ) ,
399
+ } ,
400
+ ) ) ;
401
+
402
+ for _ in 0 ..EVENT_CHANNEL_SIZE {
403
+ subscriber. handle_event ( & event) . await ?;
404
+ }
405
+
406
+ // Verify that the client and channel are still active
407
+ assert ! ( subscriber. event_sender. is_some( ) ) ;
408
+ assert ! ( subscriber. health_check_client. is_some( ) ) ;
409
+
410
+ subscriber. handle_event ( & event) . await ?;
411
+
412
+ // Verify that the client and channel sender are dropped since the buffer is full.
413
+ assert ! ( subscriber. event_sender. is_none( ) ) ;
414
+ assert ! ( subscriber. health_check_client. is_none( ) ) ;
415
+ drop ( events_rx_guard) ;
416
+ Ok ( ( ) )
417
+ }
418
+
419
+ #[ tokio:: test]
420
+ async fn test_dropped_client ( ) -> buck2_error:: Result < ( ) > {
421
+ let ( events_tx, events_rx_guard) = mpsc:: channel :: < HealthCheckEvent > ( EVENT_CHANNEL_SIZE ) ;
422
+
423
+ let mut subscriber = HealthCheckSubscriber :: new_with_client (
424
+ Some ( Box :: new ( NoOpHealthCheckClient { } ) ) ,
425
+ events_tx,
426
+ ) ;
427
+
428
+ let event = test_event ( buck2_data:: buck_event:: Data :: Instant (
429
+ buck2_data:: InstantEvent {
430
+ data : Some ( Box :: new ( buck2_data:: Snapshot :: default ( ) ) . into ( ) ) ,
431
+ } ,
432
+ ) ) ;
433
+
434
+ subscriber. handle_event ( & event) . await ?;
435
+ // Verify that the client and channel are still active
436
+ assert ! ( subscriber. event_sender. is_some( ) ) ;
437
+ assert ! ( subscriber. health_check_client. is_some( ) ) ;
438
+
439
+ // Close channel receiver.
440
+ drop ( events_rx_guard) ;
441
+
442
+ subscriber. handle_event ( & event) . await ?;
443
+ // Verify that the client and channel sender are dropped since the receiver is closed.
444
+ assert ! ( subscriber. event_sender. is_none( ) ) ;
445
+ assert ! ( subscriber. health_check_client. is_none( ) ) ;
446
+ Ok ( ( ) )
447
+ }
336
448
}
0 commit comments