Skip to content

Commit 3a09c24

Browse files
committed
Use typed rate limit interface
Signed-off-by: LogicalShark <maralder@google.com>
1 parent 5efae2d commit 3a09c24

File tree

5 files changed

+26
-25
lines changed

5 files changed

+26
-25
lines changed

pkg/controller/gkenetworkparamset/gkenetworkparamset_controller.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type Controller struct {
7272
networkInformer networkinformer.NetworkInformer
7373
networkClientset networkclientset.Interface
7474
gceCloud *gce.Cloud
75-
queue workqueue.RateLimitingInterface
75+
queue workqueue.TypedRateLimitingInterface[string]
7676
networkInformerFactory networkinformers.SharedInformerFactory
7777

7878
nodeLister corelisters.NodeLister
@@ -99,7 +99,7 @@ func NewGKENetworkParamSetController(
9999
gkeNetworkParamsInformer: gkeNetworkParamsInformer,
100100
networkInformer: networkInformer,
101101
gceCloud: gceCloud,
102-
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: workqueueName}),
102+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: workqueueName}),
103103
networkInformerFactory: networkInformerFactory,
104104
nodeLister: nodeInformer.Lister(),
105105
nodeInformerSynced: nodeInformer.Informer().HasSynced,
@@ -230,13 +230,13 @@ func (c *Controller) processNextItem(ctx context.Context) bool {
230230

231231
defer c.queue.Done(key)
232232

233-
err := c.reconcile(ctx, key.(string))
233+
err := c.reconcile(ctx, key)
234234
c.handleErr(err, key)
235235
return true
236236
}
237237

238238
// handleErr checks if an error happened and makes sure we will retry later.
239-
func (c *Controller) handleErr(err error, key interface{}) {
239+
func (c *Controller) handleErr(err error, key string) {
240240
if err == nil {
241241
// Forget about the #AddRateLimited history of the key on every successful synchronization.
242242
// This ensures that future processing of updates for this key is not delayed because of

pkg/controller/nodeipam/ipam/cidr_allocator.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,6 @@ const (
6868
// cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
6969
cidrUpdateRetries = 3
7070

71-
// updateRetryTimeout is the time to wait before requeing a failed node for retry
72-
updateRetryTimeout = 250 * time.Millisecond
73-
74-
// maxUpdateRetryTimeout is the maximum amount of time between timeouts.
75-
maxUpdateRetryTimeout = 5 * time.Second
76-
7771
// updateMaxRetries is the max retries for a failed node
7872
updateMaxRetries = 10
7973

pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
networkinformer "github.com/GoogleCloudPlatform/gke-networking-api/client/network/informers/externalversions/network/v1"
3838
networklister "github.com/GoogleCloudPlatform/gke-networking-api/client/network/listers/network/v1"
3939
nodetopologyclientset "github.com/GoogleCloudPlatform/gke-networking-api/client/nodetopology/clientset/versioned"
40+
"golang.org/x/time/rate"
4041
v1 "k8s.io/api/core/v1"
4142
"k8s.io/apimachinery/pkg/api/errors"
4243
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -98,7 +99,7 @@ type cloudCIDRAllocator struct {
9899
nodesSynced cache.InformerSynced
99100

100101
recorder record.EventRecorder
101-
queue workqueue.RateLimitingInterface
102+
queue workqueue.TypedRateLimitingInterface[string]
102103
nodeTopologyQueue *TaskQueue
103104

104105
stackType clusterStackType
@@ -141,14 +142,20 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
141142
}
142143

143144
ca := &cloudCIDRAllocator{
144-
client: client,
145-
cloud: gceCloud,
146-
networksLister: nwInformer.Lister(),
147-
gnpLister: gnpInformer.Lister(),
148-
nodeLister: nodeInformer.Lister(),
149-
nodesSynced: nodeInformer.Informer().HasSynced,
150-
recorder: recorder,
151-
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: workqueueName}),
145+
client: client,
146+
cloud: gceCloud,
147+
networksLister: nwInformer.Lister(),
148+
gnpLister: gnpInformer.Lister(),
149+
nodeLister: nodeInformer.Lister(),
150+
nodesSynced: nodeInformer.Informer().HasSynced,
151+
recorder: recorder,
152+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
153+
workqueue.NewTypedMaxOfRateLimiter(
154+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](updateRetryTimeout, maxUpdateRetryTimeout),
155+
&workqueue.TypedBucketRateLimiter[string]{
156+
Limiter: rate.NewLimiter(rate.Limit(10), 100),
157+
}),
158+
workqueue.TypedRateLimitingQueueConfig[string]{Name: workqueueName}),
152159
stackType: stackType,
153160
enableMultiNetworking: enableMultiNetworking,
154161
}
@@ -341,13 +348,13 @@ func (ca *cloudCIDRAllocator) processNextItem(ctx context.Context) bool {
341348

342349
klog.V(3).Infof("Processing %s", key)
343350
//TODO: properly enable and pass ctx to updateCIDRAllocation
344-
err := ca.updateCIDRAllocation(key.(string))
351+
err := ca.updateCIDRAllocation(key)
345352
ca.handleErr(err, key)
346353
return true
347354
}
348355

349356
// handleErr checks if an error happened and makes sure we will retry later.
350-
func (ca *cloudCIDRAllocator) handleErr(err error, key interface{}) {
357+
func (ca *cloudCIDRAllocator) handleErr(err error, key string) {
351358
if err == nil {
352359
// Forget about the #AddRateLimited history of the key on every successful synchronization.
353360
// This ensures that future processing of updates for this key is not delayed because of

pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ var (
8080
func hasNodeInProcessing(ca *cloudCIDRAllocator, name string) bool {
8181
if ca.queue.Len() > 0 {
8282
val, _ := ca.queue.Get()
83-
if val.(string) == name {
83+
if val == name {
8484
return true
8585
}
8686
}
@@ -94,7 +94,7 @@ func TestBoundedRetries(t *testing.T) {
9494
client: clientSet,
9595
nodeLister: sharedInfomer.Core().V1().Nodes().Lister(),
9696
nodesSynced: sharedInfomer.Core().V1().Nodes().Informer().HasSynced,
97-
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "cloudCIDRAllocator"}),
97+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "cloudCIDRAllocator"}),
9898
}
9999
go wait.UntilWithContext(context.TODO(), ca.runWorker, time.Second)
100100
nodeName := "testNode"

pkg/controller/nodeipam/ipam/multinetwork_cloud_cidr_allocator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func TestNetworkToNodes(t *testing.T) {
212212
ca := &cloudCIDRAllocator{
213213
nodeLister: fakeNodeInformer.Lister(),
214214
nodesSynced: fakeNodeInformer.Informer().HasSynced,
215-
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "cloudCIDRAllocator"}),
215+
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "cloudCIDRAllocator"}),
216216
}
217217

218218
// test
@@ -230,7 +230,7 @@ func TestNetworkToNodes(t *testing.T) {
230230
if sh {
231231
t.Fatalf("got preemtive queue shutdown")
232232
}
233-
_, ok := tc.expectNodes[val.(string)]
233+
_, ok := tc.expectNodes[val]
234234
if !ok {
235235
t.Fatalf("unexpected node %s in processing", val)
236236
}

0 commit comments

Comments
 (0)