Skip to content

Commit 07c3812

Browse files
committed
test(scheduler): preserve reclaim benchmark and topology coverage
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent 0a374a4 commit 07c3812

3 files changed

Lines changed: 271 additions & 6 deletions

File tree

pkg/scheduler/actions/integration_tests/reclaim/reclaim_benchmark_test.go

Lines changed: 258 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,41 @@
44
package reclaim
55

66
import (
7+
"flag"
78
"fmt"
89
"testing"
10+
"time"
911

1012
"go.uber.org/mock/gomock"
1113
"gopkg.in/h2non/gock.v1"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1215

16+
kaiv1 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/kai/v1"
17+
commonconstants "github.com/kai-scheduler/KAI-scheduler/pkg/common/constants"
1318
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions/reclaim"
19+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api"
1420
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/common_info"
1521
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/pod_status"
1622
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/constants"
23+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/framework"
1724
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/test_utils"
1825
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/test_utils/jobs_fake"
1926
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/test_utils/nodes_fake"
2027
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/test_utils/tasks_fake"
2128
)
2229

30+
// Command-line knobs for BenchmarkReclaimLargeJobs. Both default to the empty
// string, which means "not set".
var (
	// reclaimLargeJobSearchBudget holds the raw duration string for the
	// scenario search job budget; it is parsed with time.ParseDuration when
	// the benchmark budgets are built.
	reclaimLargeJobSearchBudget = flag.String(
		"reclaim-large-job-search-budget",
		"",
		"scenario search job budget for BenchmarkReclaimLargeJobs; action uses the same budget and generators use half",
	)

	// reclaimLargeJobNodeLocalGreedyBudget optionally replaces the
	// NodeLocalGreedy generator budget that would otherwise be derived from
	// reclaimLargeJobSearchBudget.
	reclaimLargeJobNodeLocalGreedyBudget = flag.String(
		"reclaim-large-job-node-local-greedy-budget",
		"",
		"optional NodeLocalGreedy generator budget override for BenchmarkReclaimLargeJobs",
	)
)
41+
2342
type VeryLargeJobReclaimParams struct {
2443
NumNodes int
2544
GPUsPerNode int
@@ -82,6 +101,37 @@ func TestUnschedulableDistributedReclaimTopology(t *testing.T) {
82101
}
83102
}
84103

104+
func TestDefaultGeneratorPortfolioPreservesTopologyReclaimCoverage(t *testing.T) {
105+
defer gock.Off()
106+
107+
ctrl := gomock.NewController(t)
108+
defer ctrl.Finish()
109+
110+
params := defaultUnschedulableDistributedReclaimParams(10)
111+
topology := buildUnschedulableDistributedReclaimTopology(params)
112+
113+
ssn := test_utils.BuildSession(topology, ctrl)
114+
assertDefaultScenarioGeneratorPortfolio(t, ssn)
115+
assertDefaultScenarioSearchBudgets(t, ssn)
116+
multiNodeGangEmissions := observeMultiNodeGangScenarios(t, ssn)
117+
118+
action := reclaim.New()
119+
action.Execute(ssn)
120+
121+
if *multiNodeGangEmissions == 0 {
122+
t.Fatalf("expected default reclaim scenario portfolio to reach %s", commonconstants.GeneratorMultiNodeGang)
123+
}
124+
125+
job := ssn.ClusterInfo.PodGroupInfos[common_info.PodGroupID(unschedulableDistributedJobName)]
126+
if job == nil {
127+
t.Fatalf("expected distributed job %q in session", unschedulableDistributedJobName)
128+
}
129+
if len(job.PodStatusIndex[pod_status.Pending]) != params.PodsPerDistributedJob {
130+
t.Fatalf("expected %d pending distributed-job tasks, got %d",
131+
params.PodsPerDistributedJob, len(job.PodStatusIndex[pod_status.Pending]))
132+
}
133+
}
134+
85135
type unschedulableDistributedReclaimParams struct {
86136
NumNodes int
87137
GPUsPerNode int
@@ -102,6 +152,70 @@ func BenchmarkReclaimLargeJobs_10Node(b *testing.B) {
102152
benchmarkReclaimLargeJobs(b, 10)
103153
}
104154

155+
func TestReclaimLargeJobScenarioSearchBudgetsUsesHalfBudgetForGenerators(t *testing.T) {
156+
originalBudget := *reclaimLargeJobSearchBudget
157+
*reclaimLargeJobSearchBudget = "2m"
158+
defer func() {
159+
*reclaimLargeJobSearchBudget = originalBudget
160+
}()
161+
162+
budgets := reclaimLargeJobScenarioSearchBudgets()
163+
if budgets == nil {
164+
t.Fatal("expected benchmark scenario search budgets")
165+
}
166+
if got := budgets.MaxActionSearchDuration[commonconstants.ActionDefault].Duration; got != 2*time.Minute {
167+
t.Fatalf("expected default action budget 2m, got %s", got)
168+
}
169+
if got := budgets.MaxActionSearchDuration[commonconstants.ActionReclaim].Duration; got != 2*time.Minute {
170+
t.Fatalf("expected reclaim action budget 2m, got %s", got)
171+
}
172+
if got := budgets.MaxJobSearchDuration.Duration; got != 2*time.Minute {
173+
t.Fatalf("expected job budget 2m, got %s", got)
174+
}
175+
176+
expectedGeneratorBudget := time.Minute
177+
for _, generator := range []string{
178+
commonconstants.ActionDefault,
179+
commonconstants.GeneratorNodeLocalGreedy,
180+
commonconstants.GeneratorMultiNodeGang,
181+
} {
182+
if got := budgets.MaxGeneratorSearchDuration[generator].Duration; got != expectedGeneratorBudget {
183+
t.Fatalf("expected %s generator budget %s, got %s", generator, expectedGeneratorBudget, got)
184+
}
185+
}
186+
}
187+
188+
func TestReclaimLargeJobScenarioSearchBudgetsCanOverrideNodeLocalGreedyBudget(t *testing.T) {
189+
originalBudget := *reclaimLargeJobSearchBudget
190+
originalNodeLocalGreedyBudget := *reclaimLargeJobNodeLocalGreedyBudget
191+
*reclaimLargeJobSearchBudget = "4m"
192+
*reclaimLargeJobNodeLocalGreedyBudget = "0s"
193+
defer func() {
194+
*reclaimLargeJobSearchBudget = originalBudget
195+
*reclaimLargeJobNodeLocalGreedyBudget = originalNodeLocalGreedyBudget
196+
}()
197+
198+
budgets := reclaimLargeJobScenarioSearchBudgets()
199+
if budgets == nil {
200+
t.Fatal("expected benchmark scenario search budgets")
201+
}
202+
if got := budgets.MaxActionSearchDuration[commonconstants.ActionReclaim].Duration; got != 4*time.Minute {
203+
t.Fatalf("expected reclaim action budget 4m, got %s", got)
204+
}
205+
if got := budgets.MaxJobSearchDuration.Duration; got != 4*time.Minute {
206+
t.Fatalf("expected job budget 4m, got %s", got)
207+
}
208+
if got := budgets.MaxGeneratorSearchDuration[commonconstants.ActionDefault].Duration; got != 2*time.Minute {
209+
t.Fatalf("expected default generator budget 2m, got %s", got)
210+
}
211+
if got := budgets.MaxGeneratorSearchDuration[commonconstants.GeneratorMultiNodeGang].Duration; got != 2*time.Minute {
212+
t.Fatalf("expected MultiNodeGang generator budget 2m, got %s", got)
213+
}
214+
if got := budgets.MaxGeneratorSearchDuration[commonconstants.GeneratorNodeLocalGreedy].Duration; got != 0 {
215+
t.Fatalf("expected NodeLocalGreedy generator budget 0s, got %s", got)
216+
}
217+
}
218+
105219
func BenchmarkReclaimLargeJobs_50Node(b *testing.B) {
106220
benchmarkReclaimLargeJobs(b, 50)
107221
}
@@ -131,7 +245,7 @@ func benchmarkReclaimLargeJobs(b *testing.B, numNodes int) {
131245
NumJobs: numNodes * 8,
132246
GPUsPerTask: 1,
133247
VeryLargeJobGPUsPerTask: 8,
134-
VeryLargeJobTasks: numNodes / 10,
248+
VeryLargeJobTasks: numNodes / 2,
135249
Queue0DeservedGPUs: 0,
136250
Queue1DeservedGPUs: numNodes * 8,
137251
NumberOfCacheBinds: numNodes * 4,
@@ -144,12 +258,73 @@ func benchmarkReclaimLargeJobs(b *testing.B, numNodes int) {
144258
for b.Loop() {
145259
ctrl := gomock.NewController(b)
146260
ssn := test_utils.BuildSession(topology, ctrl)
261+
if budgets := reclaimLargeJobScenarioSearchBudgets(); budgets != nil {
262+
ssn.Config.ScenarioSearchBudgets = budgets
263+
}
147264
action := reclaim.New()
148265
action.Execute(ssn)
266+
assertVeryLargeJobReclaimed(b, ssn, params)
149267
ctrl.Finish()
150268
}
151269
}
152270

271+
func reclaimLargeJobScenarioSearchBudgets() *kaiv1.ScenarioSearchBudgets {
272+
if *reclaimLargeJobSearchBudget == "" {
273+
return nil
274+
}
275+
jobBudget, err := time.ParseDuration(*reclaimLargeJobSearchBudget)
276+
if err != nil {
277+
panic(fmt.Sprintf("invalid reclaim-large-job-search-budget: %v", err))
278+
}
279+
generatorBudget := jobBudget / 2
280+
nodeLocalGreedyBudget := generatorBudget
281+
if *reclaimLargeJobNodeLocalGreedyBudget != "" {
282+
parsedNodeLocalGreedyBudget, err := time.ParseDuration(*reclaimLargeJobNodeLocalGreedyBudget)
283+
if err != nil {
284+
panic(fmt.Sprintf("invalid reclaim-large-job-node-local-greedy-budget: %v", err))
285+
}
286+
nodeLocalGreedyBudget = parsedNodeLocalGreedyBudget
287+
}
288+
return &kaiv1.ScenarioSearchBudgets{
289+
MaxActionSearchDuration: map[string]metav1.Duration{
290+
commonconstants.ActionDefault: {Duration: jobBudget},
291+
commonconstants.ActionReclaim: {Duration: jobBudget},
292+
},
293+
MaxJobSearchDuration: &metav1.Duration{Duration: jobBudget},
294+
MinJobSearchDuration: &metav1.Duration{},
295+
MaxGeneratorSearchDuration: map[string]metav1.Duration{
296+
commonconstants.ActionDefault: {Duration: generatorBudget},
297+
commonconstants.GeneratorNodeLocalGreedy: {Duration: nodeLocalGreedyBudget},
298+
commonconstants.GeneratorMultiNodeGang: {Duration: generatorBudget},
299+
},
300+
}
301+
}
302+
303+
func assertVeryLargeJobReclaimed(b *testing.B, ssn *framework.Session, params VeryLargeJobReclaimParams) {
304+
b.Helper()
305+
306+
job := ssn.ClusterInfo.PodGroupInfos[common_info.PodGroupID("very-large-job")]
307+
if job == nil {
308+
b.Fatalf("expected very-large-job in session")
309+
}
310+
if pending := len(job.PodStatusIndex[pod_status.Pending]); pending != 0 {
311+
b.Fatalf("expected very-large-job to have no pending tasks after reclaim, got %d", pending)
312+
}
313+
if pipelined := len(job.PodStatusIndex[pod_status.Pipelined]); pipelined != params.VeryLargeJobTasks {
314+
b.Fatalf("expected very-large-job to pipeline %d tasks, got %d", params.VeryLargeJobTasks, pipelined)
315+
}
316+
317+
releasingTasks := 0
318+
for _, clusterJob := range ssn.ClusterInfo.PodGroupInfos {
319+
releasingTasks += len(clusterJob.PodStatusIndex[pod_status.Releasing])
320+
}
321+
expectedReleasingTasks := params.VeryLargeJobTasks * params.VeryLargeJobGPUsPerTask / params.GPUsPerTask
322+
if releasingTasks != expectedReleasingTasks {
323+
b.Fatalf("expected %d victim tasks to be releasing after reclaim, got %d",
324+
expectedReleasingTasks, releasingTasks)
325+
}
326+
}
327+
153328
func buildReclaimTopology(params VeryLargeJobReclaimParams) test_utils.TestTopologyBasic {
154329
nodes := make(map[string]nodes_fake.TestNodeBasic)
155330
for i := 0; i < params.NumNodes; i++ {
@@ -305,3 +480,85 @@ func buildUnschedulableDistributedReclaimJobs(
305480
jobs = append(jobs, distributedJob)
306481
return jobs
307482
}
483+
484+
func assertDefaultScenarioGeneratorPortfolio(t *testing.T, ssn *framework.Session) {
485+
t.Helper()
486+
487+
for _, expectedGenerator := range []string{
488+
commonconstants.GeneratorNodeLocalGreedy,
489+
commonconstants.GeneratorMultiNodeGang,
490+
} {
491+
foundGenerator := false
492+
for _, registration := range ssn.ScenarioGeneratorRegistrations {
493+
if registration.Name != expectedGenerator {
494+
continue
495+
}
496+
foundGenerator = true
497+
if _, found := registration.Actions[framework.Reclaim]; !found {
498+
t.Fatalf("expected default generator %q to apply to reclaim", expectedGenerator)
499+
}
500+
break
501+
}
502+
if !foundGenerator {
503+
t.Fatalf("expected default scenario generator plugins to register %q", expectedGenerator)
504+
}
505+
}
506+
}
507+
508+
func observeMultiNodeGangScenarios(t *testing.T, ssn *framework.Session) *int {
509+
t.Helper()
510+
511+
for index, registration := range ssn.ScenarioGeneratorRegistrations {
512+
if registration.Name != commonconstants.GeneratorMultiNodeGang {
513+
continue
514+
}
515+
emissions := 0
516+
originalFactory := registration.Factory
517+
ssn.ScenarioGeneratorRegistrations[index].Factory = func(ctx framework.ScenarioGeneratorContext) framework.ScenarioGenerator {
518+
generator := originalFactory(ctx)
519+
if generator == nil {
520+
return nil
521+
}
522+
return &observedScenarioGenerator{
523+
ScenarioGenerator: generator,
524+
onScenario: func() {
525+
emissions++
526+
},
527+
}
528+
}
529+
return &emissions
530+
}
531+
t.Fatalf("expected default scenario generator plugins to register %q", commonconstants.GeneratorMultiNodeGang)
532+
return nil
533+
}
534+
535+
func assertDefaultScenarioSearchBudgets(t *testing.T, ssn *framework.Session) {
536+
t.Helper()
537+
538+
if ssn.Config == nil || ssn.Config.ScenarioSearchBudgets == nil {
539+
t.Fatalf("expected default scenario search budgets on session")
540+
}
541+
542+
generatorBudgets := ssn.Config.ScenarioSearchBudgets.MaxGeneratorSearchDuration
543+
if got := generatorBudgets[commonconstants.GeneratorNodeLocalGreedy].Duration; got != 30*time.Second {
544+
t.Fatalf("expected default %s budget 30s, got %s",
545+
commonconstants.GeneratorNodeLocalGreedy, got)
546+
}
547+
if got := generatorBudgets[commonconstants.GeneratorMultiNodeGang].Duration; got != 2*time.Minute {
548+
t.Fatalf("expected default %s budget 2m, got %s",
549+
commonconstants.GeneratorMultiNodeGang, got)
550+
}
551+
}
552+
553+
type observedScenarioGenerator struct {
554+
framework.ScenarioGenerator
555+
onScenario func()
556+
}
557+
558+
func (g *observedScenarioGenerator) Next() api.ScenarioInfo {
559+
scenario := g.ScenarioGenerator.Next()
560+
if scenario != nil && g.onScenario != nil {
561+
g.onScenario()
562+
}
563+
return scenario
564+
}

pkg/scheduler/test_utils/test_utils.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"k8s.io/client-go/informers"
1919
"k8s.io/client-go/kubernetes/fake"
2020

21+
kaiv1 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/kai/v1"
2122
kaiv1alpha1 "github.com/kai-scheduler/KAI-scheduler/pkg/apis/kai/v1alpha1"
2223

2324
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/actions"
@@ -98,8 +99,9 @@ type TestDepartmentBasic struct {
9899
}
99100

100101
type TestSessionConfig struct {
101-
Plugins []conf.Tier
102-
CachePlugins map[string]bool
102+
Plugins []conf.Tier
103+
CachePlugins map[string]bool
104+
ScenarioSearchBudgets *kaiv1.ScenarioSearchBudgets
103105
}
104106

105107
type TestExpectedResultBasic struct {

pkg/scheduler/test_utils/test_utils_builder.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func CreateFakeSession(schedulerConfig *TestSessionConfig,
8888
}
8989

9090
if schedulerConfig != nil {
91+
ssn.Config.ScenarioSearchBudgets = schedulerConfig.ScenarioSearchBudgets
9192
addSessionPlugins(&ssn, schedulerConfig.Plugins, createCacheMockIfNotExists, schedulerConfig.CachePlugins)
9293
}
9394

@@ -230,6 +231,10 @@ func BuildDepartmentInfoMap(testMetadata TestTopologyBasic) map[common_info.Queu
230231
}
231232

232233
func BuildPlugins(testMetadata TestTopologyBasic) []conf.Tier {
234+
return buildSchedulerConfiguration(testMetadata).Tiers
235+
}
236+
237+
func buildSchedulerConfiguration(testMetadata TestTopologyBasic) *conf.SchedulerConfiguration {
233238
plugins.InitDefaultPlugins()
234239
confFileName := ""
235240

@@ -259,16 +264,17 @@ func BuildPlugins(testMetadata TestTopologyBasic) []conf.Tier {
259264
panic(err)
260265
}
261266

262-
return config.Tiers
267+
return config
263268
}
264269

265270
func BuildSession(testMetadata TestTopologyBasic, controller *Controller) *framework.Session {
266-
confPlugins := BuildPlugins(testMetadata)
271+
config := buildSchedulerConfiguration(testMetadata)
267272
schedulerConfig := TestSessionConfig{
268-
Plugins: confPlugins,
273+
Plugins: config.Tiers,
269274
CachePlugins: map[string]bool{
270275
"predicates": true,
271276
},
277+
ScenarioSearchBudgets: config.ScenarioSearchBudgets,
272278
}
273279

274280
addDefaultDepartmentIfNeeded(&testMetadata)

0 commit comments

Comments
 (0)