@@ -72,7 +72,7 @@ type Controller struct {
 networkInformer networkinformer.NetworkInformer
 networkClientset networkclientset.Interface
 gceCloud *gce.Cloud
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 networkInformerFactory networkinformers.SharedInformerFactory

 nodeLister corelisters.NodeLister
@@ -99,7 +99,7 @@ func NewGKENetworkParamSetController(
 gkeNetworkParamsInformer: gkeNetworkParamsInformer,
 networkInformer: networkInformer,
 gceCloud: gceCloud,
-queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: workqueueName}),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: workqueueName}),
 networkInformerFactory: networkInformerFactory,
 nodeLister: nodeInformer.Lister(),
 nodeInformerSynced: nodeInformer.Informer().HasSynced,
@@ -230,13 +230,13 @@ func (c *Controller) processNextItem(ctx context.Context) bool {

 defer c.queue.Done(key)

-err := c.reconcile(ctx, key.(string))
+err := c.reconcile(ctx, key)
 c.handleErr(err, key)
 return true
 }

 // handleErr checks if an error happened and makes sure we will retry later.
-func (c *Controller) handleErr(err error, key interface{}) {
+func (c *Controller) handleErr(err error, key string) {
 if err == nil {
 // Forget about the #AddRateLimited history of the key on every successful synchronization.
 // This ensures that future processing of updates for this key is not delayed because of
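The pattern above repeats throughout this PR: the untyped workqueue.RateLimitingInterface becomes the generic workqueue.TypedRateLimitingInterface[string], Get() then yields a string, and the key.(string) assertions in processNextItem disappear. A minimal, self-contained sketch of that pattern (not code from this repository; the queue name and the reconcile stub are placeholders):

package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// reconcile stands in for a controller's real sync function.
func reconcile(_ context.Context, key string) error {
	fmt.Println("reconciling", key)
	return nil
}

func main() {
	// Both the rate limiter and the queue config are parameterized on the
	// key type, so the queue only ever holds strings.
	queue := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: "example"},
	)
	queue.Add("node-a")

	// Get returns a string directly; no key.(string) assertion is needed.
	key, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(key)

	if err := reconcile(context.Background(), key); err != nil {
		// Requeue with backoff on failure.
		queue.AddRateLimited(key)
		return
	}
	// Clear the rate-limiting history on success.
	queue.Forget(key)
}
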
6 changes: 0 additions & 6 deletions pkg/controller/nodeipam/ipam/cidr_allocator.go
@@ -68,12 +68,6 @@ const (
 // cidrUpdateRetries is the no. of times a NodeSpec update will be retried before dropping it.
 cidrUpdateRetries = 3

-// updateRetryTimeout is the time to wait before requeing a failed node for retry
-updateRetryTimeout = 250 * time.Millisecond
-
-// maxUpdateRetryTimeout is the maximum amount of time between timeouts.
-maxUpdateRetryTimeout = 5 * time.Second
-
 // updateMaxRetries is the max retries for a failed node
 updateMaxRetries = 10

8 changes: 4 additions & 4 deletions pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go
@@ -98,7 +98,7 @@ type cloudCIDRAllocator struct {
 nodesSynced cache.InformerSynced

 recorder record.EventRecorder
-queue workqueue.RateLimitingInterface
+queue workqueue.TypedRateLimitingInterface[string]
 nodeTopologyQueue *TaskQueue

 stackType clusterStackType
@@ -148,7 +148,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
 nodeLister: nodeInformer.Lister(),
 nodesSynced: nodeInformer.Informer().HasSynced,
 recorder: recorder,
-queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: workqueueName}),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: workqueueName}),
 stackType: stackType,
 enableMultiNetworking: enableMultiNetworking,
 }
@@ -341,13 +341,13 @@ func (ca *cloudCIDRAllocator) processNextItem(ctx context.Context) bool {

 klog.V(3).Infof("Processing %s", key)
 //TODO: properly enable and pass ctx to updateCIDRAllocation
-err := ca.updateCIDRAllocation(key.(string))
+err := ca.updateCIDRAllocation(key)
 ca.handleErr(err, key)
 return true
 }

 // handleErr checks if an error happened and makes sure we will retry later.
-func (ca *cloudCIDRAllocator) handleErr(err error, key interface{}) {
+func (ca *cloudCIDRAllocator) handleErr(err error, key string) {
 if err == nil {
 // Forget about the #AddRateLimited history of the key on every successful synchronization.
 // This ensures that future processing of updates for this key is not delayed because of
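The body of handleErr is collapsed in this view, so the following is only a sketch of the shape such a handler commonly takes once the key is a plain string: Forget on success, AddRateLimited up to a retry budget, then drop. The function name, the maxRetries parameter, and the log message are illustrative, not the PR's actual code:

package sketch

import (
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

// handleErrSketch illustrates bounded retries against a typed queue. It is a
// stand-in for a controller's handleErr method, not the collapsed code above.
func handleErrSketch(queue workqueue.TypedRateLimitingInterface[string], key string, err error, maxRetries int) {
	if err == nil {
		// Success: forget the rate-limiting history so future updates for
		// this key are not delayed by earlier failures.
		queue.Forget(key)
		return
	}
	if queue.NumRequeues(key) < maxRetries {
		// Requeue with backoff; the typed queue accepts the string key directly.
		queue.AddRateLimited(key)
		return
	}
	// Retry budget exhausted: drop the key and report the error.
	queue.Forget(key)
	klog.Errorf("dropping key %q out of the queue: %v", key, err)
}
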
4 changes: 2 additions & 2 deletions pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go
@@ -80,7 +80,7 @@ var (
 func hasNodeInProcessing(ca *cloudCIDRAllocator, name string) bool {
 if ca.queue.Len() > 0 {
 val, _ := ca.queue.Get()
-if val.(string) == name {
+if val == name {
 return true
 }
 }
@@ -94,7 +94,7 @@ func TestBoundedRetries(t *testing.T) {
 client: clientSet,
 nodeLister: sharedInfomer.Core().V1().Nodes().Lister(),
 nodesSynced: sharedInfomer.Core().V1().Nodes().Informer().HasSynced,
-queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "cloudCIDRAllocator"}),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "cloudCIDRAllocator"}),
 }
 go wait.UntilWithContext(context.TODO(), ca.runWorker, time.Second)
 nodeName := "testNode"
@@ -212,7 +212,7 @@ func TestNetworkToNodes(t *testing.T) {
 ca := &cloudCIDRAllocator{
 nodeLister: fakeNodeInformer.Lister(),
 nodesSynced: fakeNodeInformer.Informer().HasSynced,
-queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "cloudCIDRAllocator"}),
+queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "cloudCIDRAllocator"}),
 }

 // test
@@ -230,7 +230,7 @@
 if sh {
 t.Fatalf("got preemtive queue shutdown")
 }
-_, ok := tc.expectNodes[val.(string)]
+_, ok := tc.expectNodes[val]
 if !ok {
 t.Fatalf("unexpected node %s in processing", val)
 }
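The test changes follow the same line: Get() on the typed queue returns a string, so comparisons such as val == name and lookups like tc.expectNodes[val] need no assertion. A small self-contained illustration (not taken from the test files above; the test name and key are placeholders):

package sketch

import (
	"testing"

	"k8s.io/client-go/util/workqueue"
)

// TestTypedQueueGet shows that Get on a TypedRateLimitingInterface[string]
// yields a string, so no val.(string) assertion is needed in assertions.
func TestTypedQueueGet(t *testing.T) {
	queue := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: "sketch"},
	)
	queue.Add("testNode")

	val, shutdown := queue.Get()
	if shutdown {
		t.Fatalf("queue shut down unexpectedly")
	}
	defer queue.Done(val)

	if val != "testNode" { // direct string comparison
		t.Fatalf("got %q, want %q", val, "testNode")
	}
}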