Skip to content

Commit f7c3ef9

Browse files
refactor to remove deprecated code
1 parent daa12ca commit f7c3ef9

File tree

2 files changed

+14
-24
lines changed

2 files changed

+14
-24
lines changed

pkg/controlloop/pod.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ type PodController struct {
7373
netAttachDefLister nadlister.NetworkAttachmentDefinitionLister
7474
broadcaster record.EventBroadcaster
7575
recorder record.EventRecorder
76-
workqueue workqueue.RateLimitingInterface
76+
workqueue workqueue.TypedRateLimitingInterface[*v1.Pod]
7777
mountPath string
7878
cleanupFunc garbageCollector
7979
}
@@ -108,9 +108,8 @@ func newPodController(k8sCoreClient kubernetes.Interface, wbClient wbclientset.I
108108
networksInformer := netAttachDefInformer.Informer()
109109
podsInformer := k8sPodFilteredInformer.Informer()
110110

111-
queue := workqueue.NewNamedRateLimitingQueue(
112-
workqueue.DefaultControllerRateLimiter(),
113-
ipReconcilerQueueName)
111+
queue := workqueue.NewTypedRateLimitingQueue[*v1.Pod](
112+
workqueue.DefaultTypedControllerRateLimiter[*v1.Pod]())
114113

115114
podsInformer.AddEventHandler(
116115
cache.ResourceEventHandlerFuncs{
@@ -166,7 +165,7 @@ func (pc *PodController) processNextWorkItem() bool {
166165
}
167166
defer pc.workqueue.Done(queueItem)
168167

169-
pod := queueItem.(*v1.Pod)
168+
pod := queueItem
170169
err := pc.garbageCollectPodIPs(pod)
171170
logging.Verbosef("result of garbage collecting pods: %+v", err)
172171
pc.handleResult(pod, err)
@@ -344,7 +343,7 @@ func (pc *PodController) addressGarbageCollectionFailed(pod *v1.Pod, err error)
344343
}
345344
}
346345

347-
func onPodDelete(queue workqueue.RateLimitingInterface, obj interface{}) {
346+
func onPodDelete(queue workqueue.TypedRateLimitingInterface[*v1.Pod], obj interface{}) {
348347
pod, err := podFromTombstone(obj)
349348
if err != nil {
350349
logging.Errorf("cannot create pod object from %v on pod delete: %v", obj, err)

pkg/node-controller/controller.go

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ type Controller struct {
6969
// means we can ensure we only process a fixed amount of resources at a
7070
// time, and makes it easy to ensure we are never processing the same item
7171
// simultaneously in two different workers.
72-
workqueue workqueue.RateLimitingInterface
72+
workqueue workqueue.TypedRateLimitingInterface[string]
7373

7474
// recorder is an event recorder for recording Event resources to the
7575
// Kubernetes API.
@@ -103,9 +103,9 @@ func NewController(
103103
eventBroadcaster.StartStructuredLogging(0)
104104
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
105105
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
106-
ratelimiter := workqueue.NewMaxOfRateLimiter(
107-
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
108-
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
106+
ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
107+
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
108+
&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
109109
)
110110

111111
c := &Controller{
@@ -121,7 +121,7 @@ func NewController(
121121
nadInformer: nadInformer,
122122
nadLister: nadInformer.Lister(),
123123
nadSynced: nadInformer.Informer().HasSynced,
124-
workqueue: workqueue.NewRateLimitingQueue(ratelimiter),
124+
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
125125
recorder: recorder,
126126
sortResults: sortResults,
127127
whereaboutsNamespace: whereaboutsNamespace,
@@ -253,29 +253,20 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
253253
}
254254

255255
// We wrap this block in a func so we can defer c.workqueue.Done.
256-
err := func(obj interface{}) error {
256+
err := func(key string) error {
257257
// We call Done here so the workqueue knows we have finished
258258
// processing this item. We also must remember to call Forget if we
259259
// do not want this work item being re-queued. For example, we do
260260
// not call Forget if a transient error occurs, instead the item is
261261
// put back on the workqueue and attempted again after a back-off
262262
// period.
263-
defer c.workqueue.Done(obj)
264-
var key string
265-
var ok bool
263+
defer c.workqueue.Done(key)
266264
// We expect strings to come off the workqueue. These are of the
267265
// form namespace/name. We do this as the delayed nature of the
268266
// workqueue means the items in the informer cache may actually be
269267
// more up to date that when the item was initially put onto the
270268
// workqueue.
271-
if key, ok = obj.(string); !ok {
272-
// As the item in the workqueue is actually invalid, we call
273-
// Forget here else we'd go into a loop of attempting to
274-
// process a work item that is invalid.
275-
c.workqueue.Forget(obj)
276-
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
277-
return nil
278-
}
269+
279270
// Run the syncHandler, passing it the namespace/name string of the
280271
// Foo resource to be synced.
281272
if err := c.syncHandler(ctx, key); err != nil {
@@ -285,7 +276,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
285276
}
286277
// Finally, if no error occurs we Forget this item so it does not
287278
// get queued again until another change happens.
288-
c.workqueue.Forget(obj)
279+
c.workqueue.Forget(key)
289280
logger.Info("Successfully synced", "resourceName", key)
290281
return nil
291282
}(obj)

0 commit comments

Comments
 (0)