@@ -53,6 +53,7 @@ pub struct SenderCounter {
53
53
pub tx : AtomicU64 ,
54
54
pub tx_bytes : AtomicU64 ,
55
55
pub dropped : AtomicU64 ,
56
+ pub waited : AtomicU64 ,
56
57
}
57
58
58
59
impl RefCountable for SenderCounter {
@@ -89,6 +90,11 @@ impl RefCountable for SenderCounter {
89
90
CounterType :: Counted ,
90
91
CounterValue :: Unsigned ( self . dropped. swap( 0 , Ordering :: Relaxed ) ) ,
91
92
) ,
93
+ (
94
+ "waited" ,
95
+ CounterType :: Counted ,
96
+ CounterValue :: Unsigned ( self . waited. swap( 0 , Ordering :: Relaxed ) ) ,
97
+ ) ,
92
98
]
93
99
}
94
100
}
@@ -380,6 +386,7 @@ pub struct UniformSender<T> {
380
386
381
387
input : Arc < Receiver < T > > ,
382
388
counter : Arc < SenderCounter > ,
389
+ overwritten_count : u64 ,
383
390
384
391
encoder : Encoder < T > ,
385
392
private_conn : Mutex < Connection > ,
@@ -426,6 +433,7 @@ impl<T: Sendable> UniformSender<T> {
426
433
name,
427
434
input,
428
435
counter : Arc :: new ( SenderCounter :: default ( ) ) ,
436
+ overwritten_count : 0 ,
429
437
encoder : Encoder :: new (
430
438
0 ,
431
439
SendMessageType :: TaggedFlow ,
@@ -626,7 +634,11 @@ impl<T: Sendable> UniformSender<T> {
626
634
}
627
635
}
628
636
629
- fn is_exceed_max_throughput ( & mut self , max_throughput_mbps : u64 ) -> bool {
637
+ fn is_exceed_max_throughput (
638
+ & mut self ,
639
+ max_throughput_mbps : u64 ,
640
+ ingester_traffic_overflow_action : u8 ,
641
+ ) -> bool {
630
642
if max_throughput_mbps == 0 {
631
643
return false ;
632
644
}
@@ -637,7 +649,7 @@ impl<T: Sendable> UniformSender<T> {
637
649
. unwrap ( ) ;
638
650
639
651
let used = now - Duration :: from_nanos ( SENT_START_DURATION . load ( Ordering :: Relaxed ) ) ;
640
- if used > Duration :: from_secs ( 1 ) {
652
+ if used >= Duration :: from_secs ( 1 ) {
641
653
SENT_START_DURATION . store ( now. as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
642
654
TOTAL_SENT_BYTES . store ( 0 , Ordering :: Relaxed ) ;
643
655
} else {
@@ -646,11 +658,23 @@ impl<T: Sendable> UniformSender<T> {
646
658
> Duration :: from_secs ( 5 )
647
659
{
648
660
warn ! (
649
- "{} sender dropping message, throughput execeed setting value 'max_throughput_to_ingester' {}Mbps" ,
650
- self . name, max_throughput_mbps
661
+ "{} sender dropping message, throughput execeed setting value 'max_throughput_to_ingester' {}Mbps, action {} (0: wait, 1: drop), total overwrittern count {} " ,
662
+ self . name, max_throughput_mbps, ingester_traffic_overflow_action , self . overwritten_count
651
663
) ;
652
664
LAST_LOGGING_DURATION . store ( now. as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
653
665
}
666
+ // action is wait
667
+ if ingester_traffic_overflow_action == 0 {
668
+ thread:: sleep ( Duration :: from_secs ( 1 ) - used) ;
669
+ // if the queue loses data, need to report the exception
670
+ if self . input . total_overwritten_count ( ) > self . overwritten_count {
671
+ self . exception_handler
672
+ . set ( Exception :: DataBpsThresholdExceeded ) ;
673
+ self . overwritten_count = self . input . total_overwritten_count ( ) ;
674
+ }
675
+ return true ;
676
+ }
677
+ // action is drop
654
678
self . exception_handler
655
679
. set ( Exception :: DataBpsThresholdExceeded ) ;
656
680
return true ;
@@ -678,6 +702,7 @@ impl<T: Sendable> UniformSender<T> {
678
702
let config = self . config . load ( ) ;
679
703
let socket_type = config. collector_socket_type ;
680
704
let max_throughput_mpbs = config. max_throughput_to_ingester ;
705
+ let ingester_traffic_overflow_action = config. ingester_traffic_overflow_action ;
681
706
match self . input . recv_all (
682
707
& mut batch,
683
708
Some ( Duration :: from_secs ( Self :: QUEUE_READ_TIMEOUT ) ) ,
@@ -688,12 +713,21 @@ impl<T: Sendable> UniformSender<T> {
688
713
start_cached = Instant :: now ( ) ;
689
714
self . cached = false ;
690
715
}
691
- if self . is_exceed_max_throughput ( max_throughput_mpbs) {
692
- self . counter
693
- . dropped
694
- . fetch_add ( batch. len ( ) as u64 , Ordering :: Relaxed ) ;
695
- batch. clear ( ) ;
696
- continue ;
716
+ if self . is_exceed_max_throughput (
717
+ max_throughput_mpbs,
718
+ ingester_traffic_overflow_action,
719
+ ) {
720
+ if ingester_traffic_overflow_action == 0 {
721
+ self . counter
722
+ . waited
723
+ . fetch_add ( batch. len ( ) as u64 , Ordering :: Relaxed ) ;
724
+ } else {
725
+ self . counter
726
+ . dropped
727
+ . fetch_add ( batch. len ( ) as u64 , Ordering :: Relaxed ) ;
728
+ batch. clear ( ) ;
729
+ continue ;
730
+ }
697
731
}
698
732
for send_item in batch. drain ( ..) {
699
733
if !self . running . load ( Ordering :: Relaxed ) {
0 commit comments