44package reclaim
55
66import (
7+ "flag"
78 "fmt"
89 "testing"
10+ "time"
911
1012 "go.uber.org/mock/gomock"
1113 "gopkg.in/h2non/gock.v1"
14+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1215
16+ kaiv1 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/kai/v1"
17+ commonconstants "github.com/kai-scheduler/KAI-scheduler/pkg/common/constants"
1318 "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/reclaim"
19+ "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api"
1420 "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/common_info"
1521 "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_status"
1622 "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/constants"
23+ "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/framework"
1724 "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/test_utils"
1825 "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake"
1926 "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/test_utils/nodes_fake"
2027 "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake"
2128)
2229
// Command-line knobs for BenchmarkReclaimLargeJobs. Both are duration strings;
// an empty value (the default) means the flag was not supplied.
var (
	// reclaimLargeJobSearchBudget is the per-job scenario search budget.
	reclaimLargeJobSearchBudget = flag.String(
		"reclaim-large-job-search-budget",
		"",
		"scenario search job budget for BenchmarkReclaimLargeJobs; action uses the same budget and generators use half",
	)

	// reclaimLargeJobNodeLocalGreedyBudget optionally replaces the budget that
	// the NodeLocalGreedy generator would otherwise get.
	reclaimLargeJobNodeLocalGreedyBudget = flag.String(
		"reclaim-large-job-node-local-greedy-budget",
		"",
		"optional NodeLocalGreedy generator budget override for BenchmarkReclaimLargeJobs",
	)
)
41+
2342type VeryLargeJobReclaimParams struct {
2443 NumNodes int
2544 GPUsPerNode int
@@ -82,6 +101,37 @@ func TestUnschedulableDistributedReclaimTopology(t *testing.T) {
82101 }
83102}
84103
104+ func TestDefaultGeneratorPortfolioPreservesTopologyReclaimCoverage (t * testing.T ) {
105+ defer gock .Off ()
106+
107+ ctrl := gomock .NewController (t )
108+ defer ctrl .Finish ()
109+
110+ params := defaultUnschedulableDistributedReclaimParams (10 )
111+ topology := buildUnschedulableDistributedReclaimTopology (params )
112+
113+ ssn := test_utils .BuildSession (topology , ctrl )
114+ assertDefaultScenarioGeneratorPortfolio (t , ssn )
115+ assertDefaultScenarioSearchBudgets (t , ssn )
116+ multiNodeGangEmissions := observeMultiNodeGangScenarios (t , ssn )
117+
118+ action := reclaim .New ()
119+ action .Execute (ssn )
120+
121+ if * multiNodeGangEmissions == 0 {
122+ t .Fatalf ("expected default reclaim scenario portfolio to reach %s" , commonconstants .GeneratorMultiNodeGang )
123+ }
124+
125+ job := ssn .ClusterInfo .PodGroupInfos [common_info .PodGroupID (unschedulableDistributedJobName )]
126+ if job == nil {
127+ t .Fatalf ("expected distributed job %q in session" , unschedulableDistributedJobName )
128+ }
129+ if len (job .PodStatusIndex [pod_status .Pending ]) != params .PodsPerDistributedJob {
130+ t .Fatalf ("expected %d pending distributed-job tasks, got %d" ,
131+ params .PodsPerDistributedJob , len (job .PodStatusIndex [pod_status .Pending ]))
132+ }
133+ }
134+
85135type unschedulableDistributedReclaimParams struct {
86136 NumNodes int
87137 GPUsPerNode int
@@ -102,6 +152,70 @@ func BenchmarkReclaimLargeJobs_10Node(b *testing.B) {
102152 benchmarkReclaimLargeJobs (b , 10 )
103153}
104154
155+ func TestReclaimLargeJobScenarioSearchBudgetsUsesHalfBudgetForGenerators (t * testing.T ) {
156+ originalBudget := * reclaimLargeJobSearchBudget
157+ * reclaimLargeJobSearchBudget = "2m"
158+ defer func () {
159+ * reclaimLargeJobSearchBudget = originalBudget
160+ }()
161+
162+ budgets := reclaimLargeJobScenarioSearchBudgets ()
163+ if budgets == nil {
164+ t .Fatal ("expected benchmark scenario search budgets" )
165+ }
166+ if got := budgets .MaxActionSearchDuration [commonconstants .ActionDefault ].Duration ; got != 2 * time .Minute {
167+ t .Fatalf ("expected default action budget 2m, got %s" , got )
168+ }
169+ if got := budgets .MaxActionSearchDuration [commonconstants .ActionReclaim ].Duration ; got != 2 * time .Minute {
170+ t .Fatalf ("expected reclaim action budget 2m, got %s" , got )
171+ }
172+ if got := budgets .MaxJobSearchDuration .Duration ; got != 2 * time .Minute {
173+ t .Fatalf ("expected job budget 2m, got %s" , got )
174+ }
175+
176+ expectedGeneratorBudget := time .Minute
177+ for _ , generator := range []string {
178+ commonconstants .ActionDefault ,
179+ commonconstants .GeneratorNodeLocalGreedy ,
180+ commonconstants .GeneratorMultiNodeGang ,
181+ } {
182+ if got := budgets .MaxGeneratorSearchDuration [generator ].Duration ; got != expectedGeneratorBudget {
183+ t .Fatalf ("expected %s generator budget %s, got %s" , generator , expectedGeneratorBudget , got )
184+ }
185+ }
186+ }
187+
188+ func TestReclaimLargeJobScenarioSearchBudgetsCanOverrideNodeLocalGreedyBudget (t * testing.T ) {
189+ originalBudget := * reclaimLargeJobSearchBudget
190+ originalNodeLocalGreedyBudget := * reclaimLargeJobNodeLocalGreedyBudget
191+ * reclaimLargeJobSearchBudget = "4m"
192+ * reclaimLargeJobNodeLocalGreedyBudget = "0s"
193+ defer func () {
194+ * reclaimLargeJobSearchBudget = originalBudget
195+ * reclaimLargeJobNodeLocalGreedyBudget = originalNodeLocalGreedyBudget
196+ }()
197+
198+ budgets := reclaimLargeJobScenarioSearchBudgets ()
199+ if budgets == nil {
200+ t .Fatal ("expected benchmark scenario search budgets" )
201+ }
202+ if got := budgets .MaxActionSearchDuration [commonconstants .ActionReclaim ].Duration ; got != 4 * time .Minute {
203+ t .Fatalf ("expected reclaim action budget 4m, got %s" , got )
204+ }
205+ if got := budgets .MaxJobSearchDuration .Duration ; got != 4 * time .Minute {
206+ t .Fatalf ("expected job budget 4m, got %s" , got )
207+ }
208+ if got := budgets .MaxGeneratorSearchDuration [commonconstants .ActionDefault ].Duration ; got != 2 * time .Minute {
209+ t .Fatalf ("expected default generator budget 2m, got %s" , got )
210+ }
211+ if got := budgets .MaxGeneratorSearchDuration [commonconstants .GeneratorMultiNodeGang ].Duration ; got != 2 * time .Minute {
212+ t .Fatalf ("expected MultiNodeGang generator budget 2m, got %s" , got )
213+ }
214+ if got := budgets .MaxGeneratorSearchDuration [commonconstants .GeneratorNodeLocalGreedy ].Duration ; got != 0 {
215+ t .Fatalf ("expected NodeLocalGreedy generator budget 0s, got %s" , got )
216+ }
217+ }
218+
105219func BenchmarkReclaimLargeJobs_50Node (b * testing.B ) {
106220 benchmarkReclaimLargeJobs (b , 50 )
107221}
@@ -131,7 +245,7 @@ func benchmarkReclaimLargeJobs(b *testing.B, numNodes int) {
131245 NumJobs : numNodes * 8 ,
132246 GPUsPerTask : 1 ,
133247 VeryLargeJobGPUsPerTask : 8 ,
134- VeryLargeJobTasks : numNodes / 10 ,
248+ VeryLargeJobTasks : numNodes / 2 ,
135249 Queue0DeservedGPUs : 0 ,
136250 Queue1DeservedGPUs : numNodes * 8 ,
137251 NumberOfCacheBinds : numNodes * 4 ,
@@ -144,12 +258,73 @@ func benchmarkReclaimLargeJobs(b *testing.B, numNodes int) {
144258 for b .Loop () {
145259 ctrl := gomock .NewController (b )
146260 ssn := test_utils .BuildSession (topology , ctrl )
261+ if budgets := reclaimLargeJobScenarioSearchBudgets (); budgets != nil {
262+ ssn .Config .ScenarioSearchBudgets = budgets
263+ }
147264 action := reclaim .New ()
148265 action .Execute (ssn )
266+ assertVeryLargeJobReclaimed (b , ssn , params )
149267 ctrl .Finish ()
150268 }
151269}
152270
271+ func reclaimLargeJobScenarioSearchBudgets () * kaiv1.ScenarioSearchBudgets {
272+ if * reclaimLargeJobSearchBudget == "" {
273+ return nil
274+ }
275+ jobBudget , err := time .ParseDuration (* reclaimLargeJobSearchBudget )
276+ if err != nil {
277+ panic (fmt .Sprintf ("invalid reclaim-large-job-search-budget: %v" , err ))
278+ }
279+ generatorBudget := jobBudget / 2
280+ nodeLocalGreedyBudget := generatorBudget
281+ if * reclaimLargeJobNodeLocalGreedyBudget != "" {
282+ parsedNodeLocalGreedyBudget , err := time .ParseDuration (* reclaimLargeJobNodeLocalGreedyBudget )
283+ if err != nil {
284+ panic (fmt .Sprintf ("invalid reclaim-large-job-node-local-greedy-budget: %v" , err ))
285+ }
286+ nodeLocalGreedyBudget = parsedNodeLocalGreedyBudget
287+ }
288+ return & kaiv1.ScenarioSearchBudgets {
289+ MaxActionSearchDuration : map [string ]metav1.Duration {
290+ commonconstants .ActionDefault : {Duration : jobBudget },
291+ commonconstants .ActionReclaim : {Duration : jobBudget },
292+ },
293+ MaxJobSearchDuration : & metav1.Duration {Duration : jobBudget },
294+ MinJobSearchDuration : & metav1.Duration {},
295+ MaxGeneratorSearchDuration : map [string ]metav1.Duration {
296+ commonconstants .ActionDefault : {Duration : generatorBudget },
297+ commonconstants .GeneratorNodeLocalGreedy : {Duration : nodeLocalGreedyBudget },
298+ commonconstants .GeneratorMultiNodeGang : {Duration : generatorBudget },
299+ },
300+ }
301+ }
302+
303+ func assertVeryLargeJobReclaimed (b * testing.B , ssn * framework.Session , params VeryLargeJobReclaimParams ) {
304+ b .Helper ()
305+
306+ job := ssn .ClusterInfo .PodGroupInfos [common_info .PodGroupID ("very-large-job" )]
307+ if job == nil {
308+ b .Fatalf ("expected very-large-job in session" )
309+ }
310+ if pending := len (job .PodStatusIndex [pod_status .Pending ]); pending != 0 {
311+ b .Fatalf ("expected very-large-job to have no pending tasks after reclaim, got %d" , pending )
312+ }
313+ if pipelined := len (job .PodStatusIndex [pod_status .Pipelined ]); pipelined != params .VeryLargeJobTasks {
314+ b .Fatalf ("expected very-large-job to pipeline %d tasks, got %d" , params .VeryLargeJobTasks , pipelined )
315+ }
316+
317+ releasingTasks := 0
318+ for _ , clusterJob := range ssn .ClusterInfo .PodGroupInfos {
319+ releasingTasks += len (clusterJob .PodStatusIndex [pod_status .Releasing ])
320+ }
321+ expectedReleasingTasks := params .VeryLargeJobTasks * params .VeryLargeJobGPUsPerTask / params .GPUsPerTask
322+ if releasingTasks != expectedReleasingTasks {
323+ b .Fatalf ("expected %d victim tasks to be releasing after reclaim, got %d" ,
324+ expectedReleasingTasks , releasingTasks )
325+ }
326+ }
327+
153328func buildReclaimTopology (params VeryLargeJobReclaimParams ) test_utils.TestTopologyBasic {
154329 nodes := make (map [string ]nodes_fake.TestNodeBasic )
155330 for i := 0 ; i < params .NumNodes ; i ++ {
@@ -305,3 +480,85 @@ func buildUnschedulableDistributedReclaimJobs(
305480 jobs = append (jobs , distributedJob )
306481 return jobs
307482}
483+
484+ func assertDefaultScenarioGeneratorPortfolio (t * testing.T , ssn * framework.Session ) {
485+ t .Helper ()
486+
487+ for _ , expectedGenerator := range []string {
488+ commonconstants .GeneratorNodeLocalGreedy ,
489+ commonconstants .GeneratorMultiNodeGang ,
490+ } {
491+ foundGenerator := false
492+ for _ , registration := range ssn .ScenarioGeneratorRegistrations {
493+ if registration .Name != expectedGenerator {
494+ continue
495+ }
496+ foundGenerator = true
497+ if _ , found := registration .Actions [framework .Reclaim ]; ! found {
498+ t .Fatalf ("expected default generator %q to apply to reclaim" , expectedGenerator )
499+ }
500+ break
501+ }
502+ if ! foundGenerator {
503+ t .Fatalf ("expected default scenario generator plugins to register %q" , expectedGenerator )
504+ }
505+ }
506+ }
507+
508+ func observeMultiNodeGangScenarios (t * testing.T , ssn * framework.Session ) * int {
509+ t .Helper ()
510+
511+ for index , registration := range ssn .ScenarioGeneratorRegistrations {
512+ if registration .Name != commonconstants .GeneratorMultiNodeGang {
513+ continue
514+ }
515+ emissions := 0
516+ originalFactory := registration .Factory
517+ ssn .ScenarioGeneratorRegistrations [index ].Factory = func (ctx framework.ScenarioGeneratorContext ) framework.ScenarioGenerator {
518+ generator := originalFactory (ctx )
519+ if generator == nil {
520+ return nil
521+ }
522+ return & observedScenarioGenerator {
523+ ScenarioGenerator : generator ,
524+ onScenario : func () {
525+ emissions ++
526+ },
527+ }
528+ }
529+ return & emissions
530+ }
531+ t .Fatalf ("expected default scenario generator plugins to register %q" , commonconstants .GeneratorMultiNodeGang )
532+ return nil
533+ }
534+
535+ func assertDefaultScenarioSearchBudgets (t * testing.T , ssn * framework.Session ) {
536+ t .Helper ()
537+
538+ if ssn .Config == nil || ssn .Config .ScenarioSearchBudgets == nil {
539+ t .Fatalf ("expected default scenario search budgets on session" )
540+ }
541+
542+ generatorBudgets := ssn .Config .ScenarioSearchBudgets .MaxGeneratorSearchDuration
543+ if got := generatorBudgets [commonconstants .GeneratorNodeLocalGreedy ].Duration ; got != 30 * time .Second {
544+ t .Fatalf ("expected default %s budget 30s, got %s" ,
545+ commonconstants .GeneratorNodeLocalGreedy , got )
546+ }
547+ if got := generatorBudgets [commonconstants .GeneratorMultiNodeGang ].Duration ; got != 2 * time .Minute {
548+ t .Fatalf ("expected default %s budget 2m, got %s" ,
549+ commonconstants .GeneratorMultiNodeGang , got )
550+ }
551+ }
552+
553+ type observedScenarioGenerator struct {
554+ framework.ScenarioGenerator
555+ onScenario func ()
556+ }
557+
558+ func (g * observedScenarioGenerator ) Next () api.ScenarioInfo {
559+ scenario := g .ScenarioGenerator .Next ()
560+ if scenario != nil && g .onScenario != nil {
561+ g .onScenario ()
562+ }
563+ return scenario
564+ }
0 commit comments