Skip to content

Commit 752be1c

Browse files
OlegErshovnexus49
andauthored
feat: introduced custom rate limiter based on options pattern (#106)
* feat: introduced custom rate limiter based on options pattern On-behalf-of: SAP [email protected] * feat: added validation and rw mutex On-behalf-of: SAP [email protected] * chore: updated naming On-behalf-of: SAP [email protected] * feat: introduced custom rate limiter based on options pattern On-behalf-of: SAP [email protected] * feat: added validation and rw mutex On-behalf-of: SAP [email protected] * used builder pattern for rate limiter set up On-behalf-of: SAP [email protected] * refactored rate limiter set up On-behalf-of: SAP [email protected] * added rate limiter config error handling On-behalf-of: SAP [email protected] * fixed tests On-behalf-of: SAP [email protected] --------- Co-authored-by: Bastian Echterhölter <[email protected]>
1 parent 5a9f2d3 commit 752be1c

File tree

8 files changed

+364
-1
lines changed

8 files changed

+364
-1
lines changed

controller/lifecycle/builder/builder.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/platform-mesh/golang-commons/controller/lifecycle/controllerruntime"
88
"github.com/platform-mesh/golang-commons/controller/lifecycle/multicluster"
9+
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
910
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
1011
"github.com/platform-mesh/golang-commons/logger"
1112
)
@@ -16,6 +17,7 @@ type Builder struct {
1617
withConditionManagement bool
1718
withSpreadingReconciles bool
1819
withReadOnly bool
20+
rateLimiterOptions *[]ratelimiter.Option
1921
subroutines []subroutine.Subroutine
2022
log *logger.Logger
2123
}
@@ -45,6 +47,11 @@ func (b *Builder) WithReadOnly() *Builder {
4547
return b
4648
}
4749

50+
func (b *Builder) WithStaticThenExponentialRateLimiter(opts ...ratelimiter.Option) *Builder {
51+
b.rateLimiterOptions = &opts
52+
return b
53+
}
54+
4855
func (b *Builder) BuildControllerRuntime(cl client.Client) *controllerruntime.LifecycleManager {
4956
lm := controllerruntime.NewLifecycleManager(b.subroutines, b.operatorName, b.controllerName, cl, b.log)
5057
if b.withConditionManagement {
@@ -56,6 +63,9 @@ func (b *Builder) BuildControllerRuntime(cl client.Client) *controllerruntime.Li
5663
if b.withReadOnly {
5764
lm.WithReadOnly()
5865
}
66+
if b.rateLimiterOptions != nil {
67+
lm.WithStaticThenExponentialRateLimiter((*b.rateLimiterOptions)...)
68+
}
5969
return lm
6070
}
6171

@@ -70,5 +80,8 @@ func (b *Builder) BuildMultiCluster(mgr mcmanager.Manager) *multicluster.Lifecyc
7080
if b.withReadOnly {
7181
lm.WithReadOnly()
7282
}
83+
if b.rateLimiterOptions != nil {
84+
lm.WithStaticThenExponentialRateLimiter((*b.rateLimiterOptions)...)
85+
}
7386
return lm
7487
}

controller/lifecycle/builder/builder_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package builder
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/stretchr/testify/assert"
78
"k8s.io/client-go/rest"
89
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
910

11+
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
1012
pmtesting "github.com/platform-mesh/golang-commons/controller/testSupport"
1113
"github.com/platform-mesh/golang-commons/logger"
1214
)
@@ -58,6 +60,38 @@ func TestBuilder_WithReadOnly(t *testing.T) {
5860
}
5961
}
6062

63+
func TestBuilder_WithCustomRateLimiter(t *testing.T) {
64+
t.Run("With options", func(t *testing.T) {
65+
b := NewBuilder("op", "ctrl", nil, &logger.Logger{})
66+
opts := []ratelimiter.Option{
67+
ratelimiter.WithRequeueDelay(5 * time.Second),
68+
ratelimiter.WithStaticWindow(1 * time.Minute),
69+
}
70+
b.WithStaticThenExponentialRateLimiter(opts...)
71+
if b.rateLimiterOptions == nil {
72+
t.Error("expected rateLimiterOptions to be non-nil")
73+
}
74+
if got := len(*b.rateLimiterOptions); got != 2 {
75+
t.Errorf("expected 2 rate limiter options, got %d", got)
76+
}
77+
})
78+
t.Run("Without options", func(t *testing.T) {
79+
b := NewBuilder("op", "ctrl", nil, &logger.Logger{})
80+
b.WithStaticThenExponentialRateLimiter()
81+
if b.rateLimiterOptions == nil {
82+
t.Error("expected rateLimiterOptions to be non-nil even with no options")
83+
}
84+
if got := len(*b.rateLimiterOptions); got != 0 {
85+
t.Errorf("expected 0 rate limiter options, got %d", got)
86+
}
87+
})
88+
89+
t.Run("Without custom rate limiter", func(t *testing.T) {
90+
b := NewBuilder("op", "ctrl", nil, &logger.Logger{})
91+
assert.Nil(t, b.rateLimiterOptions)
92+
})
93+
}
94+
6195
func TestControllerRuntimeBuilder(t *testing.T) {
6296
t.Run("Minimal setup", func(t *testing.T) {
6397
b := NewBuilder("op", "ctrl", nil, &logger.Logger{})
@@ -77,6 +111,16 @@ func TestControllerRuntimeBuilder(t *testing.T) {
77111
lm := b.BuildControllerRuntime(fakeClient)
78112
assert.NotNil(t, lm)
79113
})
114+
t.Run("WithCustomRateLimiter", func(t *testing.T) {
115+
b := NewBuilder("op", "ctrl", nil, &logger.Logger{}).WithStaticThenExponentialRateLimiter(
116+
ratelimiter.WithRequeueDelay(5*time.Second),
117+
ratelimiter.WithStaticWindow(1*time.Minute),
118+
ratelimiter.WithExponentialInitialBackoff(5*time.Second),
119+
)
120+
fakeClient := pmtesting.CreateFakeClient(t, &pmtesting.TestApiObject{})
121+
lm := b.BuildControllerRuntime(fakeClient)
122+
assert.NotNil(t, lm)
123+
})
80124
}
81125

82126
func TestMulticontrollerRuntimeBuilder(t *testing.T) {
@@ -107,4 +151,17 @@ func TestMulticontrollerRuntimeBuilder(t *testing.T) {
107151
lm := b.BuildMultiCluster(mgr)
108152
assert.NotNil(t, lm)
109153
})
154+
t.Run("WithCustomRateLimiter", func(t *testing.T) {
155+
b := NewBuilder("op", "ctrl", nil, &logger.Logger{}).WithStaticThenExponentialRateLimiter(
156+
ratelimiter.WithRequeueDelay(5*time.Second),
157+
ratelimiter.WithStaticWindow(1*time.Minute),
158+
ratelimiter.WithExponentialInitialBackoff(5*time.Second),
159+
)
160+
cfg := &rest.Config{}
161+
provider := pmtesting.NewFakeProvider(cfg)
162+
mgr, err := mcmanager.New(cfg, provider, mcmanager.Options{})
163+
assert.NoError(t, err)
164+
lm := b.BuildMultiCluster(mgr)
165+
assert.NotNil(t, lm)
166+
})
110167
}

controller/lifecycle/controllerruntime/lifecycle.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package controllerruntime
33
import (
44
"context"
55
"fmt"
6+
"log"
67

78
ctrl "sigs.k8s.io/controller-runtime"
89
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -11,10 +12,13 @@ import (
1112
"sigs.k8s.io/controller-runtime/pkg/predicate"
1213
"sigs.k8s.io/controller-runtime/pkg/reconcile"
1314

15+
"k8s.io/client-go/util/workqueue"
16+
1417
"github.com/platform-mesh/golang-commons/controller/filter"
1518
"github.com/platform-mesh/golang-commons/controller/lifecycle"
1619
"github.com/platform-mesh/golang-commons/controller/lifecycle/api"
1720
"github.com/platform-mesh/golang-commons/controller/lifecycle/conditions"
21+
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
1822
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
1923
"github.com/platform-mesh/golang-commons/controller/lifecycle/spread"
2024
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
@@ -29,6 +33,7 @@ type LifecycleManager struct {
2933
spreader *spread.Spreader
3034
conditionsManager *conditions.ConditionManager
3135
prepareContextFunc api.PrepareContextFunc
36+
rateLimiter workqueue.TypedRateLimiter[reconcile.Request]
3237
}
3338

3439
func NewLifecycleManager(subroutines []subroutine.Subroutine, operatorName string, controllerName string, client client.Client, log *logger.Logger) *LifecycleManager {
@@ -83,10 +88,18 @@ func (l *LifecycleManager) SetupWithManagerBuilder(mgr ctrl.Manager, maxReconcil
8388
}
8489

8590
eventPredicates = append([]predicate.Predicate{filter.DebugResourcesBehaviourPredicate(debugLabelValue)}, eventPredicates...)
91+
opts := controller.Options{
92+
MaxConcurrentReconciles: maxReconciles,
93+
}
94+
95+
if l.rateLimiter != nil {
96+
opts.RateLimiter = l.rateLimiter
97+
}
98+
8699
return ctrl.NewControllerManagedBy(mgr).
87100
Named(reconcilerName).
88101
For(instance).
89-
WithOptions(controller.Options{MaxConcurrentReconciles: maxReconciles}).
102+
WithOptions(opts).
90103
WithEventFilter(predicate.And(eventPredicates...)), nil
91104
}
92105
func (l *LifecycleManager) SetupWithManager(mgr ctrl.Manager, maxReconciles int, reconcilerName string, instance runtimeobject.RuntimeObject, debugLabelValue string, r reconcile.Reconciler, log *logger.Logger, eventPredicates ...predicate.Predicate) error {
@@ -123,3 +136,12 @@ func (l *LifecycleManager) WithConditionManagement() *LifecycleManager {
123136
l.conditionsManager = conditions.NewConditionManager()
124137
return l
125138
}
139+
140+
func (l *LifecycleManager) WithStaticThenExponentialRateLimiter(opts ...ratelimiter.Option) *LifecycleManager {
141+
rateLimiter, err := ratelimiter.NewStaticThenExponentialRateLimiter[reconcile.Request](ratelimiter.NewConfig(opts...))
142+
if err != nil {
143+
log.Fatalf("rate limiter config error: %s",err)
144+
}
145+
l.rateLimiter = rateLimiter
146+
return l
147+
}

controller/lifecycle/controllerruntime/lifecycle_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
goerrors "errors"
66
"testing"
7+
"time"
78

89
"github.com/stretchr/testify/assert"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -13,6 +14,7 @@ import (
1314
"sigs.k8s.io/controller-runtime/pkg/client"
1415
"sigs.k8s.io/controller-runtime/pkg/manager"
1516

17+
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
1618
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
1719
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
1820
pmtesting "github.com/platform-mesh/golang-commons/controller/testSupport"
@@ -152,6 +154,31 @@ func TestLifecycle(t *testing.T) {
152154
assert.True(t, true, l.ConditionsManager() != nil)
153155
})
154156

157+
t.Run("WithRateLimiter", func(t *testing.T) {
158+
fakeClient := pmtesting.CreateFakeClient(t, &pmtesting.TestApiObject{})
159+
_, log := createLifecycleManager([]subroutine.Subroutine{}, fakeClient)
160+
161+
l := NewLifecycleManager([]subroutine.Subroutine{}, "test-operator", "test-controller", fakeClient, log.Logger)
162+
expectedCfg := ratelimiter.Config{
163+
StaticRequeueDelay: 5 * time.Second,
164+
StaticWindow: 10 * time.Second,
165+
ExponentialInitialBackoff: 5 * time.Second,
166+
ExponentialMaxBackoff: time.Minute,
167+
}
168+
l.WithStaticThenExponentialRateLimiter(
169+
ratelimiter.WithRequeueDelay(expectedCfg.StaticRequeueDelay),
170+
ratelimiter.WithStaticWindow(expectedCfg.StaticWindow),
171+
ratelimiter.WithExponentialInitialBackoff(expectedCfg.ExponentialInitialBackoff),
172+
ratelimiter.WithExponentialMaxBackoff(expectedCfg.ExponentialMaxBackoff),
173+
)
174+
175+
assert.NotNil(t, l.rateLimiter, "rate limiter should be configured")
176+
177+
req := controllerruntime.Request{}
178+
delay := l.rateLimiter.When(req)
179+
assert.Equal(t, expectedCfg.StaticRequeueDelay, delay)
180+
})
181+
155182
}
156183

157184
type testReconciler struct {

controller/lifecycle/multicluster/lifecycle.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package multicluster
33
import (
44
"context"
55
"fmt"
6+
"log"
67

78
ctrl "sigs.k8s.io/controller-runtime"
89
"sigs.k8s.io/controller-runtime/pkg/cluster"
@@ -13,10 +14,13 @@ import (
1314
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
1415
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
1516

17+
"k8s.io/client-go/util/workqueue"
18+
1619
"github.com/platform-mesh/golang-commons/controller/filter"
1720
"github.com/platform-mesh/golang-commons/controller/lifecycle"
1821
"github.com/platform-mesh/golang-commons/controller/lifecycle/api"
1922
"github.com/platform-mesh/golang-commons/controller/lifecycle/conditions"
23+
"github.com/platform-mesh/golang-commons/controller/lifecycle/ratelimiter"
2024
"github.com/platform-mesh/golang-commons/controller/lifecycle/runtimeobject"
2125
"github.com/platform-mesh/golang-commons/controller/lifecycle/spread"
2226
"github.com/platform-mesh/golang-commons/controller/lifecycle/subroutine"
@@ -35,6 +39,7 @@ type LifecycleManager struct {
3539
spreader *spread.Spreader
3640
conditionsManager *conditions.ConditionManager
3741
prepareContextFunc api.PrepareContextFunc
42+
rateLimiter workqueue.TypedRateLimiter[mcreconcile.Request]
3843
}
3944

4045
func NewLifecycleManager(subroutines []subroutine.Subroutine, operatorName string, controllerName string, mgr ClusterGetter, log *logger.Logger) *LifecycleManager {
@@ -96,6 +101,11 @@ func (l *LifecycleManager) SetupWithManagerBuilder(mgr mcmanager.Manager, maxRec
96101
opts := controller.TypedOptions[mcreconcile.Request]{
97102
MaxConcurrentReconciles: maxReconciles,
98103
}
104+
105+
if l.rateLimiter != nil {
106+
opts.RateLimiter = l.rateLimiter
107+
}
108+
99109
return mcbuilder.ControllerManagedBy(mgr).
100110
Named(reconcilerName).
101111
For(instance).
@@ -136,3 +146,12 @@ func (l *LifecycleManager) WithConditionManagement() api.Lifecycle {
136146
l.conditionsManager = conditions.NewConditionManager()
137147
return l
138148
}
149+
150+
func (l *LifecycleManager) WithStaticThenExponentialRateLimiter(opts ...ratelimiter.Option) *LifecycleManager {
151+
rateLimiter, err := ratelimiter.NewStaticThenExponentialRateLimiter[mcreconcile.Request](ratelimiter.NewConfig(opts...))
152+
if err != nil {
153+
log.Fatalf("rate limiter config error: %s",err)
154+
}
155+
l.rateLimiter = rateLimiter
156+
return l
157+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package ratelimiter
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
type Config struct {
9+
StaticRequeueDelay time.Duration
10+
StaticWindow time.Duration
11+
ExponentialInitialBackoff time.Duration
12+
ExponentialMaxBackoff time.Duration
13+
}
14+
15+
var defaultConfig = Config{
16+
StaticRequeueDelay: 2 * time.Second,
17+
StaticWindow: 60 * time.Second,
18+
ExponentialInitialBackoff: 2 * time.Second,
19+
ExponentialMaxBackoff: 1000 * time.Second,
20+
}
21+
22+
func (c Config) validate() error {
23+
if c.StaticRequeueDelay < 0 {
24+
return fmt.Errorf("the static requeue delay shouldn't be negative")
25+
}
26+
if c.ExponentialInitialBackoff < 0 {
27+
return fmt.Errorf("the initial exponential backoff shouldn't be negative")
28+
}
29+
if c.StaticRequeueDelay > c.ExponentialInitialBackoff {
30+
return fmt.Errorf("the initial exponential backoff should be equal to or greater than the static requeue delay")
31+
}
32+
if c.StaticWindow < c.StaticRequeueDelay {
33+
return fmt.Errorf("the static window duration should be equal to or greater than the static requeue delay")
34+
}
35+
return nil
36+
}
37+
38+
type Option func(*Config)
39+
40+
func WithStaticWindow(d time.Duration) Option {
41+
return func(c *Config) {
42+
c.StaticWindow = d
43+
}
44+
}
45+
46+
func WithRequeueDelay(d time.Duration) Option {
47+
return func(c *Config) {
48+
c.StaticRequeueDelay = d
49+
}
50+
}
51+
52+
func WithExponentialInitialBackoff(d time.Duration) Option {
53+
return func(c *Config) {
54+
c.ExponentialInitialBackoff = d
55+
}
56+
}
57+
58+
func WithExponentialMaxBackoff(d time.Duration) Option {
59+
return func(c *Config) {
60+
c.ExponentialMaxBackoff = d
61+
}
62+
}
63+
64+
func NewConfig(options ...Option) Config {
65+
cfg := defaultConfig
66+
67+
for _, option := range options {
68+
option(&cfg)
69+
}
70+
71+
return cfg
72+
}

0 commit comments

Comments
 (0)