Skip to content

Commit 6117c2e

Browse files
committed
feat(scheduler): add scenario search metrics
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent d09aac7 commit 6117c2e

11 files changed

Lines changed: 570 additions & 51 deletions

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
77
## [Unreleased]
88

99
### Added
10-
- Added built-in `NodeLocalGreedy` and `MultiNodeGang` scenario generator implementations for bounded reclaim, preempt, and consolidation search.
10+
- Added a bounded scenario generator portfolio for reclaim, preempt, and consolidation search, with `SchedulingShard.spec.scenarioSearchBudgets` time-budget configuration and production scenario-search metrics.
1111
- Added an opt-in `deviceaccess` admission plugin (`--block-nvidia-visible-devices`, config field `admission.blockNvidiaVisibleDevices`, default disabled) that (1) rejects pods overriding the `NVIDIA_VISIBLE_DEVICES` environment variable with values other than `void`/`none` (or via a `valueFrom` reference), and (2) injects `NVIDIA_VISIBLE_DEVICES=void` into containers that do not request a GPU, blocking their access to GPUs on the node.
1212
- Added support for configuring admission Pod Disruption Budget via Helm values (`admission.podDisruptionBudget`) [#1490](https://github.com/kai-scheduler/KAI-Scheduler/pull/1490) [dttung2905](https://github.com/dttung2905)
1313
- Added an opt-in `hamicore` binder plugin (depends on `gpusharing`) to write the HAMI-core GPU memory limit (`CUDA_DEVICE_MEMORY_LIMIT`) for fractional GPU pods.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/pkg/errors v0.9.1
2525
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.88.0
2626
github.com/prometheus/client_golang v1.23.2
27+
github.com/prometheus/client_model v0.6.2
2728
github.com/prometheus/common v0.67.5
2829
github.com/ray-project/kuberay/ray-operator v1.5.1
2930
github.com/run-ai/kwok-operator v0.0.0-20240926063032-05b6364bc7c7
@@ -149,7 +150,6 @@ require (
149150
github.com/opencontainers/selinux v1.13.0 // indirect
150151
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
151152
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
152-
github.com/prometheus/client_model v0.6.2 // indirect
153153
github.com/prometheus/procfs v0.20.1 // indirect
154154
github.com/quic-go/qpack v0.6.0 // indirect
155155
github.com/quic-go/quic-go v0.59.1 // indirect

pkg/scheduler/actions/common/solvers/job_solver.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/utils"
12+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api"
1213
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/node_info"
1314
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_info"
1415
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/podgroup_info"
@@ -94,12 +95,22 @@ func (s *JobSolver) Solve(
9495
func (s *JobSolver) SolveWithResult(
9596
ssn *framework.Session, pendingJob *podgroup_info.PodGroupInfo,
9697
) (solved bool, statement *framework.Statement, victimTaskNames []string, searchResult *SearchResult) {
98+
defer func() {
99+
if searchResult != nil {
100+
metrics.IncScenarioSearchJobs(
101+
s.actionType, searchResult.scenarioSearchMetricResult(), searchResult.ReducedBudget(),
102+
)
103+
}
104+
}()
105+
97106
originalNumActiveTasks := pendingJob.GetNumActiveUsedTasks()
98107

99108
tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
100109
n := len(tasksToAllocate)
101110
if n == 0 {
102-
return false, nil, nil, terminalSearchResult(SearchResultGeneratorsExhausted, false, false)
111+
searchResult := terminalSearchResult(SearchResultGeneratorsExhausted, false, false)
112+
searchResult.metricResult = string(SearchResultNotAttempted)
113+
return false, nil, nil, searchResult
103114
}
104115

105116
actionBudget := s.ensureActionBudget()
@@ -339,34 +350,68 @@ func (s *JobSolver) solvePartialJob(
339350

340351
for {
341352
if jobBudget.Exhausted() {
353+
s.observeActionBudgetExhausted(jobBudget)
342354
return terminalSearchResult(
343355
SearchResultDeadlineExhausted, jobBudget.ReducedBudget(), portfolio.enteredSearch,
344356
)
345357
}
346358
scenarioToSolve := portfolio.Next()
347359
if jobBudget.Exhausted() {
360+
s.observeActionBudgetExhausted(jobBudget)
348361
return terminalSearchResult(
349362
SearchResultDeadlineExhausted, jobBudget.ReducedBudget(), portfolio.enteredSearch,
350363
)
351364
}
352365
if scenarioToSolve == nil {
353366
break
354367
}
355-
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidator, ssn.AllowConsolidatingReclaim(),
368+
generatorName := portfolio.CurrentGeneratorName()
369+
validatorRejected := false
370+
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidatorWithMetrics(generatorName, &validatorRejected),
371+
ssn.AllowConsolidatingReclaim(),
356372
s.actionType)
357373

358374
log.InfraLogger.V(5).Infof("Trying to solve scenario: %s", scenarioToSolve)
359375
metrics.IncScenarioSimulatedByAction()
376+
metrics.IncScenarioSearchScenario(s.actionType, generatorName, "simulated")
360377

361378
result := scenarioSolver.solve(ssn, scenarioToSolve)
379+
attemptResult := scenarioSearchResultUnsolved
380+
if validatorRejected {
381+
attemptResult = scenarioSearchResultValidatorRejected
382+
}
362383
if result.solved {
384+
portfolio.ObserveCurrentAttempt(string(SearchResultSolved))
363385
return solvedSearchResult(result, jobBudget.ReducedBudget())
364386
}
387+
portfolio.ObserveCurrentAttempt(attemptResult)
365388
}
366389

367390
return terminalSearchResult(portfolio.StopReason(), jobBudget.ReducedBudget(), portfolio.enteredSearch)
368391
}
369392

393+
func (s *JobSolver) observeActionBudgetExhausted(jobBudget *jobSearchBudget) {
394+
if jobBudget != nil && jobBudget.actionBudget != nil && jobBudget.actionBudget.Exhausted() {
395+
metrics.IncScenarioSearchActionBudgetExhausted(s.actionType)
396+
}
397+
}
398+
399+
func (s *JobSolver) solutionValidatorWithMetrics(generator string, rejected *bool) SolutionValidator {
400+
if s.solutionValidator == nil {
401+
return nil
402+
}
403+
return func(scenario api.ScenarioInfo) bool {
404+
valid := s.solutionValidator(scenario)
405+
if !valid {
406+
if rejected != nil {
407+
*rejected = true
408+
}
409+
metrics.IncScenarioSearchScenario(s.actionType, generator, "validator_rejected")
410+
}
411+
return valid
412+
}
413+
}
414+
370415
func searchResultEntered(result *SearchResult) bool {
371416
return result != nil && result.EnteredSearch()
372417
}

pkg/scheduler/actions/common/solvers/job_solver_result_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/prometheus/client_golang/prometheus"
12+
dto "github.com/prometheus/client_model/go"
1113
"github.com/stretchr/testify/require"
1214
v1 "k8s.io/api/core/v1"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,6 +57,23 @@ func TestSolveWithResultReturnsTerminalResultWhenNoTasksToAllocate(t *testing.T)
5557
require.False(t, result.EnteredSearch())
5658
}
5759

60+
func TestSolveWithResultRecordsNoSearchMetricAsNotAttempted(t *testing.T) {
61+
labels := map[string]string{
62+
"action": "reclaim",
63+
"result": string(SearchResultNotAttempted),
64+
"reduced_budget": "false",
65+
}
66+
before := scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels)
67+
solver := NewJobsSolver(nil, nil, nil, framework.Reclaim, nil)
68+
pendingJob := podgroup_info.NewPodGroupInfo("pending-job")
69+
70+
_, _, _, result := solver.SolveWithResult(&framework.Session{}, pendingJob)
71+
72+
require.Equal(t, SearchResultGeneratorsExhausted, result.Reason())
73+
require.False(t, result.EnteredSearch())
74+
require.Equal(t, before+1, scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels))
75+
}
76+
5877
func TestSolveWithResultReturnsNoGeneratorWhenGeneratorFuncIsNil(t *testing.T) {
5978
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
6079
solver := NewJobsSolver(nil, nil, nil, framework.Reclaim, nil)
@@ -119,6 +138,66 @@ func TestSolveWithResultUsesMinJobBudgetAfterActionBudgetExpired(t *testing.T) {
119138
require.False(t, result.EnteredSearch())
120139
}
121140

141+
func TestSolveWithResultRecordsGeneratorExhaustedMetricAfterGeneratorAttempt(t *testing.T) {
142+
labels := map[string]string{
143+
"action": "reclaim",
144+
"result": string(SearchResultGeneratorsExhausted),
145+
"reduced_budget": "false",
146+
}
147+
before := scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels)
148+
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
149+
ssn.AddScenarioGenerator("empty", portfolioTestFactory(&portfolioTestGenerator{name: "empty"}), framework.Reclaim)
150+
solver := NewJobsSolver(
151+
nil,
152+
nil,
153+
func() *utils.JobsOrderByQueues {
154+
return utils.GetVictimsQueue(ssn, nil)
155+
},
156+
framework.Reclaim,
157+
nil,
158+
)
159+
160+
_, _, _, result := solver.SolveWithResult(ssn, pendingJob)
161+
162+
require.Equal(t, SearchResultGeneratorsExhausted, result.Reason())
163+
require.False(t, result.EnteredSearch())
164+
require.Equal(t, before+1, scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels))
165+
}
166+
167+
func TestSolveWithResultRecordsUnsolvedScenarioDurationAfterSimulation(t *testing.T) {
168+
generatorName := "test-unsolved-duration"
169+
labels := map[string]string{
170+
"action": "reclaim",
171+
"generator": generatorName,
172+
"result": scenarioSearchResultUnsolved,
173+
}
174+
before := scenarioSearchHistogramCount(t, "scenario_search_duration_seconds", labels)
175+
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
176+
ssn.ClusterInfo.Nodes = map[string]*node_info.NodeInfo{"node-1": {}}
177+
scenarioToSolve := scenario.NewByNodeScenario(
178+
ssn, pendingJob,
179+
podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false),
180+
nil, nil,
181+
)
182+
ssn.AddScenarioGenerator(generatorName, portfolioTestFactory(&portfolioTestGenerator{
183+
name: generatorName,
184+
scenarios: []api.ScenarioInfo{scenarioToSolve},
185+
}), framework.Reclaim)
186+
solver := NewJobsSolver(
187+
nil,
188+
nil,
189+
func() *utils.JobsOrderByQueues {
190+
return utils.GetVictimsQueue(ssn, nil)
191+
},
192+
framework.Reclaim,
193+
nil,
194+
)
195+
196+
solver.SolveWithResult(ssn, pendingJob)
197+
198+
require.Equal(t, before+1, scenarioSearchHistogramCount(t, "scenario_search_duration_seconds", labels))
199+
}
200+
122201
func TestSolveWithResultRunsCompletePartialSearchForOneGeneratorBeforeNext(t *testing.T) {
123202
ssn := newGeneratorTestSession(t, map[string]int{
124203
"node-1": 1,
@@ -280,3 +359,54 @@ func newJobSolverResultTestSession(t *testing.T, tasksCount int) (*framework.Ses
280359
},
281360
}, pendingJob
282361
}
362+
363+
func scenarioSearchCounterValue(t *testing.T, metricName string, labels map[string]string) float64 {
364+
t.Helper()
365+
366+
metric := scenarioSearchMetric(t, metricName, labels)
367+
if metric == nil || metric.GetCounter() == nil {
368+
return 0
369+
}
370+
return metric.GetCounter().GetValue()
371+
}
372+
373+
func scenarioSearchHistogramCount(t *testing.T, metricName string, labels map[string]string) uint64 {
374+
t.Helper()
375+
376+
metric := scenarioSearchMetric(t, metricName, labels)
377+
if metric == nil || metric.GetHistogram() == nil {
378+
return 0
379+
}
380+
return metric.GetHistogram().GetSampleCount()
381+
}
382+
383+
func scenarioSearchMetric(t *testing.T, metricName string, labels map[string]string) *dto.Metric {
384+
t.Helper()
385+
386+
families, err := prometheus.DefaultGatherer.Gather()
387+
require.NoError(t, err)
388+
for _, family := range families {
389+
if family.GetName() != metricName {
390+
continue
391+
}
392+
for _, metric := range family.GetMetric() {
393+
if scenarioSearchMetricHasLabels(metric, labels) {
394+
return metric
395+
}
396+
}
397+
}
398+
return nil
399+
}
400+
401+
func scenarioSearchMetricHasLabels(metric *dto.Metric, labels map[string]string) bool {
402+
if len(metric.GetLabel()) != len(labels) {
403+
return false
404+
}
405+
for _, label := range metric.GetLabel() {
406+
expectedValue, found := labels[label.GetName()]
407+
if !found || expectedValue != label.GetValue() {
408+
return false
409+
}
410+
}
411+
return true
412+
}

pkg/scheduler/actions/common/solvers/node_local_greedy_generator_test.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -545,18 +545,3 @@ func podNamesFromMap(tasks pod_info.PodsMap) []string {
545545
sort.Strings(names)
546546
return names
547547
}
548-
549-
func removeGeneratorTestTasks(tasks []*pod_info.PodInfo, tasksToRemove []*pod_info.PodInfo) []*pod_info.PodInfo {
550-
removeByUID := map[common_info.PodID]struct{}{}
551-
for _, task := range tasksToRemove {
552-
removeByUID[task.UID] = struct{}{}
553-
}
554-
remaining := make([]*pod_info.PodInfo, 0, len(tasks))
555-
for _, task := range tasks {
556-
if _, remove := removeByUID[task.UID]; remove {
557-
continue
558-
}
559-
remaining = append(remaining, task)
560-
}
561-
return remaining
562-
}

0 commit comments

Comments
 (0)