Skip to content

Commit f5915d5

Browse files
committed
fix(scheduler): filter unreclaimable reclaim victims
Signed-off-by: Erez Freiberger <enoodle@gmail.com>
1 parent 530df63 commit f5915d5

5 files changed

Lines changed: 120 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
1111
### Changed
1212

1313
### Fixed
14+
- Fixed reclaim abandoning valid over-quota victims when an unrelated under-deserved queue appeared earlier in victim ordering. [#1750](https://github.com/kai-scheduler/KAI-Scheduler/issues/1750)
1415

1516
## [v0.16.0] - 2026-06-24
1617

pkg/scheduler/actions/reclaim/reclaim_anchor_victim_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ func TestReclaimAnchorVictim(t *testing.T) {
112112
DontValidateGPUGroup: true,
113113
},
114114
},
115+
Mocks: &test_utils.TestMock{
116+
CacheRequirements: &test_utils.CacheMocking{
117+
NumberOfCacheBinds: 5,
118+
NumberOfCacheEvictions: 2,
119+
NumberOfPipelineActions: 2,
120+
},
121+
},
115122
}
116123

117124
ssn := test_utils.BuildSession(topology, controller)

pkg/scheduler/plugins/proportion/proportion.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
105105
capacityPolicy := cp.New(pp.queues, pp.minNodeGPUMemory)
106106
ssn.AddQueueOrderFn(pp.queueOrder)
107107
ssn.AddCanReclaimResourcesFn(pp.CanReclaimResourcesFn)
108+
ssn.AddReclaimVictimFilterFn(pp.reclaimVictimFilterFn)
108109
ssn.AddReclaimScenarioValidatorFn(pp.reclaimableFn)
109110
ssn.AddOnJobSolutionStartFn(pp.OnJobSolutionStartFn)
110111
ssn.AddIsNonPreemptibleJobOverQueueQuotaFns(capacityPolicy.IsNonPreemptibleJobOverQuota)
@@ -140,6 +141,13 @@ func (pp *proportionPlugin) CanReclaimResourcesFn(reclaimer *podgroup_info.PodGr
140141
return pp.reclaimablePlugin.CanReclaimResources(pp.queues, reclaimerInfo)
141142
}
142143

144+
// reclaimVictimFilterFn is the victim filter callback of the proportion plugin
// (it is passed to ssn.AddReclaimVictimFilterFn in OnSessionOpen). It builds
// the reclaimer info for the reclaimer pod group and returns the verdict of
// pp.reclaimablePlugin.FilterVictim for the victim's queue.
// NOTE(review): victim is dereferenced without a nil check — confirm callers
// never pass a nil victim.
func (pp *proportionPlugin) reclaimVictimFilterFn(
145+
reclaimer *podgroup_info.PodGroupInfo, victim *podgroup_info.PodGroupInfo,
146+
) bool {
147+
reclaimerInfo := pp.buildReclaimerInfo(reclaimer, pp.minNodeGPUMemory)
148+
return pp.reclaimablePlugin.FilterVictim(pp.queues, reclaimerInfo, victim.Queue)
149+
}
150+
143151
func (pp *proportionPlugin) reclaimableFn(
144152
scenario api.ScenarioInfo,
145153
) bool {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2025 NVIDIA CORPORATION
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package reclaimable
5+
6+
import (
7+
commonconstants "github.com/kai-scheduler/KAI-scheduler/pkg/common/constants"
8+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/common_info"
9+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/api/resource_info"
10+
rs "github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/plugins/proportion/resource_share"
11+
"github.com/kai-scheduler/KAI-scheduler/pkg/scheduler/plugins/proportion/utils"
12+
)
13+
14+
// FilterVictim removes victims that cannot be reclaimed by the deserved-quota strategy.
15+
func (r *Reclaimable) FilterVictim(
16+
queues map[common_info.QueueID]*rs.QueueAttributes,
17+
reclaimer *ReclaimerInfo,
18+
reclaimeeQueueID common_info.QueueID,
19+
) bool {
20+
if reclaimer == nil {
21+
return true
22+
}
23+
24+
reclaimerQueue, reclaimeeQueue := r.getLeveledQueues(queues, reclaimer.Queue, reclaimeeQueueID)
25+
if reclaimerQueue == nil || reclaimeeQueue == nil {
26+
return true
27+
}
28+
29+
if !canUseGuaranteeDeservedQuotaStrategy(reclaimer, reclaimerQueue) {
30+
return true
31+
}
32+
33+
return canBeDeservedQuotaReclaimCandidate(reclaimer, reclaimeeQueue)
34+
}
35+
36+
// canUseGuaranteeDeservedQuotaStrategy reports whether the reclaimer queue's
// allocated share, after adding the reclaimer's required resources, is still
// LessEqual to that queue's deserved share.
func canUseGuaranteeDeservedQuotaStrategy(
37+
reclaimer *ReclaimerInfo, reclaimerQueue *rs.QueueAttributes,
38+
) bool {
39+
allocatedWithReclaimer := reclaimerQueue.GetAllocatedShare()
40+
// The result of Add is not captured, so it must update allocatedWithReclaimer
// in place.
// NOTE(review): assumes GetAllocatedShare returns a fresh value rather than
// queue-owned state, otherwise this would mutate the queue — confirm.
allocatedWithReclaimer.Add(utils.QuantifyVector(reclaimer.RequiredResources, reclaimer.VectorMap))
41+
return allocatedWithReclaimer.LessEqual(reclaimerQueue.GetDeservedShare())
42+
}
43+
44+
// canBeDeservedQuotaReclaimCandidate reports whether reclaimeeQueue may supply
// victims, considering only the resource names that getInvolvedResourcesNames
// returns for the reclaimer's required resources.
//
// Resources whose deserved value is UnlimitedResourceQuantity are ignored.
// The function returns true as soon as one considered resource is allocated
// above its deserved value. If none is above, it returns true only when no
// considered resource is strictly below its deserved value (the queue sits
// exactly at deserved, or nothing was considered).
func canBeDeservedQuotaReclaimCandidate(
45+
reclaimer *ReclaimerInfo, reclaimeeQueue *rs.QueueAttributes,
46+
) bool {
47+
allocated := reclaimeeQueue.GetAllocatedShare()
48+
deserved := reclaimeeQueue.GetDeservedShare()
49+
involvedResources := getInvolvedResourcesNames([]resource_info.ResourceVector{reclaimer.RequiredResources}, reclaimer.VectorMap)
50+
51+
hasUnderDeservedResource := false
52+
for resource := range involvedResources {
53+
// An unlimited deserved value is neither "over" nor "under": skip it.
if deserved[resource] == commonconstants.UnlimitedResourceQuantity {
54+
continue
55+
}
56+
// Any over-deserved resource makes the queue a candidate immediately,
// regardless of under-deserved resources seen before or after it.
if allocated[resource] > deserved[resource] {
57+
return true
58+
}
59+
if allocated[resource] < deserved[resource] {
60+
hasUnderDeservedResource = true
61+
}
62+
}
63+
64+
// No resource was over deserved: keep the queue only if none was under.
return !hasUnderDeservedResource
65+
}

pkg/scheduler/plugins/proportion/reclaimable/reclaimable_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,6 +1134,45 @@ var _ = Describe("Reclaimable - Multiple hierarchy levels", func() {
11341134
})
11351135
})
11361136

1137+
// Specs for Reclaimable.FilterVictim. Each spec builds a "reclaimer" queue and
// a "victim" queue with buildQueues and checks the keep (true) / filter
// (false) verdict for the shared reclaimerInfo below.
var _ = Describe("FilterVictim", func() {
1138+
// Shared by all specs; none of them modifies it.
// NOTE(review): presumably NewResource(0, 0, 2) requests only 2 GPUs, and the
// deserved/allocated fixture values apply to that same resource — confirm
// against NewResource and buildQueues.
reclaimerInfo := &ReclaimerInfo{
1139+
Queue: "reclaimer",
1140+
RequiredResources: resource_info.NewResource(0, 0, 2).ToVector(testVectorMap),
1141+
VectorMap: testVectorMap,
1142+
IsPreemptable: true,
1143+
}
1144+
1145+
// Victim allocated (2) is below deserved (4): expected to be filtered out.
It("filters victims whose leveled queue is strictly under deserved quota", func() {
1146+
reclaimable := New(1)
1147+
queues := buildQueues(map[common_info.QueueID]queuesTestData{
1148+
"reclaimer": {deserved: 4, fairShare: 4, allocated: 0},
1149+
"victim": {deserved: 4, fairShare: 4, allocated: 2},
1150+
})
1151+
1152+
Expect(reclaimable.FilterVictim(queues, reclaimerInfo, "victim")).To(BeFalse())
1153+
})
1154+
1155+
// Victim allocated (4) is above deserved (0): expected to be kept.
It("keeps victims whose leveled queue is over deserved quota", func() {
1156+
reclaimable := New(1)
1157+
queues := buildQueues(map[common_info.QueueID]queuesTestData{
1158+
"reclaimer": {deserved: 4, fairShare: 4, allocated: 0},
1159+
"victim": {deserved: 0, fairShare: 4, allocated: 4},
1160+
})
1161+
1162+
Expect(reclaimable.FilterVictim(queues, reclaimerInfo, "victim")).To(BeTrue())
1163+
})
1164+
1165+
// Victim allocated (2) equals deserved (2): expected to be kept.
It("keeps victims exactly at deserved quota for consolidation", func() {
1166+
reclaimable := New(1)
1167+
queues := buildQueues(map[common_info.QueueID]queuesTestData{
1168+
"reclaimer": {deserved: 4, fairShare: 4, allocated: 0},
1169+
"victim": {deserved: 2, fairShare: 4, allocated: 2},
1170+
})
1171+
1172+
Expect(reclaimable.FilterVictim(queues, reclaimerInfo, "victim")).To(BeTrue())
1173+
})
1174+
})
1175+
11371176
func buildQueues(queuesData map[common_info.QueueID]queuesTestData) map[common_info.QueueID]*rs.QueueAttributes {
11381177
queues := map[common_info.QueueID]*rs.QueueAttributes{}
11391178
for name, queueData := range queuesData {

0 commit comments

Comments
 (0)