Skip to content

Commit a51fd3e

Browse files
[release-0.13] Fix SimulateWorkloadRemoval bug (#7084)
* Fix SimulateWorkloadRemoval * Test case for 7015 * introduce cqUsage type --------- Co-authored-by: Gabe <15304068+gabesaba@users.noreply.github.com>
1 parent 4c16464 commit a51fd3e

File tree

5 files changed

+65
-27
lines changed

5 files changed

+65
-27
lines changed

pkg/cache/clusterqueue_snapshot.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,24 +72,6 @@ func (c *ClusterQueueSnapshot) RGByResource(resource corev1.ResourceName) *Resou
7272
return nil
7373
}
7474

75-
// SimulateWorkloadRemoval modifies the snapshot by removing the usage
76-
// corresponding to the list of workloads. It returns a function which
77-
// can be used to restore the usage.
78-
func (c *ClusterQueueSnapshot) SimulateWorkloadRemoval(workloads []*workload.Info) func() {
79-
usage := make([]workload.Usage, 0, len(workloads))
80-
for _, w := range workloads {
81-
usage = append(usage, w.Usage())
82-
}
83-
for _, u := range usage {
84-
c.RemoveUsage(u)
85-
}
86-
return func() {
87-
for _, u := range usage {
88-
c.AddUsage(u)
89-
}
90-
}
91-
}
92-
9375
// SimulateUsageAddition modifies the snapshot by adding usage, and
9476
// returns a function used to restore the usage.
9577
func (c *ClusterQueueSnapshot) SimulateUsageAddition(usage workload.Usage) func() {

pkg/cache/snapshot.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,37 @@ func (s *Snapshot) RemoveWorkload(wl *workload.Info) {
5252
cq.RemoveUsage(wl.Usage())
5353
}
5454

55-
// AddWorkload adds a workload from its corresponding ClusterQueue and
55+
// AddWorkload adds a workload to its corresponding ClusterQueue and
5656
// updates resource usage.
5757
func (s *Snapshot) AddWorkload(wl *workload.Info) {
5858
cq := s.ClusterQueue(wl.ClusterQueue)
5959
cq.Workloads[workload.Key(wl.Obj)] = wl
6060
cq.AddUsage(wl.Usage())
6161
}
6262

63+
// SimulateWorkloadRemoval modifies the snapshot by removing the usage
64+
// corresponding to the list of workloads from workloads' respective
65+
// ClusterQueues. It returns a function which can be used to restore
66+
// this usage.
67+
func (s *Snapshot) SimulateWorkloadRemoval(workloads []*workload.Info) func() {
68+
type cqUsage struct {
69+
cq kueue.ClusterQueueReference
70+
usage workload.Usage
71+
}
72+
cqUsages := make([]cqUsage, 0, len(workloads))
73+
for _, w := range workloads {
74+
cqUsages = append(cqUsages, cqUsage{cq: w.ClusterQueue, usage: w.Usage()})
75+
}
76+
for _, cqUsage := range cqUsages {
77+
s.ClusterQueue(cqUsage.cq).RemoveUsage(cqUsage.usage)
78+
}
79+
return func() {
80+
for _, cqUsage := range cqUsages {
81+
s.ClusterQueue(cqUsage.cq).AddUsage(cqUsage.usage)
82+
}
83+
}
84+
}
85+
6386
func (s *Snapshot) Log(log logr.Logger) {
6487
for name, cq := range s.ClusterQueues() {
6588
cohortName := "<none>"

pkg/scheduler/preemption/preemption_oracle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (p *PreemptionOracle) SimulatePreemption(log logr.Logger, cq *schdcache.Clu
5757
for i, c := range candidates {
5858
workloadsToPreempt[i] = c.WorkloadInfo
5959
}
60-
revertRemoval := cq.SimulateWorkloadRemoval(workloadsToPreempt)
60+
revertRemoval := p.snapshot.SimulateWorkloadRemoval(workloadsToPreempt)
6161
borrowAfterPreemptions, _ := classical.FindHeightOfLowestSubtreeThatFits(cq, fr, quantity)
6262
revertRemoval()
6363

pkg/scheduler/scheduler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
283283
}
284284

285285
usage := e.assignmentUsage()
286-
if !fits(cq, &usage, preemptedWorkloads, e.preemptionTargets) {
286+
if !fits(snapshot, cq, &usage, preemptedWorkloads, e.preemptionTargets) {
287287
setSkipped(e, "Workload no longer fits after processing another workload")
288288
if mode == flavorassigner.Preempt {
289289
skippedPreemptions[cq.Name]++
@@ -439,12 +439,12 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
439439
return entries, inadmissibleEntries
440440
}
441441

442-
func fits(cq *schdcache.ClusterQueueSnapshot, usage *workload.Usage, preemptedWorkloads preemption.PreemptedWorkloads, newTargets []*preemption.Target) bool {
442+
func fits(snapshot *schdcache.Snapshot, cq *schdcache.ClusterQueueSnapshot, usage *workload.Usage, preemptedWorkloads preemption.PreemptedWorkloads, newTargets []*preemption.Target) bool {
443443
workloads := slices.Collect(maps.Values(preemptedWorkloads))
444444
for _, target := range newTargets {
445445
workloads = append(workloads, target.WorkloadInfo)
446446
}
447-
revertUsage := cq.SimulateWorkloadRemoval(workloads)
447+
revertUsage := snapshot.SimulateWorkloadRemoval(workloads)
448448
defer revertUsage()
449449
return cq.Fits(*usage)
450450
}
@@ -494,7 +494,7 @@ type partialAssignment struct {
494494
func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *schdcache.Snapshot) (flavorassigner.Assignment, []*preemption.Target) {
495495
assignment, targets := s.getInitialAssignments(log, wl, snap)
496496
cq := snap.ClusterQueue(wl.ClusterQueue)
497-
updateAssignmentForTAS(cq, wl, &assignment, targets)
497+
updateAssignmentForTAS(snap, cq, wl, &assignment, targets)
498498
return assignment, targets
499499
}
500500

@@ -575,7 +575,7 @@ func (s *Scheduler) evictWorkloadAfterFailedTASReplacement(ctx context.Context,
575575
return nil
576576
}
577577

578-
func updateAssignmentForTAS(cq *schdcache.ClusterQueueSnapshot, wl *workload.Info, assignment *flavorassigner.Assignment, targets []*preemption.Target) {
578+
func updateAssignmentForTAS(snapshot *schdcache.Snapshot, cq *schdcache.ClusterQueueSnapshot, wl *workload.Info, assignment *flavorassigner.Assignment, targets []*preemption.Target) {
579579
if features.Enabled(features.TopologyAwareScheduling) && assignment.RepresentativeMode() == flavorassigner.Preempt &&
580580
(workload.IsExplicitlyRequestingTAS(wl.Obj.Spec.PodSets...) || cq.IsTASOnly()) && !workload.HasTopologyAssignmentWithNodeToReplace(wl.Obj) {
581581
tasRequests := assignment.WorkloadsTopologyRequests(wl, cq)
@@ -585,7 +585,7 @@ func updateAssignmentForTAS(cq *schdcache.ClusterQueueSnapshot, wl *workload.Inf
585585
for _, target := range targets {
586586
targetWorkloads = append(targetWorkloads, target.WorkloadInfo)
587587
}
588-
revertUsage := cq.SimulateWorkloadRemoval(targetWorkloads)
588+
revertUsage := snapshot.SimulateWorkloadRemoval(targetWorkloads)
589589
tasResult = cq.FindTopologyAssignmentsForWorkload(tasRequests)
590590
revertUsage()
591591
} else {

test/integration/singlecluster/scheduler/fairsharing/fair_sharing_test.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ var _ = ginkgo.Describe("Scheduler", ginkgo.Ordered, ginkgo.ContinueOnFailure, f
493493
ginkgo.When("using hierarchical cohorts with several flavors", func() {
494494
var (
495495
cqp1 *kueue.ClusterQueue
496+
cqp5 *kueue.ClusterQueue
496497
)
497498
ginkgo.BeforeEach(func() {
498499
createCohort(testing.MakeCohort("root-cohort").Obj())
@@ -566,7 +567,7 @@ var _ = ginkgo.Describe("Scheduler", ginkgo.Ordered, ginkgo.ContinueOnFailure, f
566567
Preemption(preemption).
567568
Obj())
568569

569-
createQueue(testing.MakeClusterQueue("cq-p5").
570+
cqp5 = createQueue(testing.MakeClusterQueue("cq-p5").
570571
Cohort("cohort-b").
571572
FairWeight(resource.MustParse("1")).
572573
ResourceGroup(
@@ -614,6 +615,38 @@ var _ = ginkgo.Describe("Scheduler", ginkgo.Ordered, ginkgo.ContinueOnFailure, f
614615
util.ExpectClusterQueueWeightedShareMetric(cqp1, 600)
615616
expectCohortWeightedShare("cohort-a", 0)
616617
})
618+
619+
// scenario from Kueue#7015
620+
// WeightedShare(CohortA) = 0/20 * 1000 = 0
621+
// WeightedShare(cq-p1) = 12/20 * 1000 = 600
622+
// WeightedShare(CohortB) = 0/20 * 1000 = 0
623+
// WeightedShare(cq-p5) = 2/20 * 1000 = 100
624+
ginkgo.It("CohortB preempts and schedules in flavor which has guarantees", func() {
625+
_ = features.SetEnable(features.FlavorFungibilityImplicitPreferenceDefault, true)
626+
ginkgo.By("Create workload which saturate all cohort resources")
627+
for range 20 {
628+
createWorkload("cq-p1", "1")
629+
}
630+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 20)
631+
expectCohortWeightedShare("cohort-a", 100)
632+
633+
ginkgo.By("Create workloads in CohortB which will preempt CohortA")
634+
createWorkload("cq-p5", "1")
635+
createWorkload("cq-p5", "1")
636+
637+
ginkgo.By("Finish Preemption of Workloads")
638+
util.FinishEvictionOfWorkloadsInCQ(ctx, k8sClient, cqp1, 2)
639+
640+
ginkgo.By("Check expected workloads active")
641+
util.ExpectReservingActiveWorkloadsMetric(cqp1, 18)
642+
util.ExpectReservingActiveWorkloadsMetric(cqp5, 2)
643+
644+
ginkgo.By("Expected Weighted Shares")
645+
util.ExpectClusterQueueWeightedShareMetric(cqp1, 600)
646+
util.ExpectClusterQueueWeightedShareMetric(cqp5, 100)
647+
expectCohortWeightedShare("cohort-a", 0)
648+
expectCohortWeightedShare("cohort-b", 0)
649+
})
617650
})
618651

619652
ginkgo.When("Using AdmissionFairSharing at ClusterQueue level", func() {

0 commit comments

Comments
 (0)