Skip to content

Commit 65870a5

Browse files
authored
feat(scheduler): wire scenario search budgets into solver (#1743)
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent 9e0fcfe commit 65870a5

8 files changed

Lines changed: 512 additions & 60 deletions

File tree

pkg/scheduler/actions/common/solvers/job_solver.go

Lines changed: 141 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ package solvers
66
import (
77
"fmt"
88
"strings"
9+
"time"
910

11+
solverscenario "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/common/solvers/scenario"
1012
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/utils"
1113
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/node_info"
1214
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_info"
@@ -24,6 +26,7 @@ type JobSolver struct {
2426
solutionValidator SolutionValidator
2527
generateVictimsQueue GenerateVictimsQueue
2628
actionType framework.ActionType
29+
actionBudget *ActionSearchBudget
2730
}
2831

2932
type solvingState struct {
@@ -36,12 +39,29 @@ func NewJobsSolver(
3639
solutionValidator SolutionValidator,
3740
generateVictimsQueue GenerateVictimsQueue,
3841
action framework.ActionType,
42+
actionBudget ...*ActionSearchBudget,
3943
) *JobSolver {
44+
var budget *ActionSearchBudget
45+
if len(actionBudget) > 0 {
46+
budget = actionBudget[0]
47+
}
48+
if budget == nil {
49+
budget = newUnlimitedActionSearchBudget(action)
50+
}
4051
return &JobSolver{
4152
feasibleNodes: feasibleNodes,
4253
solutionValidator: solutionValidator,
4354
generateVictimsQueue: generateVictimsQueue,
4455
actionType: action,
56+
actionBudget: budget,
57+
}
58+
}
59+
60+
func newUnlimitedActionSearchBudget(action framework.ActionType) *ActionSearchBudget {
61+
now := time.Now
62+
return &ActionSearchBudget{
63+
action: action,
64+
deadline: newDeadlineBudget(unlimitedRemaining, now),
4565
}
4666
}
4767

@@ -59,25 +79,45 @@ func NewJobsSolver(
5979
// returned statement) and is left unchanged on failure.
6080
func (s *JobSolver) Solve(
6181
ssn *framework.Session, pendingJob *podgroup_info.PodGroupInfo) (bool, *framework.Statement, []string) {
82+
solved, statement, victimTaskNames, _ := s.SolveWithResult(ssn, pendingJob)
83+
return solved, statement, victimTaskNames
84+
}
85+
86+
// SolveWithResult attempts to solve pendingJob and returns a structured search result
87+
// describing why the scenario search stopped.
88+
func (s *JobSolver) SolveWithResult(
89+
ssn *framework.Session, pendingJob *podgroup_info.PodGroupInfo,
90+
) (bool, *framework.Statement, []string, *SearchResult) {
6291
state := solvingState{}
6392
originalNumActiveTasks := pendingJob.GetNumActiveUsedTasks()
6493

6594
tasksToAllocate := podgroup_info.GetTasksToAllocate(pendingJob, ssn.SubGroupOrderFn, ssn.TaskOrderFn, false)
6695
n := len(tasksToAllocate)
6796
if n == 0 {
68-
return false, nil, calcVictimNames(state.recordedVictimsTasks)
97+
return false, nil, calcVictimNames(state.recordedVictimsTasks),
98+
terminalSearchResult(SearchResultGeneratorsExhausted, false)
99+
}
100+
101+
jobBudget := s.actionBudget.BeginJob()
102+
if jobBudget.Exhausted() {
103+
return false, nil, calcVictimNames(state.recordedVictimsTasks),
104+
terminalSearchResult(SearchResultNotAttempted, false)
69105
}
70106

71-
maxSolvedK := s.searchMaxSolvableK(ssn, &state, pendingJob, tasksToAllocate)
107+
maxSolvedK, searchResult := s.searchMaxSolvableK(ssn, &state, pendingJob, tasksToAllocate, jobBudget)
72108
if maxSolvedK == 0 {
73-
return false, nil, calcVictimNames(state.recordedVictimsTasks)
109+
if searchResult == nil {
110+
searchResult = terminalSearchResult(SearchResultGeneratorsExhausted, false)
111+
}
112+
return false, nil, calcVictimNames(state.recordedVictimsTasks), searchResult
74113
}
75114

76-
result := s.probeAtK(ssn, &state, pendingJob, tasksToAllocate, n)
77-
if result == nil || !result.solved {
78-
return false, nil, calcVictimNames(state.recordedVictimsTasks)
115+
result := s.probeAtK(ssn, &state, pendingJob, tasksToAllocate, n, jobBudget)
116+
if !resultSolved(result) {
117+
return false, nil, calcVictimNames(state.recordedVictimsTasks), result
79118
}
80119

120+
solution := result.solution
81121
numActiveTasks := pendingJob.GetNumActiveUsedTasks()
82122
jobSolved := pendingJob.IsGangSatisfied()
83123
if originalNumActiveTasks >= numActiveTasks {
@@ -86,8 +126,8 @@ func (s *JobSolver) Solve(
86126

87127
log.InfraLogger.V(4).Infof(
88128
"Scenario solved for %d tasks to allocate for %s. Victims: %s",
89-
n, pendingJob.Name, victimPrintingStruct{result.victimsTasks})
90-
return jobSolved, result.statement, calcVictimNames(result.victimsTasks)
129+
n, pendingJob.Name, victimPrintingStruct{solution.victimsTasks})
130+
return jobSolved, solution.statement, calcVictimNames(solution.victimsTasks), result
91131
}
92132

93133
// searchMaxSolvableK returns the largest k in [0, n] for which a probe at k succeeds.
@@ -100,23 +140,40 @@ func (s *JobSolver) searchMaxSolvableK(
100140
state *solvingState,
101141
pendingJob *podgroup_info.PodGroupInfo,
102142
tasksToAllocate []*pod_info.PodInfo,
103-
) int {
143+
jobBudget *jobSearchBudget,
144+
) (int, *SearchResult) {
104145
n := len(tasksToAllocate)
105146
if n == 0 {
106-
return 0
147+
return 0, nil
148+
}
149+
150+
return searchMaxSolvableK(n, func(k int) *SearchResult {
151+
return s.tryProbeAndDiscard(ssn, state, pendingJob, tasksToAllocate, k, jobBudget)
152+
})
153+
}
154+
155+
func searchMaxSolvableK(n int, probe func(k int) *SearchResult) (int, *SearchResult) {
156+
if n == 0 {
157+
return 0, nil
107158
}
108159

109160
lo := 0
110161
var hi int
162+
var lastUnsolvedResult *SearchResult
111163
k := 1
112164
for {
113-
if !s.tryProbeAndDiscard(ssn, state, pendingJob, tasksToAllocate, k) {
165+
result := probe(k)
166+
if shouldStopSearch(result) {
167+
return 0, result
168+
}
169+
if !resultSolved(result) {
170+
lastUnsolvedResult = result
114171
hi = k
115172
break
116173
}
117174
lo = k
118175
if k == n {
119-
return n
176+
return n, lastUnsolvedResult
120177
}
121178
k *= 2
122179
if k > n {
@@ -126,39 +183,46 @@ func (s *JobSolver) searchMaxSolvableK(
126183

127184
for hi-lo > 1 {
128185
mid := (lo + hi) / 2
129-
if s.tryProbeAndDiscard(ssn, state, pendingJob, tasksToAllocate, mid) {
186+
result := probe(mid)
187+
if shouldStopSearch(result) {
188+
return 0, result
189+
}
190+
if resultSolved(result) {
130191
lo = mid
131192
} else {
193+
lastUnsolvedResult = result
132194
hi = mid
133195
}
134196
}
135-
return lo
197+
return lo, lastUnsolvedResult
136198
}
137199

138-
// tryProbeAndDiscard probes at k and always discards the resulting statement so the session
139-
// is left clean. On success, hints are written to state; returns whether the probe succeeded.
200+
// tryProbeAndDiscard probes at k and always discards a solved statement so the session
201+
// is left clean. On success, hints are written to state.
140202
func (s *JobSolver) tryProbeAndDiscard(
141203
ssn *framework.Session,
142204
state *solvingState,
143205
pendingJob *podgroup_info.PodGroupInfo,
144206
tasksToAllocate []*pod_info.PodInfo,
145207
k int,
146-
) bool {
147-
result := s.probeAtK(ssn, state, pendingJob, tasksToAllocate, k)
148-
if result == nil || !result.solved {
208+
jobBudget *jobSearchBudget,
209+
) *SearchResult {
210+
result := s.probeAtK(ssn, state, pendingJob, tasksToAllocate, k, jobBudget)
211+
if !resultSolved(result) {
149212
log.InfraLogger.V(5).Infof("No solution found for %d tasks out of %d tasks to allocate for %s",
150213
k, len(tasksToAllocate), pendingJob.Name)
151-
return false
214+
return result
152215
}
216+
solution := result.solution
153217
log.InfraLogger.V(5).Infof(
154218
"Scenario probed for %d tasks out of %d tasks to allocate for %s. Victims: %s",
155-
k, len(tasksToAllocate), pendingJob.Name, victimPrintingStruct{result.victimsTasks})
156-
state.recordedVictimsTasks = result.victimsTasks
157-
state.recordedVictimsJobs = result.victimJobs
158-
if result.statement != nil {
159-
result.statement.Discard()
219+
k, len(tasksToAllocate), pendingJob.Name, victimPrintingStruct{solution.victimsTasks})
220+
state.recordedVictimsTasks = solution.victimsTasks
221+
state.recordedVictimsJobs = solution.victimJobs
222+
if solution.statement != nil {
223+
solution.statement.Discard()
160224
}
161-
return true
225+
return result
162226
}
163227

164228
func (s *JobSolver) probeAtK(
@@ -167,13 +231,21 @@ func (s *JobSolver) probeAtK(
167231
pendingJob *podgroup_info.PodGroupInfo,
168232
tasksToAllocate []*pod_info.PodInfo,
169233
k int,
170-
) *solutionResult {
234+
jobBudget *jobSearchBudget,
235+
) *SearchResult {
171236
pendingTasks := tasksToAllocate[:k]
172237
partialPendingJob := getPartialJobRepresentative(pendingJob, pendingTasks)
173-
return s.solvePartialJob(ssn, state, partialPendingJob)
238+
return s.solvePartialJob(ssn, state, partialPendingJob, jobBudget)
174239
}
175240

176-
func (s *JobSolver) solvePartialJob(ssn *framework.Session, state *solvingState, partialPendingJob *podgroup_info.PodGroupInfo) *solutionResult {
241+
func (s *JobSolver) solvePartialJob(
242+
ssn *framework.Session, state *solvingState, partialPendingJob *podgroup_info.PodGroupInfo,
243+
jobBudget *jobSearchBudget,
244+
) *SearchResult {
245+
if jobBudget == nil {
246+
jobBudget = newUnlimitedActionSearchBudget(s.actionType).BeginJob()
247+
}
248+
177249
feasibleNodeMap := map[string]*node_info.NodeInfo{}
178250
for _, node := range s.feasibleNodes {
179251
feasibleNodeMap[node.Name] = node
@@ -183,11 +255,32 @@ func (s *JobSolver) solvePartialJob(ssn *framework.Session, state *solvingState,
183255
feasibleNodeMap[task.NodeName] = node
184256
}
185257

258+
if s.generateVictimsQueue == nil {
259+
return terminalSearchResult(SearchResultNoGenerator, jobBudget.ReducedBudget())
260+
}
261+
victimsQueue := s.generateVictimsQueue()
262+
if victimsQueue == nil {
263+
return terminalSearchResult(SearchResultNoGenerator, jobBudget.ReducedBudget())
264+
}
265+
186266
scenarioBuilder := NewPodAccumulatedScenarioBuilder(
187-
ssn, partialPendingJob, state.recordedVictimsJobs, s.generateVictimsQueue(), feasibleNodeMap)
267+
ssn, partialPendingJob, state.recordedVictimsJobs, victimsQueue, feasibleNodeMap)
188268

189-
for scenarioToSolve := scenarioBuilder.GetValidScenario(); scenarioToSolve != nil; scenarioToSolve =
190-
scenarioBuilder.GetNextScenario() {
269+
firstScenario := true
270+
for {
271+
if jobBudget.Exhausted() {
272+
return terminalSearchResult(SearchResultDeadlineExhausted, jobBudget.ReducedBudget())
273+
}
274+
var scenarioToSolve *solverscenario.ByNodeScenario
275+
if firstScenario {
276+
scenarioToSolve = scenarioBuilder.GetValidScenario()
277+
firstScenario = false
278+
} else {
279+
scenarioToSolve = scenarioBuilder.GetNextScenario()
280+
}
281+
if scenarioToSolve == nil {
282+
break
283+
}
191284
scenarioSolver := newByPodSolver(feasibleNodeMap, s.solutionValidator, ssn.AllowConsolidatingReclaim(),
192285
s.actionType)
193286

@@ -196,11 +289,25 @@ func (s *JobSolver) solvePartialJob(ssn *framework.Session, state *solvingState,
196289

197290
result := scenarioSolver.solve(ssn, scenarioToSolve)
198291
if result.solved {
199-
return result
292+
return solvedSearchResult(result, jobBudget.ReducedBudget())
200293
}
201294
}
202295

203-
return nil
296+
return terminalSearchResult(SearchResultGeneratorsExhausted, jobBudget.ReducedBudget())
297+
}
298+
299+
func shouldStopSearch(result *SearchResult) bool {
300+
switch result.Reason() {
301+
case SearchResultDeadlineExhausted, SearchResultNotAttempted, SearchResultNoGenerator:
302+
return true
303+
default:
304+
return false
305+
}
306+
}
307+
308+
func resultSolved(result *SearchResult) bool {
309+
return result != nil && result.Reason() == SearchResultSolved &&
310+
result.solution != nil && result.solution.solved
204311
}
205312

206313
func getPartialJobRepresentative(

0 commit comments

Comments
 (0)