File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -243,13 +243,26 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
243243 time .Sleep (10 * time .Millisecond )
244244 }
245245
246+ // Process all items, including those waiting in the rate limiter's delayed queue.
247+ // The rate limiter has max delay of 5ms, so we wait longer than that when the
248+ // queue appears empty to ensure delayed items have time to be re-added.
246249 cnt := 0
247- for tc .workqueue .Len () > 0 {
248- tc .process (context .TODO ())
249- cnt ++
250- // sleep briefly because of exponential backoff when requeueing failed workitem
251- // resulting in workqueue to be empty if checked immediately
252- time .Sleep (7 * time .Millisecond )
250+ emptyChecks := 0
251+ const maxEmptyChecks = 3
252+ const emptyCheckDelay = 20 * time .Millisecond // 4x the max rate limit delay of 5ms
253+
254+ for emptyChecks < maxEmptyChecks {
255+ if tc .workqueue .Len () > 0 {
256+ tc .process (context .TODO ())
257+ cnt ++
258+ emptyChecks = 0 // Reset since we found work
259+ // No sleep here - process items as fast as possible
260+ } else {
261+ // Queue appears empty, but items might be in the delayed queue.
262+ // Wait longer than the max rate limit delay before concluding we're done.
263+ time .Sleep (emptyCheckDelay )
264+ emptyChecks ++
265+ }
253266 }
254267
255268 for _ , msg := range testcase .expectedMessages {
You can’t perform that action at this time.
0 commit comments