Skip to content

Commit a00207c

Browse files
wesmclaude
andauthored
fix: data race between WorkerPool.Start and Stop (#320)
## Summary - Fix data race between `WorkerPool.Start` and `Stop` when a signal fires during daemon startup - `wg.Add` in `Start()` was racing with `wg.Wait` in `Stop()`, detected by `-race` in `TestDaemonSignalCleanup` on macOS CI - Add a `readyCh` channel that `Start` closes after `wg.Add`, so `Stop` can synchronize before calling `wg.Wait` ## Test plan - [x] `go test -race ./internal/daemon/... -count=1` passes - [x] `go test -race ./cmd/roborev/... -tags integration -run TestDaemonSignalCleanup -count=5` passes (5/5, no races) - [x] `TestServerStop_StopsCIPoller` still works (Stop without Start) - [x] Full `go test ./...` passes 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7f78f69 commit a00207c

File tree

1 file changed

+32
-12
lines changed

1 file changed

+32
-12
lines changed

internal/daemon/worker.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ type WorkerPool struct {
3030
numWorkers int
3131
activeWorkers atomic.Int32
3232
stopCh chan struct{}
33+
readyCh chan struct{} // closed after wg.Add in Start
34+
startOnce sync.Once
35+
stopOnce sync.Once
3336
wg sync.WaitGroup
3437

3538
// Track running jobs for cancellation
@@ -60,29 +63,46 @@ func NewWorkerPool(db *storage.DB, cfgGetter ConfigGetter, numWorkers int, broad
6063
activityLog: activityLog,
6164
numWorkers: numWorkers,
6265
stopCh: make(chan struct{}),
66+
readyCh: make(chan struct{}),
6367
runningJobs: make(map[int64]context.CancelFunc),
6468
pendingCancels: make(map[int64]bool),
6569
agentCooldowns: make(map[string]time.Time),
6670
outputBuffers: NewOutputBuffer(512*1024, 4*1024*1024), // 512KB/job, 4MB total
6771
}
6872
}
6973

70-
// Start begins the worker pool
74+
// Start begins the worker pool. Safe to call multiple times;
75+
// only the first call spawns workers.
7176
func (wp *WorkerPool) Start() {
72-
log.Printf("Starting worker pool with %d workers", wp.numWorkers)
73-
74-
wp.wg.Add(wp.numWorkers)
75-
for i := 0; i < wp.numWorkers; i++ {
76-
go wp.worker(i)
77-
}
77+
wp.startOnce.Do(func() {
78+
log.Printf(
79+
"Starting worker pool with %d workers",
80+
wp.numWorkers,
81+
)
82+
wp.wg.Add(wp.numWorkers)
83+
close(wp.readyCh)
84+
for i := 0; i < wp.numWorkers; i++ {
85+
go wp.worker(i)
86+
}
87+
})
7888
}
7989

80-
// Stop gracefully shuts down the worker pool
90+
// Stop gracefully shuts down the worker pool. Safe to call
91+
// multiple times; only the first call performs shutdown.
8192
func (wp *WorkerPool) Stop() {
82-
log.Println("Stopping worker pool...")
83-
close(wp.stopCh)
84-
wp.wg.Wait()
85-
log.Println("Worker pool stopped")
93+
wp.stopOnce.Do(func() {
94+
log.Println("Stopping worker pool...")
95+
close(wp.stopCh)
96+
// Wait for Start to finish wg.Add before calling Wait.
97+
// If Start was never called, readyCh stays open but
98+
// stopCh is closed, so any late workers exit immediately.
99+
select {
100+
case <-wp.readyCh:
101+
wp.wg.Wait()
102+
default:
103+
}
104+
log.Println("Worker pool stopped")
105+
})
86106
}
87107

88108
// ActiveWorkers returns the number of currently active workers

0 commit comments

Comments
 (0)