Skip to content

Commit 1fc0eb5

Browse files
committed
feat(scheduler): add scenario search metrics
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent 5e8640d commit 1fc0eb5

11 files changed

Lines changed: 576 additions & 53 deletions

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
77
## [Unreleased]
88

99
### Added
10-
- Added built-in `NodeLocalGreedy` and `MultiNodeGang` scenario generator implementations for bounded reclaim, preempt, and consolidation search.
10+
- Added a bounded scenario generator portfolio for reclaim, preempt, and consolidation search, with `SchedulingShard.spec.scenarioSearchBudgets` time-budget configuration and production scenario-search metrics.
1111
- Added an opt-in `deviceaccess` admission plugin (`--block-nvidia-visible-devices`, config field `admission.blockNvidiaVisibleDevices`, default disabled) that (1) rejects pods overriding the `NVIDIA_VISIBLE_DEVICES` environment variable with values other than `void`/`none` (or via a `valueFrom` reference), and (2) injects `NVIDIA_VISIBLE_DEVICES=void` into containers that do not request a GPU, blocking their access to GPUs on the node.
1212
- Added support for configuring admission Pod Disruption Budget via Helm values (`admission.podDisruptionBudget`) [#1490](https://github.com/kai-scheduler/KAI-Scheduler/pull/1490) [dttung2905](https://github.com/dttung2905)
1313
- Added an opt-in `hamicore` binder plugin (depends on `gpusharing`) to write the HAMI-core GPU memory limit (`CUDA_DEVICE_MEMORY_LIMIT`) for fractional GPU pods.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/pkg/errors v0.9.1
2525
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.88.0
2626
github.com/prometheus/client_golang v1.23.2
27+
github.com/prometheus/client_model v0.6.2
2728
github.com/prometheus/common v0.67.5
2829
github.com/ray-project/kuberay/ray-operator v1.5.1
2930
github.com/run-ai/kwok-operator v0.0.0-20240926063032-05b6364bc7c7
@@ -149,7 +150,6 @@ require (
149150
github.com/opencontainers/selinux v1.13.0 // indirect
150151
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
151152
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
152-
github.com/prometheus/client_model v0.6.2 // indirect
153153
github.com/prometheus/procfs v0.20.1 // indirect
154154
github.com/quic-go/qpack v0.6.0 // indirect
155155
github.com/quic-go/quic-go v0.59.1 // indirect

pkg/scheduler/actions/common/solvers/job_solver.go

Lines changed: 53 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/utils"
12+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api"
1213
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/node_info"
1314
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_info"
1415
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/podgroup_info"
@@ -95,12 +96,22 @@ func (s *JobSolver) Solve(
9596
func (s *JobSolver) SolveWithResult(
9697
ssn *framework.Session, pendingJob *podgroup_info.PodGroupInfo,
9798
) (solved bool, statement *framework.Statement, victimTaskNames []string, searchResult *SearchResult) {
99+
defer func() {
100+
if searchResult != nil {
101+
metrics.IncScenarioSearchJobs(
102+
s.actionType, searchResult.scenarioSearchMetricResult(), searchResult.ReducedBudget(),
103+
)
104+
}
105+
}()
106+
98107
originalNumActiveTasks := pendingJob.GetNumActiveUsedTasks()
99108

100109
tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
101110
n := len(tasksToAllocate)
102111
if n == 0 {
103-
return false, nil, nil, terminalSearchResult(SearchResultGeneratorsExhausted, false, false)
112+
searchResult := terminalSearchResult(SearchResultGeneratorsExhausted, false, false)
113+
searchResult.metricResult = string(SearchResultNotAttempted)
114+
return false, nil, nil, searchResult
104115
}
105116

106117
actionBudget := s.ensureActionBudget()
@@ -336,35 +347,73 @@ func (s *JobSolver) solvePartialJob(
336347
portfolio := newSingleGeneratorScenarioPortfolio(solveCtx, jobBudget, registration, generatorBudget)
337348

338349
for {
339-
if actionBudget.Exhausted() || jobBudget.Remaining() <= 0 {
350+
if actionBudget.Exhausted() {
351+
metrics.IncScenarioSearchActionBudgetExhausted(s.actionType)
352+
return terminalSearchResult(
353+
SearchResultDeadlineExhausted, jobBudget.ReducedBudget(), portfolio.enteredSearch,
354+
)
355+
}
356+
if jobBudget.Remaining() <= 0 {
340357
return terminalSearchResult(
341358
SearchResultDeadlineExhausted, jobBudget.ReducedBudget(), portfolio.enteredSearch,
342359
)
343360
}
344361
scenarioToSolve := portfolio.Next()
345-
if actionBudget.Exhausted() || jobBudget.Remaining() <= 0 {
362+
if actionBudget.Exhausted() {
363+
metrics.IncScenarioSearchActionBudgetExhausted(s.actionType)
364+
return terminalSearchResult(
365+
SearchResultDeadlineExhausted, jobBudget.ReducedBudget(), portfolio.enteredSearch,
366+
)
367+
}
368+
if jobBudget.Remaining() <= 0 {
346369
return terminalSearchResult(
347370
SearchResultDeadlineExhausted, jobBudget.ReducedBudget(), portfolio.enteredSearch,
348371
)
349372
}
350373
if scenarioToSolve == nil {
351374
break
352375
}
353-
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidator, ssn.AllowConsolidatingReclaim(),
376+
generatorName := portfolio.CurrentGeneratorName()
377+
validatorRejected := false
378+
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidatorWithMetrics(generatorName, &validatorRejected),
379+
ssn.AllowConsolidatingReclaim(),
354380
s.actionType)
355381

356382
log.InfraLogger.V(5).Infof("Trying to solve scenario: %s", scenarioToSolve)
357383
metrics.IncScenarioSimulatedByAction()
384+
metrics.IncScenarioSearchScenario(s.actionType, generatorName, "simulated")
358385

359386
result := scenarioSolver.solve(ssn, scenarioToSolve)
387+
attemptResult := scenarioSearchResultUnsolved
388+
if validatorRejected {
389+
attemptResult = scenarioSearchResultValidatorRejected
390+
}
360391
if result.solved {
392+
portfolio.ObserveCurrentAttempt(string(SearchResultSolved))
361393
return solvedSearchResult(result, jobBudget.ReducedBudget())
362394
}
395+
portfolio.ObserveCurrentAttempt(attemptResult)
363396
}
364397

365398
return terminalSearchResult(portfolio.StopReason(), jobBudget.ReducedBudget(), portfolio.enteredSearch)
366399
}
367400

401+
func (s *JobSolver) solutionValidatorWithMetrics(generator string, rejected *bool) SolutionValidator {
402+
if s.solutionValidator == nil {
403+
return nil
404+
}
405+
return func(scenario api.ScenarioInfo) bool {
406+
valid := s.solutionValidator(scenario)
407+
if !valid {
408+
if rejected != nil {
409+
*rejected = true
410+
}
411+
metrics.IncScenarioSearchScenario(s.actionType, generator, "validator_rejected")
412+
}
413+
return valid
414+
}
415+
}
416+
368417
func searchResultEntered(result *SearchResult) bool {
369418
return result != nil && result.EnteredSearch()
370419
}

pkg/scheduler/actions/common/solvers/job_solver_result_test.go

Lines changed: 130 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,8 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/prometheus/client_golang/prometheus"
12+
dto "github.com/prometheus/client_model/go"
1113
"github.com/stretchr/testify/require"
1214
v1 "k8s.io/api/core/v1"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,6 +57,23 @@ func TestSolveWithResultReturnsTerminalResultWhenNoTasksToAllocate(t *testing.T)
5557
require.False(t, result.EnteredSearch())
5658
}
5759

60+
func TestSolveWithResultRecordsNoSearchMetricAsNotAttempted(t *testing.T) {
61+
labels := map[string]string{
62+
"action": "reclaim",
63+
"result": string(SearchResultNotAttempted),
64+
"reduced_budget": "false",
65+
}
66+
before := scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels)
67+
solver := NewJobsSolver(nil, nil, nil, framework.Reclaim, nil)
68+
pendingJob := podgroup_info.NewPodGroupInfo("pending-job")
69+
70+
_, _, _, result := solver.SolveWithResult(&framework.Session{}, pendingJob)
71+
72+
require.Equal(t, SearchResultGeneratorsExhausted, result.Reason())
73+
require.False(t, result.EnteredSearch())
74+
require.Equal(t, before+1, scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels))
75+
}
76+
5877
func TestSolveWithResultReturnsNoGeneratorWhenGeneratorFuncIsNil(t *testing.T) {
5978
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
6079
solver := NewJobsSolver(nil, nil, nil, framework.Reclaim, nil)
@@ -91,6 +110,66 @@ func TestSolveWithResultReturnsNoGeneratorWhenGeneratorReturnsNil(t *testing.T)
91110
require.False(t, result.EnteredSearch())
92111
}
93112

113+
func TestSolveWithResultRecordsGeneratorExhaustedMetricAfterGeneratorAttempt(t *testing.T) {
114+
labels := map[string]string{
115+
"action": "reclaim",
116+
"result": string(SearchResultGeneratorsExhausted),
117+
"reduced_budget": "false",
118+
}
119+
before := scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels)
120+
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
121+
ssn.AddScenarioGenerator("empty", portfolioTestFactory(&portfolioTestGenerator{name: "empty"}), framework.Reclaim)
122+
solver := NewJobsSolver(
123+
nil,
124+
nil,
125+
func() *utils.JobsOrderByQueues {
126+
return utils.GetVictimsQueue(ssn, nil)
127+
},
128+
framework.Reclaim,
129+
nil,
130+
)
131+
132+
_, _, _, result := solver.SolveWithResult(ssn, pendingJob)
133+
134+
require.Equal(t, SearchResultGeneratorsExhausted, result.Reason())
135+
require.False(t, result.EnteredSearch())
136+
require.Equal(t, before+1, scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels))
137+
}
138+
139+
func TestSolveWithResultRecordsUnsolvedScenarioDurationAfterSimulation(t *testing.T) {
140+
generatorName := "test-unsolved-duration"
141+
labels := map[string]string{
142+
"action": "reclaim",
143+
"generator": generatorName,
144+
"result": scenarioSearchResultUnsolved,
145+
}
146+
before := scenarioSearchHistogramCount(t, "scenario_search_duration_seconds", labels)
147+
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
148+
ssn.ClusterInfo.Nodes = map[string]*node_info.NodeInfo{"node-1": {}}
149+
scenarioToSolve := scenario.NewByNodeScenario(
150+
ssn, pendingJob,
151+
podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false),
152+
nil, nil,
153+
)
154+
ssn.AddScenarioGenerator(generatorName, portfolioTestFactory(&portfolioTestGenerator{
155+
name: generatorName,
156+
scenarios: []api.ScenarioInfo{scenarioToSolve},
157+
}), framework.Reclaim)
158+
solver := NewJobsSolver(
159+
nil,
160+
nil,
161+
func() *utils.JobsOrderByQueues {
162+
return utils.GetVictimsQueue(ssn, nil)
163+
},
164+
framework.Reclaim,
165+
nil,
166+
)
167+
168+
solver.SolveWithResult(ssn, pendingJob)
169+
170+
require.Equal(t, before+1, scenarioSearchHistogramCount(t, "scenario_search_duration_seconds", labels))
171+
}
172+
94173
func TestSearchMaxSolvableKSkipsSingleTaskFullProbe(t *testing.T) {
95174
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
96175
actionBudget := newUnlimitedActionSearchBudget(framework.Reclaim)
@@ -277,3 +356,54 @@ func newJobSolverResultTestSession(t *testing.T, tasksCount int) (*framework.Ses
277356
},
278357
}, pendingJob
279358
}
359+
360+
func scenarioSearchCounterValue(t *testing.T, metricName string, labels map[string]string) float64 {
361+
t.Helper()
362+
363+
metric := scenarioSearchMetric(t, metricName, labels)
364+
if metric == nil || metric.GetCounter() == nil {
365+
return 0
366+
}
367+
return metric.GetCounter().GetValue()
368+
}
369+
370+
func scenarioSearchHistogramCount(t *testing.T, metricName string, labels map[string]string) uint64 {
371+
t.Helper()
372+
373+
metric := scenarioSearchMetric(t, metricName, labels)
374+
if metric == nil || metric.GetHistogram() == nil {
375+
return 0
376+
}
377+
return metric.GetHistogram().GetSampleCount()
378+
}
379+
380+
func scenarioSearchMetric(t *testing.T, metricName string, labels map[string]string) *dto.Metric {
381+
t.Helper()
382+
383+
families, err := prometheus.DefaultGatherer.Gather()
384+
require.NoError(t, err)
385+
for _, family := range families {
386+
if family.GetName() != metricName {
387+
continue
388+
}
389+
for _, metric := range family.GetMetric() {
390+
if scenarioSearchMetricHasLabels(metric, labels) {
391+
return metric
392+
}
393+
}
394+
}
395+
return nil
396+
}
397+
398+
func scenarioSearchMetricHasLabels(metric *dto.Metric, labels map[string]string) bool {
399+
if len(metric.GetLabel()) != len(labels) {
400+
return false
401+
}
402+
for _, label := range metric.GetLabel() {
403+
expectedValue, found := labels[label.GetName()]
404+
if !found || expectedValue != label.GetValue() {
405+
return false
406+
}
407+
}
408+
return true
409+
}

pkg/scheduler/actions/common/solvers/node_local_greedy_generator_test.go

Lines changed: 0 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -545,18 +545,3 @@ func podNamesFromMap(tasks pod_info.PodsMap) []string {
545545
sort.Strings(names)
546546
return names
547547
}
548-
549-
func removeGeneratorTestTasks(tasks []*pod_info.PodInfo, tasksToRemove []*pod_info.PodInfo) []*pod_info.PodInfo {
550-
removeByUID := map[common_info.PodID]struct{}{}
551-
for _, task := range tasksToRemove {
552-
removeByUID[task.UID] = struct{}{}
553-
}
554-
remaining := make([]*pod_info.PodInfo, 0, len(tasks))
555-
for _, task := range tasks {
556-
if _, remove := removeByUID[task.UID]; remove {
557-
continue
558-
}
559-
remaining = append(remaining, task)
560-
}
561-
return remaining
562-
}

0 commit comments

Comments
 (0)