Skip to content

Commit ffa8729

Browse files
committed
Force bq.enter()/bq.leave() to be called if the scheduler is idle
Once the final workers shut down, bq.enter()/bq.leave() won't end up getting called. This means that they only get removed from the scheduler the next time any RPC is called (e.g., when workers start to come online again). This unfortunately causes Prometheus metrics to be incorrect in the meantime.
1 parent 8a43a77 commit ffa8729

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

cmd/bb_scheduler/main.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,25 @@ func main() {
141141
actionRouter,
142142
executeAuthorizer,
143143
modifyDrainsAuthorizer,
144-
killOperationsAuthorizer)
144+
killOperationsAuthorizer,
145+
)
146+
147+
// Force periodic cleanups of stale workers. This also
148+
// happens automatically when RPCs occur, but that's not
149+
// sufficient to ensure Prometheus metrics are updated
150+
// if the final workers disappear.
151+
dependenciesGroup.Go(func(ctx context.Context, siblingsGroup, dependenciesGroup program.Group) error {
152+
t := time.NewTicker(time.Minute)
153+
for {
154+
select {
155+
case <-t.C:
156+
buildQueue.ForceCleanup()
157+
case <-ctx.Done():
158+
t.Stop()
159+
return nil
160+
}
161+
}
162+
})
145163

146164
// Create predeclared platform queues.
147165
for _, platformQueue := range configuration.PredeclaredPlatformQueues {

pkg/scheduler/in_memory_build_queue.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,15 @@ func (bq *InMemoryBuildQueue) leave() {
12581258
bq.lock.Unlock()
12591259
}
12601260

1261+
// ForceCleanup forcefully runs any pending cleanup tasks. This method
1262+
// can be invoked periodically to ensure that workers are removed, even
1263+
// if no other RPC traffic occurs. This ensures that Prometheus metrics
1264+
// report the correct values.
1265+
func (bq *InMemoryBuildQueue) ForceCleanup() {
1266+
bq.enter(bq.clock.Now())
1267+
bq.leave()
1268+
}
1269+
12611270
// getIdleSynchronizeResponse returns a synchronization response that
12621271
// explicitly instructs a worker to return to the idle state.
12631272
func (bq *InMemoryBuildQueue) getIdleSynchronizeResponse() *remoteworker.SynchronizeResponse {

0 commit comments

Comments
 (0)