Skip to content

Commit 10f50b2

Browse files
authored
[YUNIKORN-3113] Resource-wise preemption (#1029) (#1060)
Introduced FitInActual() method to decide whether the ask would fit into the ask queue or not instead of using strict based methods to avoid problems due to, from the asks' resource requirement perspective, unreleated res types with negative values Introduced two more methods in Preemptor to decide ask queue is under guaranteed and victim queue is over guranateed or not from ask resource requirement perspective. Used these methods in appropriate places to simplify the decision making Added tests for the same. Closes: #1029 (cherry picked from commit 54a2a25) Signed-off-by: Wilfred Spiegelenburg <wilfreds@apache.org>
1 parent d0c1c33 commit 10f50b2

4 files changed

Lines changed: 459 additions & 52 deletions

File tree

pkg/common/resources/resources.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -446,20 +446,27 @@ func subNonNegative(left, right *Resource) (*Resource, string) {
446446
// Types not defined in resource this is called against are considered 0 for Quantity
447447
// A nil resource is treated as an empty resource (no types defined)
448448
func (r *Resource) FitIn(smaller *Resource) bool {
449-
return r.fitIn(smaller, false)
449+
return r.fitIn(smaller, false, false)
450450
}
451451

452452
// FitInMaxUndef checks if smaller fits in the defined resource
453453
// Types not defined in resource this is called against are considered the maximum value for Quantity
454454
// A nil resource is treated as an empty resource (no types defined)
455455
func (r *Resource) FitInMaxUndef(smaller *Resource) bool {
456-
return r.fitIn(smaller, true)
456+
return r.fitIn(smaller, true, false)
457+
}
458+
459+
// FitInActual checks if smaller fits in the defined resource based on the actual values. Negative values too are compared as is.
460+
// Types not defined in resource this is called against are skipped
461+
// A nil resource is treated as an empty resource (no types defined)
462+
func (r *Resource) FitInActual(smaller *Resource) bool {
463+
return r.fitIn(smaller, true, true)
457464
}
458465

459466
// Check if smaller fits in the defined resource
460-
// Negative values will be treated as 0
467+
// Negative values will be treated as 0 if actual flag is false. Otherwise, use the value as is and do the comparison later
461468
// A nil resource is treated as an empty resource, behaviour defined by skipUndef
462-
func (r *Resource) fitIn(smaller *Resource, skipUndef bool) bool {
469+
func (r *Resource) fitIn(smaller *Resource, skipUndef bool, actual bool) bool {
463470
if r == nil {
464471
r = Zero // shadows in the local function not seen by the callers.
465472
}
@@ -475,7 +482,9 @@ func (r *Resource) fitIn(smaller *Resource, skipUndef bool) bool {
475482
if skipUndef && !ok {
476483
continue
477484
}
478-
largerValue = max(0, largerValue)
485+
if !actual {
486+
largerValue = max(0, largerValue)
487+
}
479488
if v > largerValue {
480489
return false
481490
}

pkg/common/resources/resources_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1381,6 +1381,77 @@ func TestFitInSkip(t *testing.T) {
13811381
}
13821382
}
13831383

1384+
func TestFitInActual(t *testing.T) {
1385+
tests := []struct {
1386+
larger *Resource
1387+
smaller *Resource
1388+
expected bool
1389+
message string
1390+
}{
1391+
{
1392+
larger: NewResource(),
1393+
smaller: &Resource{Resources: map[string]Quantity{"a": 1}},
1394+
expected: true,
1395+
message: "defined resource %+v should fit in empty (skip undefined)",
1396+
},
1397+
{
1398+
larger: NewResourceFromMap(map[string]Quantity{"a": 5}),
1399+
smaller: &Resource{Resources: map[string]Quantity{"a": 1}},
1400+
expected: true,
1401+
message: "fitin smaller resource with value %+v should fit in larger %+v (skip undefined)",
1402+
},
1403+
{
1404+
smaller: &Resource{Resources: map[string]Quantity{"a": 1}},
1405+
larger: &Resource{Resources: map[string]Quantity{"not-in-smaller": 1}},
1406+
expected: true,
1407+
message: "different type in smaller %+v should fit in larger %+v (skip undefined)",
1408+
},
1409+
{
1410+
larger: &Resource{Resources: map[string]Quantity{"not-in-smaller": 1}},
1411+
smaller: &Resource{Resources: map[string]Quantity{"not-in-larger": 1}},
1412+
expected: true,
1413+
message: "different type in smaller %+v should fit in larger %+v (skip undefined)",
1414+
},
1415+
{
1416+
larger: &Resource{Resources: map[string]Quantity{"a": -10}},
1417+
smaller: &Resource{Resources: map[string]Quantity{"a": 0, "b": -10}},
1418+
expected: false,
1419+
message: "fitin smaller resource with zero or neg values %+v should not fit in larger %+v (skip undefined)",
1420+
},
1421+
{
1422+
larger: &Resource{Resources: map[string]Quantity{"a": -5}},
1423+
smaller: &Resource{Resources: map[string]Quantity{"a": 0, "b": 10}},
1424+
expected: false,
1425+
message: "fitin smaller resource with value %+v should not fit in larger %+v (skip undefined)",
1426+
},
1427+
{
1428+
larger: &Resource{Resources: map[string]Quantity{"a": -5}},
1429+
smaller: &Resource{Resources: map[string]Quantity{"a": -4, "b": 10}},
1430+
expected: false,
1431+
message: "fitin smaller resource with lesser neg value %+v should not fit in larger %+v (skip undefined)",
1432+
},
1433+
{
1434+
larger: &Resource{Resources: map[string]Quantity{"a": -5}},
1435+
smaller: &Resource{Resources: map[string]Quantity{"a": -6, "b": 10}},
1436+
expected: true,
1437+
message: "fitin smaller resource with higher neg value %+v should fit in larger %+v (skip undefined)",
1438+
},
1439+
{
1440+
larger: &Resource{Resources: map[string]Quantity{"a": -5}},
1441+
smaller: &Resource{Resources: map[string]Quantity{"a": -5, "b": 10}},
1442+
expected: true,
1443+
message: "fitin smaller resource with equal neg value %+v should fit in larger %+v (skip undefined)",
1444+
},
1445+
}
1446+
1447+
for _, tc := range tests {
1448+
t.Run(tc.message, func(t *testing.T) {
1449+
result := tc.larger.FitInActual(tc.smaller)
1450+
assert.Equal(t, result, tc.expected, tc.message, tc.smaller, tc.larger)
1451+
})
1452+
}
1453+
}
1454+
13841455
//nolint:funlen // thorough test
13851456
func TestGetFairShare(t *testing.T) {
13861457
// 0 guarantee should be treated as absence of a gurantee

pkg/scheduler/objects/preemption.go

Lines changed: 78 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -218,15 +218,20 @@ func (p *Preemptor) checkPreemptionQueueGuarantees() bool {
218218
zap.String("queuePath", p.queuePath))
219219
return false
220220
}
221-
221+
oldRemaining := currentQueue.GetRemainingGuaranteedResource()
222+
if oldRemaining != nil && oldRemaining.FitInActual(p.ask.GetAllocatedResource()) {
223+
return true
224+
}
222225
currentQueue.AddAllocation(p.ask.GetAllocatedResource())
223226

224227
// remove each allocation in turn, validating that at some point we free enough resources to allow this ask to fit
225228
for _, snapshot := range queues {
226229
for _, alloc := range snapshot.PotentialVictims {
227230
snapshot.RemoveAllocation(alloc.GetAllocatedResource())
228231
remaining := currentQueue.GetRemainingGuaranteedResource()
229-
if remaining != nil && resources.StrictlyGreaterThanOrEquals(remaining, resources.Zero) {
232+
233+
// Is all ask's res types in ask queue still under guaranteed?
234+
if remaining != nil && isAskQueueUnderGuaranteed(p.ask.GetAllocatedResource(), remaining) {
230235
return true
231236
}
232237
}
@@ -268,26 +273,24 @@ func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, po
268273
// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
269274
if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
270275
if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
271-
oldRemaining := queueSnapshot.GetRemainingGuaranteedResource()
276+
remaining := queueSnapshot.GetRemainingGuaranteedResource()
272277
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
273278
preemptableResource := queueSnapshot.GetPreemptableResource()
274279

275-
// Did removing this allocation still keep the queue over-allocated?
280+
// Did removing this allocation still keep the victim queue over-allocated?
276281
// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
277-
// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
278-
// some really useful victim is there.
282+
// So, as an additional check, res types used by ask should be either -ve or zero in victim queue remaining guaranteed resource to confirm
283+
// some relevant useful victim is there.
279284
// In case of victims densely populated on any specific node, checking/honouring the guaranteed quota on ask or preemptor queue
280285
// acts as early filtering layer to carry forward only the required victims.
281286
// For other cases like victims spread over multiple nodes, this doesn't add great value.
282287
if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
283-
(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
284-
// add the current victim into the ask queue
285-
askQueue.AddAllocation(victim.GetAllocatedResource())
286-
askQueueNewRemaining := askQueue.GetRemainingGuaranteedResource()
287-
288-
// Did adding this allocation make the ask queue over - utilized?
289-
if askQueueNewRemaining != nil && resources.StrictlyGreaterThan(resources.Zero, askQueueNewRemaining) {
290-
askQueue.RemoveAllocation(victim.GetAllocatedResource())
288+
(remaining == nil || isVictimQueueOverGuaranteed(p.ask.GetAllocatedResource(), remaining)) {
289+
// Does victimQueue have space equivalent to the resource used by the victim?
290+
askQueueRemaining := askQueue.GetRemainingGuaranteedResource()
291+
if askQueueRemaining != nil && askQueueRemaining.FitInActual(victim.GetAllocatedResource()) {
292+
askQueue.AddAllocation(victim.GetAllocatedResource())
293+
} else {
291294
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
292295
break
293296
}
@@ -340,17 +343,17 @@ func (p *Preemptor) calculateVictimsByNode(nodeAvailable *resources.Resource, po
340343
// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
341344
if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
342345
if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
343-
oldRemaining := queueSnapshot.GetRemainingGuaranteedResource()
346+
remaining := queueSnapshot.GetRemainingGuaranteedResource()
344347
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
345348
preemptableResource := queueSnapshot.GetPreemptableResource()
346349

347-
// Did removing this allocation still keep the queue over-allocated?
350+
// Did removing this allocation still keep the victim queue over-allocated?
348351
// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
349-
// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
350-
// some really useful victim is there.
352+
// So, as an additional check, res types used by ask should be either -ve or zero in victim queue remaining guaranteed resource to confirm
353+
// some relevant useful victim is there.
351354
// Similar checks could be added even on the ask or preemptor queue to prevent being over utilized.
352355
if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
353-
(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
356+
(remaining == nil || isVictimQueueOverGuaranteed(p.ask.GetAllocatedResource(), remaining)) {
354357
// removing task does not violate queue constraints, adjust queue and node
355358
nodeCurrentAvailable.AddTo(victim.GetAllocatedResource())
356359
// check if ask now fits and we haven't had this happen before
@@ -499,29 +502,28 @@ func (p *Preemptor) calculateAdditionalVictims(nodeVictims []*Allocation) ([]*Al
499502
// check to see if removing this task will keep queue above guaranteed amount; if not, skip to the next one
500503
if qv, ok := p.queueByAlloc[victim.GetAllocationKey()]; ok {
501504
if queueSnapshot, ok2 := allocationsByQueueSnap[qv.QueuePath]; ok2 {
502-
oldRemaining := queueSnapshot.GetRemainingGuaranteedResource()
505+
remaining := queueSnapshot.GetRemainingGuaranteedResource()
503506
queueSnapshot.RemoveAllocation(victim.GetAllocatedResource())
504507

505-
// Did removing this allocation still keep the queue over-allocated?
508+
// Did removing this allocation still keep the victim queue over-allocated?
506509
// At times, over-allocation happens because of resource types in usage but not defined as guaranteed.
507-
// So, as an additional check, -ve remaining guaranteed resource before removing the victim means
508-
// some really useful victim is there.
510+
// So, as an additional check, res types used by ask should be either -ve or zero in victim queue remaining guaranteed resource to confirm
511+
// some relevant useful victim is there.
509512
preemptableResource := queueSnapshot.GetPreemptableResource()
510513
if resources.StrictlyGreaterThanOrEquals(preemptableResource, resources.Zero) &&
511-
(oldRemaining == nil || resources.StrictlyGreaterThan(resources.Zero, oldRemaining)) {
512-
askQueueRemainingAfterVictimRemoval := askQueue.GetRemainingGuaranteedResource()
513-
514-
// add the current victim into the ask queue
515-
askQueue.AddAllocation(victim.GetAllocatedResource())
516-
askQueueNewRemaining := askQueue.GetRemainingGuaranteedResource()
517-
// Did adding this allocation make the ask queue over - utilized?
518-
if askQueueNewRemaining != nil && resources.StrictlyGreaterThan(resources.Zero, askQueueNewRemaining) {
519-
askQueue.RemoveAllocation(victim.GetAllocatedResource())
514+
(remaining == nil || isVictimQueueOverGuaranteed(p.ask.GetAllocatedResource(), remaining)) {
515+
// Does victimQueue have space equivalent to the resource used by the victim?
516+
askQueueRemaining := askQueue.GetRemainingGuaranteedResource()
517+
if askQueueRemaining != nil && askQueueRemaining.FitInActual(victim.GetAllocatedResource()) {
518+
askQueue.AddAllocation(victim.GetAllocatedResource())
519+
} else {
520520
queueSnapshot.AddAllocation(victim.GetAllocatedResource())
521521
break
522522
}
523+
askQueueNewRemaining := askQueue.GetRemainingGuaranteedResource()
524+
523525
// check to see if the shortfall on the queue has changed
524-
if !resources.EqualsOrEmpty(askQueueRemainingAfterVictimRemoval, askQueueNewRemaining) {
526+
if !resources.EqualsOrEmpty(askQueueRemaining, askQueueNewRemaining) {
525527
// remaining capacity changed, so we should keep this task
526528
victims = append(victims, victim)
527529
} else {
@@ -536,12 +538,17 @@ func (p *Preemptor) calculateAdditionalVictims(nodeVictims []*Allocation) ([]*Al
536538
}
537539
}
538540
}
539-
// At last, did the ask queue usage under or equals guaranteed quota?
540-
finalRemainingRes := askQueue.GetRemainingGuaranteedResource()
541-
if finalRemainingRes != nil && resources.StrictlyGreaterThanOrEquals(finalRemainingRes, resources.Zero) {
542-
return victims, true
541+
542+
// At last, did the ask queue usage under or equals guaranteed quota after finding the additional victims?
543+
if len(victims) > 0 {
544+
finalRemainingRes := askQueue.GetRemainingGuaranteedResource()
545+
if finalRemainingRes != nil && isAskQueueUnderGuaranteed(p.ask.GetAllocatedResource(), finalRemainingRes) {
546+
return victims, true
547+
} else {
548+
return victims, false
549+
}
543550
}
544-
return nil, false
551+
return nil, true
545552
}
546553

547554
// tryNodes attempts to find potential nodes for scheduling. For each node, potential victims are passed to
@@ -678,7 +685,7 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
678685
log.Log(log.SchedPreemption).Info("Preempting task",
679686
zap.String("askApplicationID", p.ask.applicationID),
680687
zap.String("askAllocationKey", p.ask.allocationKey),
681-
zap.String("askQueue", p.queue.Name),
688+
zap.String("victimQueue", p.queue.Name),
682689
zap.String("victimApplicationID", victim.GetApplicationID()),
683690
zap.String("victimAllocationKey", victim.GetAllocationKey()),
684691
zap.Stringer("victimAllocatedResource", victim.GetAllocatedResource()),
@@ -705,7 +712,8 @@ func (p *Preemptor) TryPreemption() (*AllocationResult, bool) {
705712
log.Log(log.SchedPreemption).Info("Reserving node for ask after preemption",
706713
zap.String("allocationKey", p.ask.GetAllocationKey()),
707714
zap.String("nodeID", nodeID),
708-
zap.Int("victimCount", len(victims)))
715+
zap.Int("collected victim count", len(victims)),
716+
zap.Int("preempted victim count", len(finalVictims)))
709717
return newReservedAllocationResult(nodeID, p.ask), true
710718
}
711719

@@ -914,3 +922,33 @@ func batchPreemptionChecks(checks []*si.PreemptionPredicatesArgs, batchSize int)
914922
}
915923
return result
916924
}
925+
926+
// isAskQueueUnderGuaranteed Is Ask Queue (not in general sense) under guaranteed purely based on the ask's resource requirement?
927+
// Traverse each ask's res type, confirm its existence in ask queue and check whether it has -ve or not.
928+
// -ve value means over guaranteed, return false to confirm the same
929+
// For all other cases (even if non-matching res type has -ve value), return true
930+
func isAskQueueUnderGuaranteed(askResource *resources.Resource, askQueue *resources.Resource) bool {
931+
for resType := range askResource.Resources {
932+
if val, ok := askQueue.Resources[resType]; ok {
933+
if val < 0 {
934+
return false
935+
}
936+
}
937+
}
938+
return true
939+
}
940+
941+
// isVictimQueueOverGuaranteed Is Victim Queue (not in general sense) over guaranteed purely based on the ask's resource requirement?
942+
// Traverse each ask's res type, confirm its existence in victim queue and check whether it has -ve or not.
943+
// -ve value means over guaranteed, return true to confirm the same
944+
// For all other cases (even if non-matching res type has -ve value), return false
945+
func isVictimQueueOverGuaranteed(askResource *resources.Resource, victimQueue *resources.Resource) bool {
946+
for resType := range askResource.Resources {
947+
if val, ok := victimQueue.Resources[resType]; ok {
948+
if val < 0 {
949+
return true
950+
}
951+
}
952+
}
953+
return false
954+
}

0 commit comments

Comments
 (0)