Commit 036b210

[release-0.10] cherry-pick #4813 (#4823)
* Move Integ Test Helpers from FairSharing to Util (#4812)
* Copy MakeWorkloadWithGeneratedName from #4695
* Admit Borrowing Cohort Workloads when Reclaim Guaranteed (#4813)
* Admit Borrowing Cohort Workloads when Reclaim Guaranteed
* Update Metrics Test
1 parent f692c87 commit 036b210

File tree

7 files changed: +367 -49 lines changed


pkg/scheduler/preemption/policy.go

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package preemption
+
+import (
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/pkg/cache"
+)
+
+// CanAlwaysReclaim indicates that the CQ is guaranteed to
+// be able to reclaim the capacity of workloads borrowing
+// its capacity.
+func CanAlwaysReclaim(cq *cache.ClusterQueueSnapshot) bool {
+	return cq.Preemption.ReclaimWithinCohort == kueue.PreemptionPolicyAny
+}
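
The only preemption configuration that makes this predicate true is ReclaimWithinCohort: Any. As an illustration that is not part of this commit's diff, such a ClusterQueue can be built with the wrappers from sigs.k8s.io/kueue/pkg/util/testing, mirroring the "ClusterQueueA" fixture used by the new scheduler test below; the queue, cohort, flavor, and resource names are simply the ones that test happens to use:

// Sketch: a ClusterQueue whose snapshot satisfies CanAlwaysReclaim,
// because ReclaimWithinCohort is set to PreemptionPolicyAny.
cq := utiltesting.MakeClusterQueue("ClusterQueueA").
	Cohort("root").
	ResourceGroup(
		utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "2").FlavorQuotas,
	).
	Preemption(kueue.ClusterQueuePreemption{
		ReclaimWithinCohort: kueue.PreemptionPolicyAny,
	}).
	Obj()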

pkg/scheduler/scheduler.go

Lines changed: 12 additions & 4 deletions
@@ -225,10 +225,18 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
 
 		if mode == flavorassigner.Preempt && len(e.preemptionTargets) == 0 {
 			log.V(2).Info("Workload requires preemption, but there are no candidate workloads allowed for preemption", "preemption", cq.Preemption)
-			// we use resourcesToReserve to block capacity up to either the nominal capacity,
-			// or the borrowing limit when borrowing, so that a lower priority workload cannot
-			// admit before us.
-			cq.AddUsage(resourcesToReserve(e, cq))
+			// we reserve capacity if we are uncertain
+			// whether we can reclaim the capacity
+			// later. Otherwise, we allow other workloads
+			// in the Cohort to borrow this capacity,
+			// confident we can reclaim it later.
+			if !preemption.CanAlwaysReclaim(cq) {
+				// reserve capacity up to the
+				// borrowing limit, so that
+				// lower-priority workloads in another
+				// Cohort cannot admit before us.
+				cq.AddUsage(resourcesToReserve(e, cq))
+			}
 			continue
 		}

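Concretely, the new guard only skips the capacity reservation when the ClusterQueue's ReclaimWithinCohort policy is Any; for every other value it keeps the previous behavior of blocking the capacity. Below is a small, self-contained sketch of that rule, simplified so the policy value is passed directly instead of a ClusterQueueSnapshot; the "Never" entry is just another illustrative policy value:

package main

import "fmt"

// canAlwaysReclaim mirrors preemption.CanAlwaysReclaim from this commit,
// but takes the ReclaimWithinCohort value directly rather than a
// ClusterQueueSnapshot (a simplification for this sketch).
func canAlwaysReclaim(reclaimWithinCohort string) bool {
	return reclaimWithinCohort == "Any"
}

func main() {
	for _, policy := range []string{"Any", "LowerPriority", "Never"} {
		if canAlwaysReclaim(policy) {
			fmt.Println(policy + ": leave the capacity borrowable; it can be reclaimed later via preemption")
		} else {
			fmt.Println(policy + ": block the capacity (AddUsage) so lower-priority workloads cannot admit first")
		}
	}
}
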
pkg/scheduler/scheduler_test.go

Lines changed: 98 additions & 0 deletions
@@ -2557,6 +2557,104 @@ func TestSchedule(t *testing.T) {
 			"eng-gamma/Admitted-Workload-3": *utiltesting.MakeAdmission("CQ3").Assignment("gpu", "on-demand", "5").Obj(),
 		},
 	},
+	"capacity not blocked when lending clusterqueue can reclaim": {
+		additionalClusterQueues: []kueue.ClusterQueue{
+			*utiltesting.MakeClusterQueue("ClusterQueueA").
+				Cohort("root").
+				ResourceGroup(
+					utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "2").FlavorQuotas,
+				).
+				Preemption(kueue.ClusterQueuePreemption{
+					ReclaimWithinCohort: kueue.PreemptionPolicyAny,
+				}).
+				Obj(),
+			*utiltesting.MakeClusterQueue("ClusterQueueB").
+				Cohort("root").
+				ResourceGroup(
+					utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "0").FlavorQuotas,
+				).
+				Obj(),
+		},
+		additionalLocalQueues: []kueue.LocalQueue{
+			*utiltesting.MakeLocalQueue("lq", "eng-alpha").ClusterQueue("ClusterQueueA").Obj(),
+			*utiltesting.MakeLocalQueue("lq", "eng-beta").ClusterQueue("ClusterQueueB").Obj(),
+		},
+		workloads: []kueue.Workload{
+			*utiltesting.MakeWorkload("a1-admitted", "eng-alpha").
+				Queue("lq").
+				Request("gpu", "1").
+				SimpleReserveQuota("ClusterQueueA", "on-demand", now).
+				Obj(),
+			*utiltesting.MakeWorkload("a2-pending", "eng-alpha").
+				Queue("lq").
+				Request("gpu", "2").
+				Obj(),
+			*utiltesting.MakeWorkload("b1-pending", "eng-beta").
+				Creation(now).
+				Queue("lq").
+				Request("gpu", "1").
+				Obj(),
+		},
+		wantLeft: nil,
+		wantInadmissibleLeft: map[string][]string{
+			"ClusterQueueA": {"eng-alpha/a2-pending"},
+		},
+		wantScheduled: []string{
+			"eng-beta/b1-pending",
+		},
+		wantAssignments: map[string]kueue.Admission{
+			"eng-alpha/a1-admitted": *utiltesting.MakeAdmission("ClusterQueueA").Assignment("gpu", "on-demand", "1").Obj(),
+			"eng-beta/b1-pending":   *utiltesting.MakeAdmission("ClusterQueueB").Assignment("gpu", "on-demand", "1").Obj(),
+		},
+	},
+	"capacity blocked when lending clusterqueue not guaranteed to reclaim": {
+		additionalClusterQueues: []kueue.ClusterQueue{
+			*utiltesting.MakeClusterQueue("ClusterQueueA").
+				Cohort("root").
+				ResourceGroup(
+					utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "2").FlavorQuotas,
+				).
+				Preemption(kueue.ClusterQueuePreemption{
+					ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority,
+				}).
+				Obj(),
+			*utiltesting.MakeClusterQueue("ClusterQueueB").
+				Cohort("root").
+				ResourceGroup(
+					utiltesting.MakeFlavorQuotas("on-demand").Resource("gpu", "0").FlavorQuotas,
+				).
+				Obj(),
+		},
+		additionalLocalQueues: []kueue.LocalQueue{
+			*utiltesting.MakeLocalQueue("lq", "eng-alpha").ClusterQueue("ClusterQueueA").Obj(),
+			*utiltesting.MakeLocalQueue("lq", "eng-beta").ClusterQueue("ClusterQueueB").Obj(),
+		},
+		workloads: []kueue.Workload{
+			*utiltesting.MakeWorkload("a1-admitted", "eng-alpha").
+				Queue("lq").
+				Request("gpu", "1").
+				SimpleReserveQuota("ClusterQueueA", "on-demand", now).
+				Obj(),
+			*utiltesting.MakeWorkload("a2-pending", "eng-alpha").
+				Queue("lq").
+				Request("gpu", "2").
+				Obj(),
+			*utiltesting.MakeWorkload("b1-pending", "eng-beta").
+				Creation(now).
+				Queue("lq").
+				Request("gpu", "1").
+				Obj(),
+		},
+		wantLeft: map[string][]string{
+			"ClusterQueueB": {"eng-beta/b1-pending"},
+		},
+		wantInadmissibleLeft: map[string][]string{
+			"ClusterQueueA": {"eng-alpha/a2-pending"},
+		},
+		wantAssignments: map[string]kueue.Admission{
+			"eng-alpha/a1-admitted": *utiltesting.MakeAdmission("ClusterQueueA").Assignment("gpu", "on-demand", "1").Obj(),
+		},
+	},
 	}
 
 	for name, tc := range cases {

pkg/util/testing/wrappers.go

Lines changed: 8 additions & 0 deletions
@@ -75,6 +75,14 @@ func MakeWorkload(name, ns string) *WorkloadWrapper {
 	}}
 }
 
+// MakeWorkloadWithGeneratedName creates a wrapper for a Workload with a single pod
+// with a single container.
+func MakeWorkloadWithGeneratedName(namePrefix, ns string) *WorkloadWrapper {
+	wl := MakeWorkload("", ns)
+	wl.GenerateName = namePrefix
+	return wl
+}
+
 func (w *WorkloadWrapper) Obj() *kueue.Workload {
 	return &w.Workload
 }
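
A rough usage sketch, not part of the diff: the new wrapper chains like the existing MakeWorkload one, so a test can let the API server generate the final name from a prefix. The queue name, namespace, and resource amount below are illustrative only:

// Build a Workload whose metadata.generateName is "sample-"; the server
// appends a random suffix on creation. Queue and Request values are
// illustrative only.
wl := utiltesting.MakeWorkloadWithGeneratedName("sample-", "eng-alpha").
	Queue("lq").
	Request("gpu", "1").
	Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())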

test/integration/scheduler/fairsharing/fair_sharing_test.go

Lines changed: 4 additions & 45 deletions
@@ -18,20 +18,15 @@ package fairsharing
 
 import (
 	"fmt"
-	"time"
 
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/util/testing"
-	"sigs.k8s.io/kueue/pkg/workload"
 	"sigs.k8s.io/kueue/test/integration/framework"
 	"sigs.k8s.io/kueue/test/util"
 )
@@ -130,7 +125,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.ExpectClusterQueueWeightedShareMetric(cqShared, 0)
 
 			ginkgo.By("Terminating 4 running workloads in cqA: shared quota is fair-shared")
-			finishRunningWorkloadsInCQ(cqA, 4)
+			util.FinishRunningWorkloadsInCQ(ctx, k8sClient, cqA, 4)
 
 			// Admits 1 from cqA and 3 from cqB.
 			util.ExpectReservingActiveWorkloadsMetric(cqA, 5)
@@ -142,7 +137,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.ExpectClusterQueueWeightedShareMetric(cqShared, 0)
 
 			ginkgo.By("Terminating 2 more running workloads in cqA: cqB starts to take over shared quota")
-			finishRunningWorkloadsInCQ(cqA, 2)
+			util.FinishRunningWorkloadsInCQ(ctx, k8sClient, cqA, 2)
 
 			// Admits last 1 from cqA and 1 from cqB.
 			util.ExpectReservingActiveWorkloadsMetric(cqA, 4)
@@ -263,7 +258,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.ExpectPendingWorkloadsMetric(cqB, 5, 0)
 
 			ginkgo.By("Finishing eviction of 4 running workloads in cqA: shared quota is fair-shared")
-			finishEvictionOfWorkloadsInCQ(cqA, 4)
+			util.FinishEvictionOfWorkloadsInCQ(ctx, k8sClient, cqA, 4)
 			util.ExpectReservingActiveWorkloadsMetric(cqB, 4)
 			util.ExpectClusterQueueWeightedShareMetric(cqA, 222)
 			util.ExpectClusterQueueWeightedShareMetric(cqB, 111)
@@ -278,7 +273,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			util.ExpectClusterQueueWeightedShareMetric(cqC, 0)
 
 			ginkgo.By("Finishing eviction of 1 running workloads in the CQ with highest usage: cqA")
-			finishEvictionOfWorkloadsInCQ(cqA, 1)
+			util.FinishEvictionOfWorkloadsInCQ(ctx, k8sClient, cqA, 1)
 			util.ExpectReservingActiveWorkloadsMetric(cqC, 1)
 			util.ExpectClusterQueueWeightedShareMetric(cqA, 111)
 			util.ExpectClusterQueueWeightedShareMetric(cqB, 111)
@@ -295,39 +290,3 @@ var _ = ginkgo.Describe("Scheduler", func() {
 		})
 	})
 })
-
-func finishRunningWorkloadsInCQ(cq *kueue.ClusterQueue, n int) {
-	var wList kueue.WorkloadList
-	gomega.ExpectWithOffset(1, k8sClient.List(ctx, &wList)).To(gomega.Succeed())
-	finished := 0
-	for i := 0; i < len(wList.Items) && finished < n; i++ {
-		wl := wList.Items[i]
-		if wl.Status.Admission != nil && string(wl.Status.Admission.ClusterQueue) == cq.Name && !meta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
-			util.FinishWorkloads(ctx, k8sClient, &wl)
-			finished++
-		}
-	}
-	gomega.ExpectWithOffset(1, finished).To(gomega.Equal(n), "Not enough workloads finished")
-}
-
-func finishEvictionOfWorkloadsInCQ(cq *kueue.ClusterQueue, n int) {
-	finished := sets.New[types.UID]()
-	gomega.EventuallyWithOffset(1, func(g gomega.Gomega) {
-		var wList kueue.WorkloadList
-		g.Expect(k8sClient.List(ctx, &wList)).To(gomega.Succeed())
-		for i := 0; i < len(wList.Items) && finished.Len() < n; i++ {
-			wl := wList.Items[i]
-			if wl.Status.Admission == nil || string(wl.Status.Admission.ClusterQueue) != cq.Name {
-				continue
-			}
-			evicted := meta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted)
-			quotaReserved := meta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
-			if evicted && quotaReserved {
-				workload.UnsetQuotaReservationWithCondition(&wl, "Pending", "Eviction finished by test", time.Now())
-				g.Expect(workload.ApplyAdmissionStatus(ctx, k8sClient, &wl, true)).To(gomega.Succeed())
-				finished.Insert(wl.UID)
-			}
-		}
-		g.Expect(finished.Len()).Should(gomega.Equal(n), "Not enough workloads evicted")
-	}, util.Timeout, util.Interval).Should(gomega.Succeed())
-}
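
The replacement helpers now live in the shared test/util package, moved there by #4812 (included in this cherry-pick). Their exported form is not shown in this diff; the following is only a plausible sketch of FinishRunningWorkloadsInCQ, inferred from the deleted local helper above and from the new call sites util.FinishRunningWorkloadsInCQ(ctx, k8sClient, cqA, n):

package util

import (
	"context"

	"github.com/onsi/gomega"
	"k8s.io/apimachinery/pkg/api/meta"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// FinishRunningWorkloadsInCQ marks up to n admitted, unfinished Workloads of
// the given ClusterQueue as finished. Sketch only: the body mirrors the
// deleted local helper, with ctx and the client now passed in explicitly.
func FinishRunningWorkloadsInCQ(ctx context.Context, c client.Client, cq *kueue.ClusterQueue, n int) {
	var wList kueue.WorkloadList
	gomega.ExpectWithOffset(1, c.List(ctx, &wList)).To(gomega.Succeed())
	finished := 0
	for i := 0; i < len(wList.Items) && finished < n; i++ {
		wl := wList.Items[i]
		if wl.Status.Admission != nil && string(wl.Status.Admission.ClusterQueue) == cq.Name &&
			!meta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
			FinishWorkloads(ctx, c, &wl)
			finished++
		}
	}
	gomega.ExpectWithOffset(1, finished).To(gomega.Equal(n), "Not enough workloads finished")
}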
