Skip to content

Commit 9e0fcfe

Browse files
authored
feat(scheduler): add scenario search budget model (#1742)
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent d226f61 commit 9e0fcfe

3 files changed

Lines changed: 824 additions & 0 deletions

File tree

Lines changed: 394 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,394 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package solvers
5+
6+
import (
7+
"fmt"
8+
"time"
9+
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
12+
kaiv1 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/kai/v1"
13+
"github.com/kai-scheduler/KAI-scheduler/pkg/common/constants"
14+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/framework"
15+
)
16+
17+
// unlimitedRemaining is the largest representable time.Duration. It is used as
// the sentinel meaning "no deadline" when building and querying budgets.
const unlimitedRemaining time.Duration = 1<<63 - 1
18+
19+
// ActionSearchBudget tracks the top-level deadline for one scheduling action search.
20+
type ActionSearchBudget struct {
21+
action framework.ActionType
22+
actionLimit time.Duration
23+
jobLimit time.Duration
24+
minJobSearch time.Duration
25+
generatorLimits map[string]time.Duration
26+
deadline deadlineBudget
27+
}
28+
29+
// deadlineBudget is a point-in-time deadline paired with the clock used to
// measure how much time is left until it.
type deadlineBudget struct {
	// deadline is the instant at which the budget runs out; it is left at its
	// zero value when unlimited is set.
	deadline time.Time
	// unlimited marks a budget that never expires.
	unlimited bool
	// now is the clock consulted by Remaining; a nil clock makes Remaining
	// report zero.
	now func() time.Time
}
34+
35+
type jobSearchBudget struct {
36+
deadline deadlineBudget
37+
reducedBudget bool
38+
generatorLimits map[string]time.Duration
39+
}
40+
41+
type generatorSearchBudget struct {
42+
deadline deadlineBudget
43+
}
44+
45+
// NewActionSearchBudget parses scenario search budget config for one scheduler action.
46+
func NewActionSearchBudget(ssn *framework.Session, action framework.ActionType) (*ActionSearchBudget, error) {
47+
return newActionSearchBudgetWithClock(ssn, action, time.Now)
48+
}
49+
50+
func newActionSearchBudgetWithClock(
51+
ssn *framework.Session, action framework.ActionType, now func() time.Time,
52+
) (*ActionSearchBudget, error) {
53+
now = clockOrDefault(now)
54+
55+
budgets := scenarioSearchBudgets(ssn)
56+
actionLimit, err := parseActionLimit(budgets, action)
57+
if err != nil {
58+
return nil, err
59+
}
60+
jobLimit, err := parseDurationWithDefault(
61+
"maxJobSearchDuration", budgetFieldValue(budgets, func(b *kaiv1.ScenarioSearchBudgets) *metav1.Duration {
62+
return b.MaxJobSearchDuration
63+
}), constants.DefaultJobBudget,
64+
)
65+
if err != nil {
66+
return nil, err
67+
}
68+
minJobSearch, err := parseDurationWithDefault(
69+
"minJobSearchDuration", budgetFieldValue(budgets, func(b *kaiv1.ScenarioSearchBudgets) *metav1.Duration {
70+
return b.MinJobSearchDuration
71+
}), constants.DefaultMinJobBudget,
72+
)
73+
if err != nil {
74+
return nil, err
75+
}
76+
if jobLimit != 0 && minJobSearch >= jobLimit {
77+
return nil, fmt.Errorf("minJobSearchDuration must be less than maxJobSearchDuration")
78+
}
79+
generatorLimits, err := parseGeneratorLimits(budgets)
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
return &ActionSearchBudget{
85+
action: action,
86+
actionLimit: actionLimit,
87+
jobLimit: jobLimit,
88+
minJobSearch: minJobSearch,
89+
generatorLimits: generatorLimits,
90+
deadline: newDeadlineBudget(searchDurationForLimit(actionLimit), now),
91+
}, nil
92+
}
93+
94+
func (b *ActionSearchBudget) BeginJob() *jobSearchBudget {
95+
if b == nil {
96+
return &jobSearchBudget{
97+
deadline: newDeadlineBudget(0, time.Now),
98+
}
99+
}
100+
now := b.clock()
101+
102+
actionRemaining := b.Remaining()
103+
remaining := actionRemaining
104+
if b.jobLimit != 0 && b.jobLimit < remaining {
105+
remaining = b.jobLimit
106+
}
107+
if b.minJobSearch > remaining {
108+
remaining = b.minJobSearch
109+
}
110+
111+
return &jobSearchBudget{
112+
deadline: newDeadlineBudget(remaining, now),
113+
reducedBudget: b.jobLimit > 0 && actionRemaining < b.jobLimit,
114+
generatorLimits: b.generatorLimits,
115+
}
116+
}
117+
118+
func (b *ActionSearchBudget) Remaining() time.Duration {
119+
if b == nil {
120+
return 0
121+
}
122+
return b.deadline.Remaining()
123+
}
124+
125+
func (b *ActionSearchBudget) Exhausted() bool {
126+
return b.Remaining() <= 0
127+
}
128+
129+
func (b *jobSearchBudget) BeginGenerator(name string) *generatorSearchBudget {
130+
if b == nil {
131+
return &generatorSearchBudget{
132+
deadline: newDeadlineBudget(0, time.Now),
133+
}
134+
}
135+
now := b.clock()
136+
137+
jobRemaining := b.Remaining()
138+
generatorRemaining := jobRemaining
139+
generatorLimit := b.generatorLimit(name)
140+
if generatorLimit != 0 && generatorLimit < generatorRemaining {
141+
generatorRemaining = generatorLimit
142+
}
143+
144+
return &generatorSearchBudget{
145+
deadline: newDeadlineBudget(generatorRemaining, now),
146+
}
147+
}
148+
149+
func (b *jobSearchBudget) Remaining() time.Duration {
150+
if b == nil {
151+
return 0
152+
}
153+
return b.deadline.Remaining()
154+
}
155+
156+
func (b *jobSearchBudget) Exhausted() bool {
157+
return b.Remaining() <= 0
158+
}
159+
160+
func (b *jobSearchBudget) ReducedBudget() bool {
161+
if b == nil {
162+
return false
163+
}
164+
return b.reducedBudget
165+
}
166+
167+
func (b *generatorSearchBudget) Remaining() time.Duration {
168+
if b == nil {
169+
return 0
170+
}
171+
return b.deadline.Remaining()
172+
}
173+
174+
func (b *generatorSearchBudget) Exhausted() bool {
175+
return b.Remaining() <= 0
176+
}
177+
178+
func (b *jobSearchBudget) generatorLimit(name string) time.Duration {
179+
if b == nil || b.generatorLimits == nil {
180+
return 0
181+
}
182+
if limit, found := b.generatorLimits[name]; found {
183+
return limit
184+
}
185+
return b.generatorLimits[constants.ActionDefault]
186+
}
187+
188+
func newDeadlineBudget(remaining time.Duration, now func() time.Time) deadlineBudget {
189+
now = clockOrDefault(now)
190+
if remaining == unlimitedRemaining {
191+
return deadlineBudget{
192+
unlimited: true,
193+
now: now,
194+
}
195+
}
196+
if remaining < 0 {
197+
remaining = 0
198+
}
199+
return deadlineBudget{
200+
deadline: now().Add(remaining),
201+
now: now,
202+
}
203+
}
204+
205+
func (b deadlineBudget) Remaining() time.Duration {
206+
if b.now == nil {
207+
return 0
208+
}
209+
if b.unlimited {
210+
return unlimitedRemaining
211+
}
212+
remaining := b.deadline.Sub(b.now())
213+
if remaining <= 0 {
214+
return 0
215+
}
216+
return remaining
217+
}
218+
219+
func (b *ActionSearchBudget) clock() func() time.Time {
220+
if b == nil {
221+
return time.Now
222+
}
223+
return clockOrDefault(b.deadline.now)
224+
}
225+
226+
func (b *jobSearchBudget) clock() func() time.Time {
227+
if b == nil {
228+
return time.Now
229+
}
230+
return clockOrDefault(b.deadline.now)
231+
}
232+
233+
func searchDurationForLimit(limit time.Duration) time.Duration {
234+
if limit == 0 {
235+
return unlimitedRemaining
236+
}
237+
return limit
238+
}
239+
240+
// clockOrDefault returns now unchanged when it is set and time.Now otherwise.
func clockOrDefault(now func() time.Time) func() time.Time {
	if now != nil {
		return now
	}
	return time.Now
}
246+
247+
func scenarioSearchBudgets(ssn *framework.Session) *kaiv1.ScenarioSearchBudgets {
248+
if ssn == nil || ssn.Config == nil {
249+
return nil
250+
}
251+
return ssn.Config.ScenarioSearchBudgets
252+
}
253+
254+
func budgetFieldValue(
255+
budgets *kaiv1.ScenarioSearchBudgets, valueFn func(*kaiv1.ScenarioSearchBudgets) *metav1.Duration,
256+
) *metav1.Duration {
257+
if budgets == nil {
258+
return nil
259+
}
260+
return valueFn(budgets)
261+
}
262+
263+
func parseActionLimit(budgets *kaiv1.ScenarioSearchBudgets, action framework.ActionType) (time.Duration, error) {
264+
configuredLimits, err := parseDurationMap(
265+
"maxActionSearchDuration", actionLimitValues(budgets),
266+
)
267+
if err != nil {
268+
return 0, err
269+
}
270+
271+
actionKey := scenarioSearchActionKey(action)
272+
if limit, found := configuredLimits[actionKey]; found {
273+
return limit, nil
274+
}
275+
if limit, found := configuredLimits[constants.ActionDefault]; found {
276+
return limit, nil
277+
}
278+
return mustParseDuration(defaultActionLimit()), nil
279+
}
280+
281+
func parseGeneratorLimits(budgets *kaiv1.ScenarioSearchBudgets) (map[string]time.Duration, error) {
282+
configuredLimits, err := parseDurationMap(
283+
"maxGeneratorSearchDuration", generatorLimitValues(budgets),
284+
)
285+
if err != nil {
286+
return nil, err
287+
}
288+
289+
defaultLimit, hasConfiguredDefault := configuredLimits[constants.ActionDefault]
290+
if !hasConfiguredDefault {
291+
defaultLimit = mustParseDuration(constants.DefaultGeneratorBudget)
292+
}
293+
294+
generatorLimits := map[string]time.Duration{
295+
constants.ActionDefault: defaultLimit,
296+
}
297+
for name, limit := range configuredLimits {
298+
if name != constants.ActionDefault {
299+
generatorLimits[name] = limit
300+
}
301+
}
302+
setKnownGeneratorLimit(
303+
generatorLimits, configuredLimits, constants.GeneratorNodeLocalGreedy,
304+
constants.DefaultNodeLocalGreedy, defaultLimit, hasConfiguredDefault,
305+
)
306+
setKnownGeneratorLimit(
307+
generatorLimits, configuredLimits, constants.GeneratorMultiNodeGang,
308+
constants.DefaultMultiNodeGang, defaultLimit, hasConfiguredDefault,
309+
)
310+
return generatorLimits, nil
311+
}
312+
313+
func setKnownGeneratorLimit(
314+
generatorLimits map[string]time.Duration,
315+
configuredLimits map[string]time.Duration,
316+
name string,
317+
defaultValue string,
318+
defaultLimit time.Duration,
319+
hasConfiguredDefault bool,
320+
) {
321+
if _, found := configuredLimits[name]; found {
322+
return
323+
}
324+
if hasConfiguredDefault {
325+
generatorLimits[name] = defaultLimit
326+
return
327+
}
328+
generatorLimits[name] = mustParseDuration(defaultValue)
329+
}
330+
331+
func actionLimitValues(budgets *kaiv1.ScenarioSearchBudgets) map[string]metav1.Duration {
332+
if budgets == nil {
333+
return nil
334+
}
335+
return budgets.MaxActionSearchDuration
336+
}
337+
338+
func generatorLimitValues(budgets *kaiv1.ScenarioSearchBudgets) map[string]metav1.Duration {
339+
if budgets == nil {
340+
return nil
341+
}
342+
return budgets.MaxGeneratorSearchDuration
343+
}
344+
345+
func parseDurationWithDefault(fieldName string, value *metav1.Duration, defaultValue string) (time.Duration, error) {
346+
if value == nil {
347+
return validateDuration(fieldName, mustParseDuration(defaultValue))
348+
}
349+
return validateDuration(fieldName, value.Duration)
350+
}
351+
352+
func parseDurationMap(fieldName string, durationValues map[string]metav1.Duration) (map[string]time.Duration, error) {
353+
durations := map[string]time.Duration{}
354+
for key, durationValue := range durationValues {
355+
duration, err := validateDuration(fmt.Sprintf("%s[%q]", fieldName, key), durationValue.Duration)
356+
if err != nil {
357+
return nil, err
358+
}
359+
durations[key] = duration
360+
}
361+
return durations, nil
362+
}
363+
364+
// validateDuration accepts zero and positive durations and returns them
// unchanged. A negative duration yields an error naming fieldName.
func validateDuration(fieldName string, duration time.Duration) (time.Duration, error) {
	if duration >= 0 {
		return duration, nil
	}
	return 0, fmt.Errorf("%s must be non-negative", fieldName)
}
370+
371+
// mustParseDuration parses value with time.ParseDuration and panics when it
// is malformed.
//
// It is meant for built-in default strings, where a parse failure is a
// programming error. The panic value is an error that includes the offending
// string and wraps the underlying parse error.
func mustParseDuration(value string) time.Duration {
	duration, err := time.ParseDuration(value)
	if err != nil {
		panic(fmt.Errorf("invalid built-in duration %q: %w", value, err))
	}
	return duration
}
378+
379+
func scenarioSearchActionKey(action framework.ActionType) string {
380+
switch action {
381+
case framework.Reclaim:
382+
return constants.ActionReclaim
383+
case framework.Preempt:
384+
return constants.ActionPreempt
385+
case framework.Consolidation:
386+
return constants.ActionConsolidation
387+
default:
388+
return constants.ActionDefault
389+
}
390+
}
391+
392+
func defaultActionLimit() string {
393+
return constants.DefaultActionBudget
394+
}

0 commit comments

Comments
 (0)