Skip to content

Commit 4d7cb9d

Browse files
authored
feat(scheduler): add scenario search metrics (#1746)
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent a0c8601 commit 4d7cb9d

10 files changed

Lines changed: 527 additions & 35 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
77
## [Unreleased]
88

99
### Added
10-
- Added built-in `NodeLocalGreedy` and `MultiNodeGang` scenario generator implementations for bounded reclaim, preempt, and consolidation search.
10+
- Added a bounded scenario generator portfolio for reclaim, preempt, and consolidation search, with `SchedulingShard.spec.scenarioSearchBudgets` time-budget configuration and production scenario-search metrics.
1111
- Added an opt-in `deviceaccess` admission plugin (`--block-nvidia-visible-devices`, config field `admission.blockNvidiaVisibleDevices`, default disabled) that (1) rejects pods overriding the `NVIDIA_VISIBLE_DEVICES` environment variable with values other than `void`/`none` (or via a `valueFrom` reference), and (2) injects `NVIDIA_VISIBLE_DEVICES=void` into containers that do not request a GPU, blocking their access to GPUs on the node.
1212
- Added support for configuring admission Pod Disruption Budget via Helm values (`admission.podDisruptionBudget`) [#1490](https://github.com/kai-scheduler/KAI-Scheduler/pull/1490) [dttung2905](https://github.com/dttung2905)
1313
- Added an opt-in `hamicore` binder plugin (depends on `gpusharing`) to write the HAMI-core GPU memory limit (`CUDA_DEVICE_MEMORY_LIMIT`) for fractional GPU pods.

docs/metrics/METRICS.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ Metrics related to the core scheduling algorithm performance, task lifecycle, an
5959
| `scenarios_filtered_by_action` | Counter | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service`, `action` | Cumulative count of simulation scenarios filtered/rejected by each action. |
6060
| `total_preemption_attempts` | Counter | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service` | Cumulative total of preemption attempts across the entire cluster lifetime. |
6161
| `pod_group_evicted_pods_total` | Counter | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service`, `podgroup`, `uid`, `nodepool`, `action` | Cumulative count of pods evicted per pod group, tracked by nodepool and action. |
62+
| `scenario_search_jobs_total` | Counter | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service`, `action`, `result`, `reduced_budget` | Cumulative count of jobs considered by bounded scenario search, grouped by scheduling action, terminal search result, and whether the job ran after the action budget was reduced. |
63+
| `scenario_search_action_budget_configured_seconds` | Gauge | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service`, `action` | Configured scenario-search budget for each scheduling action in seconds. A value of 0 means unlimited. |
64+
| `scenario_search_job_budget_configured_seconds` | Gauge | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service` | Configured per-job scenario-search budget in seconds. A value of 0 means unlimited. |
65+
| `scenario_search_generator_budget_configured_seconds` | Gauge | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service`, `generator` | Configured per-generator scenario-search budget in seconds. A value of 0 means unlimited. |
66+
| `scenario_search_action_budget_exhausted_total` | Counter | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service`, `action` | Cumulative count of action-level scenario-search budget exhaustion events. |
67+
| `scenario_search_duration_seconds` | Histogram | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service`, `action`, `generator`, `result` | Duration in seconds of generator scenario-search attempts. Buckets: [1ms, 2ms, 4ms, ..., 32.768s] (exponential). |
68+
| `scenario_search_scenarios_total` | Counter | `endpoint`, `instance`, `job`, `namespace`, `pod`, `service`, `action`, `generator`, `state` | Cumulative count of bounded-search scenarios emitted by generators, simulated by the solver, or rejected by validation. |
6269

6370
### Queue Fair-Share & Usage Metrics
6471

@@ -88,6 +95,10 @@ Business/Resource Labels:
8895
- **`queue_metadata_name`**: The Queue resource's `metadata.name`. Always populated.
8996
- **`queue_display_name`**: The Queue's `spec.displayName`. Empty string when unset.
9097
- **`action`**: Scheduling action name
98+
- **`generator`**: Scenario generator name
99+
- **`result`**: Scenario search result (`solved`, `deadline_exhausted`, `generators_exhausted`, `no_generator`, `not_attempted`, `unsolved`, or `validator_rejected`, depending on the metric)
100+
- **`reduced_budget`**: Whether the scenario search ran after the action budget was reduced (`true` or `false`)
101+
- **`state`**: Scenario lifecycle state (`emitted`, `simulated`, or `validator_rejected`)
91102
- **`plugin`**: Plugin name
92103
- **`OnSession`**: Session lifecycle phase (`OnSessionOpen` or `OnSessionClose`)
93104
- **`podgroup`**: PodGroup resource identifier

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/pkg/errors v0.9.1
2525
github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.88.0
2626
github.com/prometheus/client_golang v1.23.2
27+
github.com/prometheus/client_model v0.6.2
2728
github.com/prometheus/common v0.67.5
2829
github.com/ray-project/kuberay/ray-operator v1.5.1
2930
github.com/run-ai/kwok-operator v0.0.0-20240926063032-05b6364bc7c7
@@ -149,7 +150,6 @@ require (
149150
github.com/opencontainers/selinux v1.13.0 // indirect
150151
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
151152
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
152-
github.com/prometheus/client_model v0.6.2 // indirect
153153
github.com/prometheus/procfs v0.20.1 // indirect
154154
github.com/quic-go/qpack v0.6.0 // indirect
155155
github.com/quic-go/quic-go v0.59.1 // indirect

pkg/scheduler/actions/common/solvers/job_solver.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/utils"
12+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api"
1213
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/node_info"
1314
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_info"
1415
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/podgroup_info"
@@ -84,12 +85,22 @@ func (s *JobSolver) Solve(
8485
func (s *JobSolver) SolveWithResult(
8586
ssn *framework.Session, pendingJob *podgroup_info.PodGroupInfo,
8687
) (solved bool, statement *framework.Statement, victimTaskNames []string, searchResult *SearchResult) {
88+
defer func() {
89+
if searchResult != nil {
90+
metrics.IncScenarioSearchJobs(
91+
s.actionType, searchResult.scenarioSearchMetricResult(), searchResult.ReducedBudget(),
92+
)
93+
}
94+
}()
95+
8796
originalNumActiveTasks := pendingJob.GetNumActiveUsedTasks()
8897

8998
tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
9099
n := len(tasksToAllocate)
91100
if n == 0 {
92-
return false, nil, nil, terminalSearchResult(SearchResultGeneratorsExhausted, false)
101+
searchResult := terminalSearchResult(SearchResultGeneratorsExhausted, false)
102+
searchResult.metricResult = string(SearchResultNotAttempted)
103+
return false, nil, nil, searchResult
93104
}
94105

95106
jobBudget := s.actionBudget.BeginJob()
@@ -313,27 +324,60 @@ func (s *JobSolver) solvePartialJob(
313324

314325
for {
315326
if jobBudget.Exhausted() {
327+
s.observeActionBudgetExhausted()
316328
return terminalSearchResult(SearchResultDeadlineExhausted, jobBudget.ReducedBudget())
317329
}
318330
scenarioToSolve := portfolio.Next()
319331
if scenarioToSolve == nil {
320332
break
321333
}
322-
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidator, ssn.AllowConsolidatingReclaim(),
334+
generatorName := portfolio.CurrentGeneratorName()
335+
validatorRejected := false
336+
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidatorWithMetrics(generatorName, &validatorRejected),
337+
ssn.AllowConsolidatingReclaim(),
323338
s.actionType)
324339

325340
log.InfraLogger.V(5).Infof("Trying to solve scenario: %s", scenarioToSolve)
326341
metrics.IncScenarioSimulatedByAction()
342+
metrics.IncScenarioSearchScenario(s.actionType, generatorName, "simulated")
327343

328344
result := scenarioSolver.solve(ssn, scenarioToSolve)
345+
attemptResult := scenarioSearchResultUnsolved
346+
if validatorRejected {
347+
attemptResult = scenarioSearchResultValidatorRejected
348+
}
329349
if result.solved {
350+
portfolio.ObserveCurrentAttempt(string(SearchResultSolved))
330351
return solvedSearchResult(result, jobBudget.ReducedBudget())
331352
}
353+
portfolio.ObserveCurrentAttempt(attemptResult)
332354
}
333355

334356
return terminalSearchResult(portfolio.StopReason(), jobBudget.ReducedBudget())
335357
}
336358

359+
func (s *JobSolver) observeActionBudgetExhausted() {
360+
if s.actionBudget != nil && s.actionBudget.Exhausted() {
361+
metrics.IncScenarioSearchActionBudgetExhausted(s.actionType)
362+
}
363+
}
364+
365+
func (s *JobSolver) solutionValidatorWithMetrics(generator string, rejected *bool) SolutionValidator {
366+
if s.solutionValidator == nil {
367+
return nil
368+
}
369+
return func(scenario api.ScenarioInfo) bool {
370+
valid := s.solutionValidator(scenario)
371+
if !valid {
372+
if rejected != nil {
373+
*rejected = true
374+
}
375+
metrics.IncScenarioSearchScenario(s.actionType, generator, "validator_rejected")
376+
}
377+
return valid
378+
}
379+
}
380+
337381
func shouldStopSearch(result *SearchResult) bool {
338382
switch result.Reason() {
339383
case SearchResultDeadlineExhausted, SearchResultNotAttempted, SearchResultNoGenerator:

pkg/scheduler/actions/common/solvers/job_solver_result_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/prometheus/client_golang/prometheus"
12+
dto "github.com/prometheus/client_model/go"
1113
"github.com/stretchr/testify/require"
1214
v1 "k8s.io/api/core/v1"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -47,6 +49,22 @@ func TestSolveWithResultReturnsTerminalResultWhenNoTasksToAllocate(t *testing.T)
4749
require.False(t, result.ReducedBudget())
4850
}
4951

52+
func TestSolveWithResultRecordsNoSearchMetricAsNotAttempted(t *testing.T) {
53+
labels := map[string]string{
54+
"action": "reclaim",
55+
"result": string(SearchResultNotAttempted),
56+
"reduced_budget": "false",
57+
}
58+
before := scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels)
59+
solver := NewJobsSolver(nil, nil, nil, framework.Reclaim, nil)
60+
pendingJob := podgroup_info.NewPodGroupInfo("pending-job")
61+
62+
_, _, _, result := solver.SolveWithResult(&framework.Session{}, pendingJob)
63+
64+
require.Equal(t, SearchResultGeneratorsExhausted, result.Reason())
65+
require.Equal(t, before+1, scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels))
66+
}
67+
5068
func TestSolveWithResultReturnsNoGeneratorWhenGeneratorFuncIsNil(t *testing.T) {
5169
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
5270
solver := NewJobsSolver(nil, nil, nil, framework.Reclaim, nil)
@@ -151,6 +169,65 @@ func TestSolveWithResultReportsDeadlineWhenBudgetExhaustsDuringScenarioSearch(t
151169
require.Equal(t, SearchResultDeadlineExhausted, result.Reason())
152170
}
153171

172+
func TestSolveWithResultRecordsGeneratorExhaustedMetricAfterGeneratorAttempt(t *testing.T) {
173+
labels := map[string]string{
174+
"action": "reclaim",
175+
"result": string(SearchResultGeneratorsExhausted),
176+
"reduced_budget": "false",
177+
}
178+
before := scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels)
179+
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
180+
ssn.AddScenarioGenerator("empty", portfolioTestFactory(&portfolioTestGenerator{name: "empty"}))
181+
solver := NewJobsSolver(
182+
nil,
183+
nil,
184+
func() *utils.JobsOrderByQueues {
185+
return utils.GetVictimsQueue(ssn, nil)
186+
},
187+
framework.Reclaim,
188+
nil,
189+
)
190+
191+
_, _, _, result := solver.SolveWithResult(ssn, pendingJob)
192+
193+
require.Equal(t, SearchResultGeneratorsExhausted, result.Reason())
194+
require.Equal(t, before+1, scenarioSearchCounterValue(t, "scenario_search_jobs_total", labels))
195+
}
196+
197+
func TestSolveWithResultRecordsUnsolvedScenarioDurationAfterSimulation(t *testing.T) {
198+
generatorName := "test-unsolved-duration"
199+
labels := map[string]string{
200+
"action": "reclaim",
201+
"generator": generatorName,
202+
"result": scenarioSearchResultUnsolved,
203+
}
204+
before := scenarioSearchHistogramCount(t, "scenario_search_duration_seconds", labels)
205+
ssn, pendingJob := newJobSolverResultTestSession(t, 1)
206+
ssn.ClusterInfo.Nodes = map[string]*node_info.NodeInfo{"node-1": {}}
207+
scenarioToSolve := scenario.NewByNodeScenario(
208+
ssn, pendingJob,
209+
podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false),
210+
nil, nil,
211+
)
212+
ssn.AddScenarioGenerator(generatorName, portfolioTestFactory(&portfolioTestGenerator{
213+
name: generatorName,
214+
scenarios: []api.ScenarioInfo{scenarioToSolve},
215+
}))
216+
solver := NewJobsSolver(
217+
nil,
218+
nil,
219+
func() *utils.JobsOrderByQueues {
220+
return utils.GetVictimsQueue(ssn, nil)
221+
},
222+
framework.Reclaim,
223+
nil,
224+
)
225+
226+
solver.SolveWithResult(ssn, pendingJob)
227+
228+
require.Equal(t, before+1, scenarioSearchHistogramCount(t, "scenario_search_duration_seconds", labels))
229+
}
230+
154231
func TestSolveWithResultRunsCompletePartialSearchForOneGeneratorBeforeNext(t *testing.T) {
155232
ssn := newGeneratorTestSession(t, map[string]int{
156233
"node-1": 1,
@@ -268,3 +345,54 @@ func newJobSolverResultTestSession(t *testing.T, tasksCount int) (*framework.Ses
268345
},
269346
}, pendingJob
270347
}
348+
349+
func scenarioSearchCounterValue(t *testing.T, metricName string, labels map[string]string) float64 {
350+
t.Helper()
351+
352+
metric := scenarioSearchMetric(t, metricName, labels)
353+
if metric == nil || metric.GetCounter() == nil {
354+
return 0
355+
}
356+
return metric.GetCounter().GetValue()
357+
}
358+
359+
func scenarioSearchHistogramCount(t *testing.T, metricName string, labels map[string]string) uint64 {
360+
t.Helper()
361+
362+
metric := scenarioSearchMetric(t, metricName, labels)
363+
if metric == nil || metric.GetHistogram() == nil {
364+
return 0
365+
}
366+
return metric.GetHistogram().GetSampleCount()
367+
}
368+
369+
func scenarioSearchMetric(t *testing.T, metricName string, labels map[string]string) *dto.Metric {
370+
t.Helper()
371+
372+
families, err := prometheus.DefaultGatherer.Gather()
373+
require.NoError(t, err)
374+
for _, family := range families {
375+
if family.GetName() != metricName {
376+
continue
377+
}
378+
for _, metric := range family.GetMetric() {
379+
if scenarioSearchMetricHasLabels(metric, labels) {
380+
return metric
381+
}
382+
}
383+
}
384+
return nil
385+
}
386+
387+
func scenarioSearchMetricHasLabels(metric *dto.Metric, labels map[string]string) bool {
388+
if len(metric.GetLabel()) != len(labels) {
389+
return false
390+
}
391+
for _, label := range metric.GetLabel() {
392+
expectedValue, found := labels[label.GetName()]
393+
if !found || expectedValue != label.GetValue() {
394+
return false
395+
}
396+
}
397+
return true
398+
}

0 commit comments

Comments
 (0)