@@ -18,17 +18,19 @@ package command
1818
1919import (
2020 "fmt"
21+ "strings"
22+ "sync"
23+ "sync/atomic"
24+ "time"
25+
2126 "github.com/google/uuid"
2227 "github.com/michaelquigley/pfxlog"
2328 "github.com/openziti/foundation/v2/errorz"
2429 "github.com/openziti/foundation/v2/rate"
2530 "github.com/openziti/metrics"
2631 "github.com/openziti/ziti/controller/apierror"
2732 "github.com/pkg/errors"
28- "strings"
29- "sync"
30- "sync/atomic"
31- "time"
33+ gometrics "github.com/rcrowley/go-metrics"
3234)
3335
3436const (
@@ -42,13 +44,22 @@ const (
4244 DefaultAdaptiveRateLimiterMinWindowSize = 5
4345 DefaultAdaptiveRateLimiterMaxWindowSize = 250
4446 DefaultAdaptiveRateLimiterTimeout = 30 * time .Second
47+
48+ DefaultAdaptiveRateLimiterSuccessThreshold = 0.9
49+ DefaultAdaptiveRateLimiterIncreaseFactor = 1.02
50+ DefaultAdaptiveRateLimiterDecreaseFactor = 0.9
51+ DefaultAdaptiveRateLimiterIncreaseCheckInterval = 10
52+ DefaultAdaptiveRateLimiterDecreaseCheckInterval = 10
4553)
4654
55+ // RateLimiterConfig contains configuration values used to create a new DefaultRateLimiter
4756type RateLimiterConfig struct {
4857 Enabled bool
4958 QueueSize uint32
5059}
5160
61+ // NewRateLimiter creates a new rate limiter using the given configuration. If the configuration has
62+ // Enabled set to false, a NoOpRateLimiter will be returned
5263func NewRateLimiter (config RateLimiterConfig , registry metrics.Registry , closeNotify <- chan struct {}) rate.RateLimiter {
5364 if ! config .Enabled {
5465 return NoOpRateLimiter {}
@@ -78,6 +89,7 @@ func NewRateLimiter(config RateLimiterConfig, registry metrics.Registry, closeNo
7889 return result
7990}
8091
92+ // NoOpRateLimiter is a rate limiter that doesn't enforce any rate limiting
8193type NoOpRateLimiter struct {}
8294
8395func (self NoOpRateLimiter ) RunRateLimited (f func () error ) error {
@@ -88,12 +100,14 @@ func (self NoOpRateLimiter) GetQueueFillPct() float64 {
88100 return 0
89101}
90102
103+ // NoOpAdaptiveRateLimiter is an adaptive rate limiter that doesn't enforce any rate limiting
91104type NoOpAdaptiveRateLimiter struct {}
92105
93106func (self NoOpAdaptiveRateLimiter ) RunRateLimited (f func () error ) (rate.RateLimitControl , error ) {
94107 return rate .NoOpRateLimitControl (), f ()
95108}
96109
110+ // NoOpAdaptiveRateLimitTracker is an adaptive rate limit tracker that doesn't enforce any rate limiting
97111type NoOpAdaptiveRateLimitTracker struct {}
98112
99113func (n NoOpAdaptiveRateLimitTracker ) RunRateLimited (string ) (rate.RateLimitControl , error ) {
@@ -113,6 +127,7 @@ type rateLimitedWork struct {
113127 result chan error
114128}
115129
130+ // DefaultRateLimiter implements rate.RateLimiter using a fixed-size buffered channel as a work queue
116131type DefaultRateLimiter struct {
117132 currentSize atomic.Int32
118133 queue chan * rateLimitedWork
@@ -191,14 +206,16 @@ type AdaptiveRateLimiterConfig struct {
191206 Timeout time.Duration
192207}
193208
209+ // SetDefaults sets the default values for the AdaptiveRateLimiterConfig
194210func (self * AdaptiveRateLimiterConfig ) SetDefaults () {
195211 self .Enabled = DefaultAdaptiveRateLimiterEnabled
196212 self .MinSize = DefaultAdaptiveRateLimiterMinWindowSize
197213 self .MaxSize = DefaultAdaptiveRateLimiterMaxWindowSize
198214 self .Timeout = DefaultAdaptiveRateLimiterTimeout
199215}
200216
201- func LoadAdaptiveRateLimiterConfig (cfg * AdaptiveRateLimiterConfig , cfgmap map [interface {}]interface {}) error {
217+ // Load reads the configuration values from the given config map
218+ func (cfg * AdaptiveRateLimiterConfig ) Load (cfgmap map [interface {}]interface {}) error {
202219 if value , found := cfgmap ["enabled" ]; found {
203220 cfg .Enabled = strings .EqualFold ("true" , fmt .Sprintf ("%v" , value ))
204221 }
@@ -240,6 +257,8 @@ func LoadAdaptiveRateLimiterConfig(cfg *AdaptiveRateLimiterConfig, cfgmap map[in
240257 return nil
241258}
242259
260+ // NewAdaptiveRateLimiter creates a new adaptive rate limiter using the given configuration. If the
261+ // configuration has Enabled set to false, a NoOpAdaptiveRateLimiter will be returned
243262func NewAdaptiveRateLimiter (config AdaptiveRateLimiterConfig , registry metrics.Registry , closeNotify <- chan struct {}) rate.AdaptiveRateLimiter {
244263 if ! config .Enabled {
245264 return NoOpAdaptiveRateLimiter {}
@@ -410,6 +429,7 @@ func (r rateLimitControl) Failed() {
410429 // no-op for this type
411430}
412431
432+ // WasRateLimited returns true if the given error indicates that a request was rejected due to rate limiting
413433func WasRateLimited (err error ) bool {
414434 var apiErr * errorz.ApiError
415435 if errors .As (err , & apiErr ) {
@@ -418,18 +438,132 @@ func WasRateLimited(err error) bool {
418438 return false
419439}
420440
421- func NewAdaptiveRateLimitTracker (config AdaptiveRateLimiterConfig , registry metrics.Registry , closeNotify <- chan struct {}) rate.AdaptiveRateLimitTracker {
441+ // AdaptiveRateLimitTrackerConfig contains configuration values used to create a new AdaptiveRateLimitTracker
442+ type AdaptiveRateLimitTrackerConfig struct {
443+ AdaptiveRateLimiterConfig
444+
445+ // SuccessThreshold - the success rate threshold above which the window size will be increased and
446+ // below which the window size will be decreased
447+ SuccessThreshold float64
448+
449+ // IncreaseFactor - the multiplier applied to the current window size when increasing it
450+ IncreaseFactor float64
451+
452+ // DecreaseFactor - the multiplier applied to the current window size when decreasing it
453+ DecreaseFactor float64
454+
455+ // IncreaseCheckInterval - the number of successes between window size increase checks
456+ IncreaseCheckInterval int
457+
458+ // DecreaseCheckInterval - the number of backoffs between window size decrease checks
459+ DecreaseCheckInterval int
460+ }
461+
462+ // SetDefaults sets the default values for the AdaptiveRateLimitTrackerConfig
463+ func (self * AdaptiveRateLimitTrackerConfig ) SetDefaults () {
464+ self .AdaptiveRateLimiterConfig .SetDefaults ()
465+ self .SuccessThreshold = DefaultAdaptiveRateLimiterSuccessThreshold
466+ self .IncreaseFactor = DefaultAdaptiveRateLimiterIncreaseFactor
467+ self .DecreaseFactor = DefaultAdaptiveRateLimiterDecreaseFactor
468+ self .IncreaseCheckInterval = DefaultAdaptiveRateLimiterIncreaseCheckInterval
469+ self .DecreaseCheckInterval = DefaultAdaptiveRateLimiterDecreaseCheckInterval
470+ }
471+
472+ // Load reads the configuration values from the given config map
473+ func (cfg * AdaptiveRateLimitTrackerConfig ) Load (cfgmap map [interface {}]interface {}) error {
474+ if err := cfg .AdaptiveRateLimiterConfig .Load (cfgmap ); err != nil {
475+ return err
476+ }
477+
478+ if value , found := cfgmap ["successThreshold" ]; found {
479+ if v , ok := value .(float64 ); ok {
480+ cfg .SuccessThreshold = v
481+ } else {
482+ return errors .Errorf ("invalid value %v for adaptive rate limiter success threshold, must be floating point value" , value )
483+ }
484+ }
485+
486+ if cfg .SuccessThreshold > 1 {
487+ return errors .Errorf ("invalid value %f for adaptive rate limiter success threshold, must be between 0 and 1" , cfg .SuccessThreshold )
488+ }
489+
490+ if cfg .SuccessThreshold < 0 {
491+ return errors .Errorf ("invalid value %f for adaptive rate limiter success threshold, must be between 0 and 1" , cfg .SuccessThreshold )
492+ }
493+
494+ if value , found := cfgmap ["increaseFactor" ]; found {
495+ if v , ok := value .(float64 ); ok {
496+ cfg .IncreaseFactor = v
497+ } else {
498+ return errors .Errorf ("invalid value %v for adaptive rate limiter increaseFactor, must be floating point value" , value )
499+ }
500+ }
501+
502+ if cfg .IncreaseFactor < 1 {
503+ return errors .Errorf ("invalid value %f for adaptive rate limiter increaseFactor, must be greater than 1, usually less than 2" , cfg .IncreaseFactor )
504+ }
505+
506+ if value , found := cfgmap ["decreaseFactor" ]; found {
507+ if v , ok := value .(float64 ); ok {
508+ cfg .DecreaseFactor = v
509+ } else {
510+ return errors .Errorf ("invalid value %v for adaptive rate limiter decreaseFactor, must be floating point value" , value )
511+ }
512+ }
513+
514+ if cfg .DecreaseFactor <= 0 || cfg .DecreaseFactor >= 1 {
515+ return errors .Errorf ("invalid value %f for adaptive rate limiter decreaseFactor, must be between 0 and 1" , cfg .DecreaseFactor )
516+ }
517+
518+ if value , found := cfgmap ["increaseCheckInterval" ]; found {
519+ if intVal , ok := value .(int ); ok {
520+ cfg .IncreaseCheckInterval = intVal
521+ } else {
522+ return errors .Errorf ("invalid value %v for adaptive rate limiter increaseCheckInterval, must be integer value" , value )
523+ }
524+ }
525+
526+ if cfg .IncreaseCheckInterval < 1 {
527+ return errors .Errorf ("invalid value %d for adaptive rate limiter increaseCheckInterval, must be at least 1" , cfg .IncreaseCheckInterval )
528+ }
529+
530+ if value , found := cfgmap ["decreaseCheckInterval" ]; found {
531+ if intVal , ok := value .(int ); ok {
532+ cfg .DecreaseCheckInterval = intVal
533+ } else {
534+ return errors .Errorf ("invalid value %v for adaptive rate limiter decreaseCheckInterval, must be integer value" , value )
535+ }
536+ }
537+
538+ if cfg .DecreaseCheckInterval < 1 {
539+ return errors .Errorf ("invalid value %d for adaptive rate limiter decreaseCheckInterval, must be at least 1" , cfg .DecreaseCheckInterval )
540+ }
541+
542+ return nil
543+ }
544+
545+ // NewAdaptiveRateLimitTracker creates a new adaptive rate limit tracker using the given configuration.
546+ // If the configuration has Enabled set to false, a NoOpAdaptiveRateLimitTracker will be returned.
547+ // Unlike the AdaptiveRateLimiter, the tracker does not execute work directly. Instead it tracks
548+ // outstanding work and adjusts the window size based on the success rate of completed work.
549+ func NewAdaptiveRateLimitTracker (config AdaptiveRateLimitTrackerConfig , registry metrics.Registry , closeNotify <- chan struct {}) rate.AdaptiveRateLimitTracker {
422550 if ! config .Enabled {
423551 return NoOpAdaptiveRateLimitTracker {}
424552 }
425553
426554 result := & adaptiveRateLimitTracker {
427- minWindow : int32 (config .MinSize ),
428- maxWindow : int32 (config .MaxSize ),
429- timeout : config .Timeout ,
430- workRate : registry .Timer (config .WorkTimerMetric ),
431- outstandingWork : map [string ]* adaptiveRateLimitTrackerWork {},
432- closeNotify : closeNotify ,
555+ minWindow : int32 (config .MinSize ),
556+ maxWindow : int32 (config .MaxSize ),
557+ successThreshold : config .SuccessThreshold ,
558+ increaseFactor : config .IncreaseFactor ,
559+ decreaseFactor : config .DecreaseFactor ,
560+ increaseCheckInterval : uint32 (config .IncreaseCheckInterval ),
561+ decreaseCheckInterval : uint32 (config .DecreaseCheckInterval ),
562+ timeout : config .Timeout ,
563+ workRate : registry .Timer (config .WorkTimerMetric ),
564+ outstandingWork : map [string ]* adaptiveRateLimitTrackerWork {},
565+ closeNotify : closeNotify ,
566+ successRate : gometrics .NewHistogram (gometrics .NewExpDecaySample (128 , 0.5 )),
433567 }
434568
435569 if existing := registry .GetGauge (config .QueueSizeMetric ); existing != nil {
@@ -455,18 +589,35 @@ func NewAdaptiveRateLimitTracker(config AdaptiveRateLimiterConfig, registry metr
455589 return result
456590}
457591
592+ // adaptiveRateLimitTracker manages a sliding concurrency window to control the rate of outstanding work.
593+ // It does not execute work directly. Callers acquire a slot via RunRateLimited and later report the
594+ // outcome (Success, Backoff, or Failed) through the returned RateLimitControl.
595+ //
596+ // The window size adjusts between minWindow and maxWindow based on an exponentially decaying success
597+ // rate histogram. Every increaseCheckInterval successes, if the success rate exceeds the configured
598+ // threshold the window is grown by increaseFactor. Every decreaseCheckInterval backoffs, if the
599+ // success rate is below the threshold the window is shrunk by decreaseFactor. A background goroutine
600+ // expires work that has not been completed within the configured timeout, treating it as a backoff.
458601type adaptiveRateLimitTracker struct {
459- currentWindow atomic.Int32
460- minWindow int32
461- maxWindow int32
602+ currentWindow atomic.Int32
603+ minWindow int32
604+ maxWindow int32
605+ successThreshold float64
606+ increaseFactor float64
607+ decreaseFactor float64
608+ increaseCheckInterval uint32
609+ decreaseCheckInterval uint32
610+
462611 timeout time.Duration
463612 lock sync.Mutex
464613 successCounter atomic.Uint32
614+ backoffCounter atomic.Uint32
465615
466616 currentSize atomic.Int32
467617 workRate metrics.Timer
468618 outstandingWork map [string ]* adaptiveRateLimitTrackerWork
469619 closeNotify <- chan struct {}
620+ successRate gometrics.Histogram
470621}
471622
472623func (self * adaptiveRateLimitTracker ) IsRateLimited () bool {
@@ -480,14 +631,23 @@ func (self *adaptiveRateLimitTracker) success(work *adaptiveRateLimitTrackerWork
480631 self .currentSize .Add (- 1 )
481632 delete (self .outstandingWork , work .id )
482633 self .workRate .UpdateSince (work .createTime )
634+ self .successRate .Update (1 )
635+
483636 if self .currentWindow .Load () >= self .maxWindow {
484637 return
485638 }
486639
487- if self .successCounter .Add (1 )% 10 == 0 {
488- if nextVal := self .currentWindow .Add (1 ); nextVal > self .maxWindow {
489- self .currentWindow .Store (self .maxWindow )
640+ if self .successCounter .Add (1 )% self .increaseCheckInterval == 0 && self .successRate .Mean () > self .successThreshold {
641+ current := self .currentWindow .Load ()
642+ nextWindow := int32 (float64 (current ) * self .increaseFactor )
643+ if nextWindow == current {
644+ nextWindow ++
645+ }
646+
647+ if nextWindow > self .maxWindow {
648+ nextWindow = self .maxWindow
490649 }
650+ self .updateWindowSize (nextWindow )
491651 }
492652}
493653
@@ -498,20 +658,36 @@ func (self *adaptiveRateLimitTracker) backoff(work *adaptiveRateLimitTrackerWork
498658 self .currentSize .Add (- 1 )
499659 delete (self .outstandingWork , work .id )
500660
661+ self .successRate .Update (0 )
662+
501663 if self .currentWindow .Load () <= self .minWindow {
502664 return
503665 }
504666
505- current := self .currentWindow .Load ()
506- nextWindow := work .queuePosition - 10
507- if nextWindow < current {
667+ if self .backoffCounter .Add (1 )% self .decreaseCheckInterval == 0 && self .successRate .Mean () < self .successThreshold {
668+ current := self .currentWindow .Load ()
669+ nextWindow := int32 (float64 (current ) * self .decreaseFactor )
670+
671+ if nextWindow == current {
672+ nextWindow --
673+ }
674+
508675 if nextWindow < self .minWindow {
509676 nextWindow = self .minWindow
510677 }
511- self .currentWindow . Store (nextWindow )
678+ self .updateWindowSize (nextWindow )
512679 }
513680}
514681
682+ func (self * adaptiveRateLimitTracker ) updateWindowSize (nextWindow int32 ) {
683+ pfxlog .Logger ().WithField ("queueSize" , self .currentSize .Load ()).
684+ WithField ("currentWindowSize" , self .currentWindow .Load ()).
685+ WithField ("nextWindowSize" , nextWindow ).
686+ WithField ("successRate" , self .successRate .Mean ()).
687+ Debug ("window size updated" )
688+ self .currentWindow .Store (nextWindow )
689+ }
690+
515691func (self * adaptiveRateLimitTracker ) complete (work * adaptiveRateLimitTrackerWork ) {
516692 self .lock .Lock ()
517693 defer self .lock .Unlock ()
@@ -599,6 +775,8 @@ func (self *adaptiveRateLimitTrackerWork) Success() {
599775 if self .completed .CompareAndSwap (false , true ) {
600776 pfxlog .Logger ().WithField ("label" , self .label ).
601777 WithField ("duration" , time .Since (self .createTime )).
778+ WithField ("currentSize" , self .limiter .currentSize .Load ()).
779+ WithField ("currentWindow" , self .limiter .currentWindow .Load ()).
602780 Debug ("success" )
603781 self .limiter .success (self )
604782 }
@@ -608,6 +786,8 @@ func (self *adaptiveRateLimitTrackerWork) Backoff() {
608786 if self .completed .CompareAndSwap (false , true ) {
609787 pfxlog .Logger ().WithField ("label" , self .label ).
610788 WithField ("duration" , time .Since (self .createTime )).
789+ WithField ("currentSize" , self .limiter .currentSize .Load ()).
790+ WithField ("currentWindow" , self .limiter .currentWindow .Load ()).
611791 Debug ("backoff" )
612792 self .limiter .backoff (self )
613793 }
@@ -617,6 +797,8 @@ func (self *adaptiveRateLimitTrackerWork) Failed() {
617797 if self .completed .CompareAndSwap (false , true ) {
618798 pfxlog .Logger ().WithField ("label" , self .label ).
619799 WithField ("duration" , time .Since (self .createTime )).
800+ WithField ("currentSize" , self .limiter .currentSize .Load ()).
801+ WithField ("currentWindow" , self .limiter .currentWindow .Load ()).
620802 Debug ("failed" )
621803 self .limiter .complete (self )
622804 }
0 commit comments