Skip to content

Commit 09c2a28

Browse files
authored
Merge pull request #429 from buildkite/add-waitgroup-around-jobhandlerworker
Add waitgroup around jobHandlerWorker
2 parents 0591ab8 + 36ce037 commit 09c2a28

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

internal/controller/monitor/monitor.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math/rand/v2"
99
"reflect"
10+
"sync"
1011
"time"
1112

1213
"github.com/Khan/genqlient/graphql"
@@ -208,8 +209,13 @@ func (m *Monitor) passJobsToNextHandler(ctx context.Context, logger *zap.Logger,
208209
jobsCh := make(chan *api.JobJobTypeCommand)
209210
defer close(jobsCh)
210211

212+
var wg sync.WaitGroup
211213
for range min(m.cfg.JobCreationConcurrency, len(jobs)) {
212-
go jobHandlerWorker(ctx, staleCtx, logger, handler, agentTags, jobsCh)
214+
wg.Add(1)
215+
go func() {
216+
defer wg.Done()
217+
jobHandlerWorker(ctx, staleCtx, logger, handler, agentTags, jobsCh)
218+
}()
213219
}
214220

215221
for _, job := range jobs {
@@ -221,6 +227,8 @@ func (m *Monitor) passJobsToNextHandler(ctx context.Context, logger *zap.Logger,
221227
case jobsCh <- job:
222228
}
223229
}
230+
231+
wg.Wait()
224232
}
225233

226234
func jobHandlerWorker(ctx, staleCtx context.Context, logger *zap.Logger, handler model.JobHandler, agentTags map[string]string, jobsCh <-chan *api.JobJobTypeCommand) {

0 commit comments

Comments
 (0)