@@ -53,6 +53,7 @@ pub struct SenderCounter {
53
53
pub tx : AtomicU64 ,
54
54
pub tx_bytes : AtomicU64 ,
55
55
pub dropped : AtomicU64 ,
56
+ pub waited : AtomicU64 ,
56
57
}
57
58
58
59
impl RefCountable for SenderCounter {
@@ -89,6 +90,11 @@ impl RefCountable for SenderCounter {
89
90
CounterType :: Counted ,
90
91
CounterValue :: Unsigned ( self . dropped. swap( 0 , Ordering :: Relaxed ) ) ,
91
92
) ,
93
+ (
94
+ "waited" ,
95
+ CounterType :: Counted ,
96
+ CounterValue :: Unsigned ( self . waited. swap( 0 , Ordering :: Relaxed ) ) ,
97
+ ) ,
92
98
]
93
99
}
94
100
}
@@ -626,7 +632,11 @@ impl<T: Sendable> UniformSender<T> {
626
632
}
627
633
}
628
634
629
- fn is_exceed_max_throughput ( & mut self , max_throughput_mbps : u64 ) -> bool {
635
+ fn is_exceed_max_throughput (
636
+ & mut self ,
637
+ max_throughput_mbps : u64 ,
638
+ ingester_traffic_overflow_action : u8 ,
639
+ ) -> bool {
630
640
if max_throughput_mbps == 0 {
631
641
return false ;
632
642
}
@@ -637,7 +647,7 @@ impl<T: Sendable> UniformSender<T> {
637
647
. unwrap ( ) ;
638
648
639
649
let used = now - Duration :: from_nanos ( SENT_START_DURATION . load ( Ordering :: Relaxed ) ) ;
640
- if used > Duration :: from_secs ( 1 ) {
650
+ if used >= Duration :: from_secs ( 1 ) {
641
651
SENT_START_DURATION . store ( now. as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
642
652
TOTAL_SENT_BYTES . store ( 0 , Ordering :: Relaxed ) ;
643
653
} else {
@@ -646,11 +656,16 @@ impl<T: Sendable> UniformSender<T> {
646
656
> Duration :: from_secs ( 5 )
647
657
{
648
658
warn ! (
649
- "{} sender dropping message, throughput execeed setting value 'max_throughput_to_ingester' {}Mbps" ,
650
- self . name, max_throughput_mbps
659
+ "{} sender dropping message, throughput execeed setting value 'max_throughput_to_ingester' {}Mbps, action {} (0: wait, 1: drop) " ,
660
+ self . name, max_throughput_mbps, ingester_traffic_overflow_action
651
661
) ;
652
662
LAST_LOGGING_DURATION . store ( now. as_nanos ( ) as u64 , Ordering :: Relaxed ) ;
653
663
}
664
+ // action is wait
665
+ if ingester_traffic_overflow_action == 0 {
666
+ thread:: sleep ( Duration :: from_secs ( 1 ) - used) ;
667
+ return true ;
668
+ }
654
669
self . exception_handler
655
670
. set ( Exception :: DataBpsThresholdExceeded ) ;
656
671
return true ;
@@ -678,6 +693,7 @@ impl<T: Sendable> UniformSender<T> {
678
693
let config = self . config . load ( ) ;
679
694
let socket_type = config. collector_socket_type ;
680
695
let max_throughput_mpbs = config. max_throughput_to_ingester ;
696
+ let ingester_traffic_overflow_action = config. ingester_traffic_overflow_action ;
681
697
match self . input . recv_all (
682
698
& mut batch,
683
699
Some ( Duration :: from_secs ( Self :: QUEUE_READ_TIMEOUT ) ) ,
@@ -688,12 +704,21 @@ impl<T: Sendable> UniformSender<T> {
688
704
start_cached = Instant :: now ( ) ;
689
705
self . cached = false ;
690
706
}
691
- if self . is_exceed_max_throughput ( max_throughput_mpbs) {
692
- self . counter
693
- . dropped
694
- . fetch_add ( batch. len ( ) as u64 , Ordering :: Relaxed ) ;
695
- batch. clear ( ) ;
696
- continue ;
707
+ if self . is_exceed_max_throughput (
708
+ max_throughput_mpbs,
709
+ ingester_traffic_overflow_action,
710
+ ) {
711
+ if ingester_traffic_overflow_action == 0 {
712
+ self . counter
713
+ . waited
714
+ . fetch_add ( batch. len ( ) as u64 , Ordering :: Relaxed ) ;
715
+ } else {
716
+ self . counter
717
+ . dropped
718
+ . fetch_add ( batch. len ( ) as u64 , Ordering :: Relaxed ) ;
719
+ batch. clear ( ) ;
720
+ continue ;
721
+ }
697
722
}
698
723
for send_item in batch. drain ( ..) {
699
724
if !self . running . load ( Ordering :: Relaxed ) {
0 commit comments