@@ -69,7 +69,7 @@ type Controller struct {
6969 // means we can ensure we only process a fixed amount of resources at a
7070 // time, and makes it easy to ensure we are never processing the same item
7171 // simultaneously in two different workers.
72- workqueue workqueue.RateLimitingInterface
72+ workqueue workqueue.TypedRateLimitingInterface [ string ]
7373
7474 // recorder is an event recorder for recording Event resources to the
7575 // Kubernetes API.
@@ -103,9 +103,9 @@ func NewController(
103103 eventBroadcaster .StartStructuredLogging (0 )
104104 eventBroadcaster .StartRecordingToSink (& typedcorev1.EventSinkImpl {Interface : kubeclientset .CoreV1 ().Events ("" )})
105105 recorder := eventBroadcaster .NewRecorder (scheme .Scheme , corev1.EventSource {Component : controllerAgentName })
106- ratelimiter := workqueue .NewMaxOfRateLimiter (
107- workqueue .NewItemExponentialFailureRateLimiter (5 * time .Millisecond , 1000 * time .Second ),
108- & workqueue.BucketRateLimiter {Limiter : rate .NewLimiter (rate .Limit (50 ), 300 )},
106+ ratelimiter := workqueue .NewTypedMaxOfRateLimiter (
107+ workqueue .NewTypedItemExponentialFailureRateLimiter [ string ] (5 * time .Millisecond , 1000 * time .Second ),
108+ & workqueue.TypedBucketRateLimiter [ string ] {Limiter : rate .NewLimiter (rate .Limit (50 ), 300 )},
109109 )
110110
111111 c := & Controller {
@@ -121,7 +121,7 @@ func NewController(
121121 nadInformer : nadInformer ,
122122 nadLister : nadInformer .Lister (),
123123 nadSynced : nadInformer .Informer ().HasSynced ,
124- workqueue : workqueue .NewRateLimitingQueue (ratelimiter ),
124+ workqueue : workqueue .NewTypedRateLimitingQueue (ratelimiter ),
125125 recorder : recorder ,
126126 sortResults : sortResults ,
127127 whereaboutsNamespace : whereaboutsNamespace ,
@@ -253,29 +253,20 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
253253 }
254254
255255 // We wrap this block in a func so we can defer c.workqueue.Done.
256- err := func (obj interface {} ) error {
256+ err := func (key string ) error {
257257 // We call Done here so the workqueue knows we have finished
258258 // processing this item. We also must remember to call Forget if we
259259 // do not want this work item being re-queued. For example, we do
260260 // not call Forget if a transient error occurs, instead the item is
261261 // put back on the workqueue and attempted again after a back-off
262262 // period.
263- defer c .workqueue .Done (obj )
264- var key string
265- var ok bool
263+ defer c .workqueue .Done (key )
266264 // We expect strings to come off the workqueue. These are of the
267265 // form namespace/name. We do this as the delayed nature of the
268266 // workqueue means the items in the informer cache may actually be
269267 // more up to date that when the item was initially put onto the
270268 // workqueue.
271- if key , ok = obj .(string ); ! ok {
272- // As the item in the workqueue is actually invalid, we call
273- // Forget here else we'd go into a loop of attempting to
274- // process a work item that is invalid.
275- c .workqueue .Forget (obj )
276- utilruntime .HandleError (fmt .Errorf ("expected string in workqueue but got %#v" , obj ))
277- return nil
278- }
269+
279270 // Run the syncHandler, passing it the namespace/name string of the
280271 // Foo resource to be synced.
281272 if err := c .syncHandler (ctx , key ); err != nil {
@@ -285,7 +276,7 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
285276 }
286277 // Finally, if no error occurs we Forget this item so it does not
287278 // get queued again until another change happens.
288- c .workqueue .Forget (obj )
279+ c .workqueue .Forget (key )
289280 logger .Info ("Successfully synced" , "resourceName" , key )
290281 return nil
291282 }(obj )
0 commit comments