Skip to content

Commit 18e0c39

Browse files
committed
fix(worker): replace time.Sleep with channel sync in consumer manager test
time.Sleep(50ms) after Advance() is unreliable under CI load. Add an afterIterFn test hook called after each ticker iteration so the test can synchronize precisely. Use BlockUntil(1) before the first Advance to ensure the run() goroutine has created the ticker. Fixes flaky TestConsumerManagerEnabledDisabled (3x in CI Jan-Apr 2026). Signed-off-by: Seva Kaloshin <seva.kaloshin@gmail.com>
1 parent 7e8da86 commit 18e0c39

2 files changed

Lines changed: 20 additions & 3 deletions

File tree

service/worker/asyncworkflow/async_workflow_consumer_manager.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ func WithEmitConsumerCountMetrifFn(fn func(int)) ConsumerManagerOptions {
7171
}
7272
}
7373

74+
func withAfterIterFn(fn func()) ConsumerManagerOptions {
75+
return func(c *ConsumerManager) { c.afterIterFn = fn }
76+
}
77+
7478
func NewConsumerManager(
7579
logger log.Logger,
7680
metricsClient metrics.Client,
@@ -119,6 +123,7 @@ type ConsumerManager struct {
119123
wg sync.WaitGroup
120124
activeConsumers map[string]provider.Consumer
121125
emitConsumerCountMetricFn func(int)
126+
afterIterFn func() // test hook: called after each ticker iteration, nil in production
122127
}
123128

124129
func (c *ConsumerManager) Start() {
@@ -176,6 +181,9 @@ func (c *ConsumerManager) run() {
176181
c.logger.Info("ConsumerManager background loop stopped because context is done")
177182
return
178183
}
184+
if c.afterIterFn != nil {
185+
c.afterIterFn()
186+
}
179187
}
180188
}
181189

service/worker/asyncworkflow/async_workflow_consumer_manager_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,8 @@ func TestConsumerManagerEnabledDisabled(t *testing.T) {
349349

350350
var consumerMgrEnabled, consumerCount int32
351351

352+
refreshed := make(chan struct{}, 1)
353+
352354
// create consumer manager
353355
cm := NewConsumerManager(
354356
testlogger.New(t),
@@ -363,15 +365,22 @@ func TestConsumerManagerEnabledDisabled(t *testing.T) {
363365
WithEmitConsumerCountMetrifFn(func(count int) {
364366
atomic.StoreInt32(&consumerCount, int32(count))
365367
}),
368+
withAfterIterFn(func() {
369+
select {
370+
case refreshed <- struct{}{}:
371+
default:
372+
}
373+
}),
366374
)
367375

368376
cm.Start()
369377
defer cm.Stop()
370378

371379
// wait for the first round of consumers to be created and verify consumer count
372380
atomic.StoreInt32(&consumerMgrEnabled, 1)
381+
mockTimeSrc.BlockUntil(1) // wait for run() goroutine to create the ticker
373382
mockTimeSrc.Advance(defaultRefreshInterval)
374-
time.Sleep(50 * time.Millisecond)
383+
<-refreshed
375384
t.Log("first round comparison")
376385
got := atomic.LoadInt32(&consumerCount)
377386
want := 1 // consumer manager is enabled
@@ -382,7 +391,7 @@ func TestConsumerManagerEnabledDisabled(t *testing.T) {
382391
// disable consumer manager and wait for the second round of refresh
383392
atomic.StoreInt32(&consumerMgrEnabled, 0)
384393
mockTimeSrc.Advance(defaultRefreshInterval)
385-
time.Sleep(50 * time.Millisecond)
394+
<-refreshed
386395
got = atomic.LoadInt32(&consumerCount)
387396
want = 0 // all consumers should be stopped when consumer manager is disabled
388397
if got != int32(want) {
@@ -392,7 +401,7 @@ func TestConsumerManagerEnabledDisabled(t *testing.T) {
392401
// enable consumer manager and wait for the third round of refresh
393402
atomic.StoreInt32(&consumerMgrEnabled, 1)
394403
mockTimeSrc.Advance(defaultRefreshInterval)
395-
time.Sleep(50 * time.Millisecond)
404+
<-refreshed
396405
got = atomic.LoadInt32(&consumerCount)
397406
want = 1 // consumer manager is enabled
398407
if got != int32(want) {

0 commit comments

Comments
 (0)