Skip to content

Commit 51a09b6

Browse files
committed
feat(scheduler): add scenario search metrics
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent 43e6394 commit 51a09b6

10 files changed

Lines changed: 516 additions & 50 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
77
## [Unreleased]
88

99
### Added
10-
- Added built-in `NodeLocalGreedy` and `MultiNodeGang` scenario generator implementations for bounded reclaim, preempt, and consolidation search.
10+
- Added a bounded scenario generator portfolio for reclaim, preempt, and consolidation search, with `SchedulingShard.spec.scenarioSearchBudgets` time-budget configuration and production scenario-search metrics.
1111
- Added an opt-in `deviceaccess` admission plugin (`--block-nvidia-visible-devices`, config field `admission.blockNvidiaVisibleDevices`, default disabled) that (1) rejects pods overriding the `NVIDIA_VISIBLE_DEVICES` environment variable with values other than `void`/`none` (or via a `valueFrom` reference), and (2) injects `NVIDIA_VISIBLE_DEVICES=void` into containers that do not request a GPU, blocking their access to GPUs on the node.
1212
- Added support for configuring admission Pod Disruption Budget via Helm values (`admission.podDisruptionBudget`) [#1490](https://github.com/kai-scheduler/KAI-Scheduler/pull/1490) [dttung2905](https://github.com/dttung2905)
1313
- Added an opt-in `hamicore` binder plugin (depends on `gpusharing`) to write the HAMI-core GPU memory limit (`CUDA_DEVICE_MEMORY_LIMIT`) for fractional GPU pods.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/pkg/errors v0.9.1
2525
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.88.0
2626
github.com/prometheus/client_golang v1.23.2
27+
github.com/prometheus/client_model v0.6.2
2728
github.com/prometheus/common v0.67.5
2829
github.com/ray-project/kuberay/ray-operator v1.5.1
2930
github.com/run-ai/kwok-operator v0.0.0-20240926063032-05b6364bc7c7
@@ -149,7 +150,6 @@ require (
149150
github.com/opencontainers/selinux v1.13.0 // indirect
150151
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
151152
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
152-
github.com/prometheus/client_model v0.6.2 // indirect
153153
github.com/prometheus/procfs v0.20.1 // indirect
154154
github.com/quic-go/qpack v0.6.0 // indirect
155155
github.com/quic-go/quic-go v0.59.1 // indirect

pkg/scheduler/actions/common/solvers/job_solver.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/utils"
12+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api"
1213
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/node_info"
1314
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_info"
1415
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/podgroup_info"
@@ -87,12 +88,22 @@ func (s *JobSolver) Solve(
8788
func (s *JobSolver) SolveWithResult(
8889
ssn *framework.Session, pendingJob *podgroup_info.PodGroupInfo,
8990
) (solved bool, statement *framework.Statement, victimTaskNames []string, searchResult *SearchResult) {
91+
defer func() {
92+
if searchResult != nil {
93+
metrics.IncScenarioSearchJobs(
94+
s.actionType, searchResult.scenarioSearchMetricResult(), searchResult.ReducedBudget(),
95+
)
96+
}
97+
}()
98+
9099
originalNumActiveTasks := pendingJob.GetNumActiveUsedTasks()
91100

92101
tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
93102
n := len(tasksToAllocate)
94103
if n == 0 {
95-
return false, nil, nil, terminalSearchResult(SearchResultGeneratorsExhausted, false)
104+
searchResult := terminalSearchResult(SearchResultGeneratorsExhausted, false)
105+
searchResult.metricResult = string(SearchResultNotAttempted)
106+
return false, nil, nil, searchResult
96107
}
97108

98109
jobBudget := s.actionBudget.BeginJob()
@@ -316,27 +327,60 @@ func (s *JobSolver) solvePartialJob(
316327

317328
for {
318329
if jobBudget.Exhausted() {
330+
s.observeActionBudgetExhausted()
319331
return terminalSearchResult(SearchResultDeadlineExhausted, jobBudget.ReducedBudget())
320332
}
321333
scenarioToSolve := portfolio.Next()
322334
if scenarioToSolve == nil {
323335
break
324336
}
325-
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidator, ssn.AllowConsolidatingReclaim(),
337+
generatorName := portfolio.CurrentGeneratorName()
338+
validatorRejected := false
339+
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidatorWithMetrics(generatorName, &validatorRejected),
340+
ssn.AllowConsolidatingReclaim(),
326341
s.actionType)
327342

328343
log.InfraLogger.V(5).Infof("Trying to solve scenario: %s", scenarioToSolve)
329344
metrics.IncScenarioSimulatedByAction()
345+
metrics.IncScenarioSearchScenario(s.actionType, generatorName, "simulated")
330346

331347
result := scenarioSolver.solve(ssn, scenarioToSolve)
348+
attemptResult := scenarioSearchResultUnsolved
349+
if validatorRejected {
350+
attemptResult = scenarioSearchResultValidatorRejected
351+
}
332352
if result.solved {
353+
portfolio.ObserveCurrentAttempt(string(SearchResultSolved))
333354
return solvedSearchResult(result, jobBudget.ReducedBudget())
334355
}
356+
portfolio.ObserveCurrentAttempt(attemptResult)
335357
}
336358

337359
return terminalSearchResult(portfolio.StopReason(), jobBudget.ReducedBudget())
338360
}
339361

362+
func (s *JobSolver) observeActionBudgetExhausted() {
363+
if s.actionBudget != nil && s.actionBudget.Exhausted() {
364+
metrics.IncScenarioSearchActionBudgetExhausted(s.actionType)
365+
}
366+
}
367+
368+
func (s *JobSolver) solutionValidatorWithMetrics(generator string, rejected *bool) SolutionValidator {
369+
if s.solutionValidator == nil {
370+
return nil
371+
}
372+
return func(scenario api.ScenarioInfo) bool {
373+
valid := s.solutionValidator(scenario)
374+
if !valid {
375+
if rejected != nil {
376+
*rejected = true
377+
}
378+
metrics.IncScenarioSearchScenario(s.actionType, generator, "validator_rejected")
379+
}
380+
return valid
381+
}
382+
}
383+
340384
func shouldStopSearch(result *SearchResult) bool {
341385
switch result.Reason() {
342386
case SearchResultDeadlineExhausted, SearchResultNotAttempted, SearchResultNoGenerator:

pkg/scheduler/actions/common/solvers/job_solver_result_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/prometheus/client_golang/prometheus"
12+
dto "github.com/prometheus/client_model/go"
1113
"github.com/stretchr/testify/require"
1214
v1 "k8s.io/api/core/v1"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,6 +57,22 @@ func TestSolveWithResultReturnsTerminalResultWhenNoTasksToAllocate(t *testing.T)
5557
require.False(t, result.ReducedBudget())
5658
}
5759

60+
func TestSolveWithResultRecordsNoSearchMetricAsNotAttempted(t *testing.T) {
61+
labels := map[string]string{
62+
"action": "reclaim",
63+
"result": string(SearchResultNotAttempted),
64+
"reduced_budget": "false",
65+
}
66+
before := scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels)
67+
solver := NewJobsSolver(nil, nil, nil, framework.Reclaim, nil)
68+
pendingJob := podgroup_info.NewPodGroupInfo("pending-job")
69+
70+
_, _, _, result := solver.SolveWithResult(&framework.Session{}, pendingJob)
71+
72+
require.Equal(t, SearchResultGeneratorsExhausted, result.Reason())
73+
require.Equal(t, before+1, scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels))
74+
}
75+
5876
func TestSolveWithResultReturnsNoGeneratorWhenGeneratorFuncIsNil(t *testing.T) {
5977
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
6078
solver := NewJobsSolver(nil, nil, nil, framework.Reclaim, nil)
@@ -159,6 +177,65 @@ func TestSolveWithResultReportsDeadlineWhenBudgetExhaustsDuringScenarioSearch(t
159177
require.Equal(t, SearchResultDeadlineExhausted, result.Reason())
160178
}
161179

180+
func TestSolveWithResultRecordsGeneratorExhaustedMetricAfterGeneratorAttempt(t *testing.T) {
181+
labels := map[string]string{
182+
"action": "reclaim",
183+
"result": string(SearchResultGeneratorsExhausted),
184+
"reduced_budget": "false",
185+
}
186+
before := scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels)
187+
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
188+
ssn.AddScenarioGenerator("empty", portfolioTestFactory(&portfolioTestGenerator{name: "empty"}), framework.Reclaim)
189+
solver := NewJobsSolver(
190+
nil,
191+
nil,
192+
func() *utils.JobsOrderByQueues {
193+
return utils.GetVictimsQueue(ssn, nil)
194+
},
195+
framework.Reclaim,
196+
nil,
197+
)
198+
199+
_, _, _, result := solver.SolveWithResult(ssn, pendingJob)
200+
201+
require.Equal(t, SearchResultGeneratorsExhausted, result.Reason())
202+
require.Equal(t, before+1, scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels))
203+
}
204+
205+
func TestSolveWithResultRecordsUnsolvedScenarioDurationAfterSimulation(t *testing.T) {
206+
generatorName := "test-unsolved-duration"
207+
labels := map[string]string{
208+
"action": "reclaim",
209+
"generator": generatorName,
210+
"result": scenarioSearchResultUnsolved,
211+
}
212+
before := scenarioSearchHistogramCount(t, "scenario_search_duration_seconds", labels)
213+
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
214+
ssn.ClusterInfo.Nodes = map[string]*node_info.NodeInfo{"node-1": {}}
215+
scenarioToSolve := scenario.NewByNodeScenario(
216+
ssn, pendingJob,
217+
podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false),
218+
nil, nil,
219+
)
220+
ssn.AddScenarioGenerator(generatorName, portfolioTestFactory(&portfolioTestGenerator{
221+
name: generatorName,
222+
scenarios: []api.ScenarioInfo{scenarioToSolve},
223+
}), framework.Reclaim)
224+
solver := NewJobsSolver(
225+
nil,
226+
nil,
227+
func() *utils.JobsOrderByQueues {
228+
return utils.GetVictimsQueue(ssn, nil)
229+
},
230+
framework.Reclaim,
231+
nil,
232+
)
233+
234+
solver.SolveWithResult(ssn, pendingJob)
235+
236+
require.Equal(t, before+1, scenarioSearchHistogramCount(t, "scenario_search_duration_seconds", labels))
237+
}
238+
162239
func TestSolveWithResultRunsCompletePartialSearchForOneGeneratorBeforeNext(t *testing.T) {
163240
ssn := newGeneratorTestSession(t, map[string]int{
164241
"node-1": 1,
@@ -276,3 +353,54 @@ func newJobSolverResultTestSession(t *testing.T, tasksCount int) (*framework.Ses
276353
},
277354
}, pendingJob
278355
}
356+
357+
func scenarioSearchCounterValue(t *testing.T, metricName string, labels map[string]string) float64 {
358+
t.Helper()
359+
360+
metric := scenarioSearchMetric(t, metricName, labels)
361+
if metric == nil || metric.GetCounter() == nil {
362+
return 0
363+
}
364+
return metric.GetCounter().GetValue()
365+
}
366+
367+
func scenarioSearchHistogramCount(t *testing.T, metricName string, labels map[string]string) uint64 {
368+
t.Helper()
369+
370+
metric := scenarioSearchMetric(t, metricName, labels)
371+
if metric == nil || metric.GetHistogram() == nil {
372+
return 0
373+
}
374+
return metric.GetHistogram().GetSampleCount()
375+
}
376+
377+
func scenarioSearchMetric(t *testing.T, metricName string, labels map[string]string) *dto.Metric {
378+
t.Helper()
379+
380+
families, err := prometheus.DefaultGatherer.Gather()
381+
require.NoError(t, err)
382+
for _, family := range families {
383+
if family.GetName() != metricName {
384+
continue
385+
}
386+
for _, metric := range family.GetMetric() {
387+
if scenarioSearchMetricHasLabels(metric, labels) {
388+
return metric
389+
}
390+
}
391+
}
392+
return nil
393+
}
394+
395+
func scenarioSearchMetricHasLabels(metric *dto.Metric, labels map[string]string) bool {
396+
if len(metric.GetLabel()) != len(labels) {
397+
return false
398+
}
399+
for _, label := range metric.GetLabel() {
400+
expectedValue, found := labels[label.GetName()]
401+
if !found || expectedValue != label.GetValue() {
402+
return false
403+
}
404+
}
405+
return true
406+
}

pkg/scheduler/actions/common/solvers/scenario_generator.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,21 @@
44
package solvers
55

66
import (
7+
"time"
8+
79
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/common/solvers/scenario"
810
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/utils"
911
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/node_info"
1012
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_info"
1113
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/podgroup_info"
1214
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/framework"
1315
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/log"
16+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/metrics"
1417
)
1518

19+
// Result values reported to the scenario-search metrics for a single scenario
// attempt that did not produce a solution.
const (
	// scenarioSearchResultUnsolved marks an attempt that ended without a
	// solution.
	scenarioSearchResultUnsolved = "unsolved"
	// scenarioSearchResultValidatorRejected marks an attempt in which the
	// solution validator rejected a scenario.
	scenarioSearchResultValidatorRejected = "validator_rejected"
)
21+
1622
type SolveContext struct {
1723
Session *framework.Session
1824
ActionType framework.ActionType
@@ -30,12 +36,14 @@ func (ctx *SolveContext) Action() framework.ActionType {
3036
}
3137

3238
type scenarioPortfolio struct {
33-
ctx *SolveContext
34-
generators []framework.ScenarioGenerator
35-
jobBudget *jobSearchBudget
36-
currentIndex int
37-
currentBudget *generatorSearchBudget
38-
stopReason SearchResultReason
39+
ctx *SolveContext
40+
generators []framework.ScenarioGenerator
41+
jobBudget *jobSearchBudget
42+
currentIndex int
43+
currentBudget *generatorSearchBudget
44+
currentName string
45+
currentStartedAt time.Time
46+
stopReason SearchResultReason
3947
}
4048

4149
func newScenarioPortfolio(ctx *SolveContext, jobBudget *jobSearchBudget) *scenarioPortfolio {
@@ -113,24 +121,46 @@ func (p *scenarioPortfolio) Next() *scenario.ByNodeScenario {
113121
continue
114122
}
115123

124+
generatorName := generator.Name()
125+
attemptStartedAt := time.Now()
116126
sn := generator.Next()
117127
if sn == nil {
128+
p.observeGeneratorAttempt(generatorName, string(SearchResultGeneratorsExhausted), attemptStartedAt)
118129
p.moveToNextGenerator()
119130
continue
120131
}
121132
byNodeScenario, ok := sn.(*scenario.ByNodeScenario)
122133
if !ok {
134+
p.observeGeneratorAttempt(generatorName, "unsupported", attemptStartedAt)
123135
log.InfraLogger.V(4).Infof(
124136
"Scenario generator <%s> returned unsupported scenario type %T",
125-
generator.Name(), sn,
137+
generatorName, sn,
126138
)
127139
p.moveToNextGenerator()
128140
continue
129141
}
142+
p.currentName = generatorName
143+
p.currentStartedAt = attemptStartedAt
144+
metrics.IncScenarioSearchScenario(p.ctx.ActionType, generatorName, "emitted")
130145
return byNodeScenario
131146
}
132147
}
133148

149+
func (p *scenarioPortfolio) CurrentGeneratorName() string {
150+
if p == nil {
151+
return ""
152+
}
153+
return p.currentName
154+
}
155+
156+
func (p *scenarioPortfolio) ObserveCurrentAttempt(result string) {
157+
if p == nil || p.currentStartedAt.IsZero() {
158+
return
159+
}
160+
p.observeGeneratorAttempt(p.currentName, result, p.currentStartedAt)
161+
p.currentStartedAt = time.Time{}
162+
}
163+
134164
func (p *scenarioPortfolio) StopReason() SearchResultReason {
135165
if p == nil {
136166
return SearchResultNoGenerator
@@ -148,6 +178,15 @@ func (p *scenarioPortfolio) currentGenerator() framework.ScenarioGenerator {
148178
func (p *scenarioPortfolio) moveToNextGenerator() {
149179
p.currentIndex++
150180
p.currentBudget = nil
181+
p.currentName = ""
182+
p.currentStartedAt = time.Time{}
183+
}
184+
185+
func (p *scenarioPortfolio) observeGeneratorAttempt(generator string, result string, startedAt time.Time) {
186+
if p == nil || p.ctx == nil {
187+
return
188+
}
189+
metrics.ObserveScenarioSearchDuration(p.ctx.ActionType, generator, result, time.Since(startedAt))
151190
}
152191

153192
// ValidateScenarioGeneratorContext extracts the solver context required by scenario generator plugins.

pkg/scheduler/actions/common/solvers/search_budget.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
kaiv1 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/kai/v1"
1313
"github.com/kai-scheduler/KAI-scheduler/pkg/common/constants"
1414
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/framework"
15+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/metrics"
1516
)
1617

1718
const unlimitedRemaining = time.Duration(1<<63 - 1)
@@ -80,6 +81,11 @@ func newActionSearchBudgetWithClock(
8081
if err != nil {
8182
return nil, err
8283
}
84+
metrics.SetScenarioSearchActionBudget(action, actionLimit)
85+
metrics.SetScenarioSearchJobBudget(jobLimit)
86+
for generator, limit := range generatorLimits {
87+
metrics.SetScenarioSearchGeneratorBudget(generator, limit)
88+
}
8389

8490
return &ActionSearchBudget{
8591
action: action,

0 commit comments

Comments
 (0)