Skip to content

Commit e295b9b

Browse files
manirajv06pbacsko
authored andcommitted
[YUNIKORN-3182] Handle queues when some other queue in the same hierarchy in going through preemption (#1056)
Closes: #1056 Signed-off-by: Peter Bacsko <pbacsko@cloudera.com> (cherry picked from commit 2a0db10)
1 parent 783cfbd commit e295b9b

4 files changed

Lines changed: 95 additions & 10 deletions

File tree

pkg/scheduler/objects/queue.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2196,12 +2196,41 @@ func (sq *Queue) MarkQuotaChangePreemptionRunning(run bool) {
21962196
sq.isQuotaChangePreemptionRunning = run
21972197
}
21982198

2199-
func (sq *Queue) IsQuotaChangePreemptionRunning() bool {
2199+
func (sq *Queue) isQCPreemptionRunningForParent() bool {
2200+
if sq == nil {
2201+
return false
2202+
}
2203+
if sq.parent != nil {
2204+
if sq.parent.isQCPreemptionRunningForParent() {
2205+
return true
2206+
}
2207+
}
2208+
sq.RLock()
2209+
defer sq.RUnlock()
2210+
return sq.isQuotaChangePreemptionRunning
2211+
}
2212+
2213+
func (sq *Queue) isQCPreemptionRunningForChild() bool {
2214+
for _, child := range sq.GetCopyOfChildren() {
2215+
if child.isQCPreemptionRunningForChild() {
2216+
return true
2217+
}
2218+
}
22002219
sq.RLock()
22012220
defer sq.RUnlock()
22022221
return sq.isQuotaChangePreemptionRunning
22032222
}
22042223

2224+
func (sq *Queue) IsQCPreemptionRunning() bool {
2225+
if sq.isQCPreemptionRunningForParent() {
2226+
return true
2227+
}
2228+
if sq.isQCPreemptionRunningForChild() {
2229+
return true
2230+
}
2231+
return false
2232+
}
2233+
22052234
func (sq *Queue) GetMaxAppUnschedAskBackoff() uint64 {
22062235
sq.RLock()
22072236
defer sq.RUnlock()

pkg/scheduler/objects/queue_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3142,3 +3142,56 @@ func TestQueueBackoffProperties(t *testing.T) {
31423142
assert.Equal(t, uint64(0), leaf3.GetMaxAppUnschedAskBackoff())
31433143
assert.Equal(t, 30*time.Second, leaf3.GetBackoffDelay())
31443144
}
3145+
3146+
func TestQueue_IsQCPreemptionRunning(t *testing.T) {
3147+
// create the root
3148+
root, err := createManagedQueueMaxApps(nil, "root", true, nil, 1)
3149+
assert.NilError(t, err, "queue create failed")
3150+
parent, err := createManagedQueue(root, "parent", true, nil)
3151+
assert.NilError(t, err, "failed to create parent queue")
3152+
3153+
var leaf, leaf2, leaf11, leaf111 *Queue
3154+
leaf, err = createManagedQueue(parent, "leaf", true, nil)
3155+
assert.NilError(t, err, "failed to create leaf queue")
3156+
leaf2, err = createManagedQueue(parent, "leaf2", false, nil)
3157+
assert.NilError(t, err, "failed to create leaf2 queue")
3158+
3159+
leaf11, err = createManagedQueue(leaf, "leaf11", true, nil)
3160+
assert.NilError(t, err, "failed to create leaf11 queue")
3161+
3162+
leaf111, err = createManagedQueue(leaf11, "leaf111", false, nil)
3163+
assert.NilError(t, err, "failed to create leaf111 queue")
3164+
3165+
// root.parent is running. any queue located in this hierarchy (both upwards and downwards) should return true. All branches of parent should return true.
3166+
parent.isQuotaChangePreemptionRunning = true
3167+
assert.Equal(t, parent.IsQCPreemptionRunning(), true)
3168+
assert.Equal(t, root.IsQCPreemptionRunning(), true)
3169+
assert.Equal(t, leaf111.IsQCPreemptionRunning(), true)
3170+
assert.Equal(t, leaf11.IsQCPreemptionRunning(), true)
3171+
assert.Equal(t, leaf.IsQCPreemptionRunning(), true)
3172+
assert.Equal(t, leaf2.IsQCPreemptionRunning(), true)
3173+
3174+
// reset
3175+
parent.isQuotaChangePreemptionRunning = false
3176+
3177+
// root.parent.leaf111 (leaf queue) is running. any queue located in this hierarchy (upwards) should return true. Other branches of parent should return false.
3178+
leaf111.isQuotaChangePreemptionRunning = true
3179+
assert.Equal(t, parent.IsQCPreemptionRunning(), true)
3180+
assert.Equal(t, root.IsQCPreemptionRunning(), true)
3181+
assert.Equal(t, leaf111.IsQCPreemptionRunning(), true)
3182+
assert.Equal(t, leaf11.IsQCPreemptionRunning(), true)
3183+
assert.Equal(t, leaf.IsQCPreemptionRunning(), true)
3184+
assert.Equal(t, leaf2.IsQCPreemptionRunning(), false)
3185+
3186+
// reset
3187+
leaf111.isQuotaChangePreemptionRunning = false
3188+
3189+
// root.parent.leaf2 (leaf queue) is running. any queue located in this hierarchy (upwards) should return true. Other branches of parent should return false.
3190+
leaf2.isQuotaChangePreemptionRunning = true
3191+
assert.Equal(t, parent.IsQCPreemptionRunning(), true)
3192+
assert.Equal(t, root.IsQCPreemptionRunning(), true)
3193+
assert.Equal(t, leaf111.IsQCPreemptionRunning(), false)
3194+
assert.Equal(t, leaf11.IsQCPreemptionRunning(), false)
3195+
assert.Equal(t, leaf.IsQCPreemptionRunning(), false)
3196+
assert.Equal(t, leaf2.IsQCPreemptionRunning(), true)
3197+
}

pkg/scheduler/objects/quota_change_preemptor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func NewQuotaChangePreemptor(queue *Queue) *QuotaChangePreemptionContext {
5151
}
5252

5353
func (qcp *QuotaChangePreemptionContext) CheckPreconditions() bool {
54-
if !qcp.queue.IsManaged() || qcp.queue.IsQuotaChangePreemptionRunning() {
54+
if !qcp.queue.IsManaged() || qcp.queue.IsQCPreemptionRunning() {
5555
return false
5656
}
5757
if qcp.maxResource.StrictlyGreaterThanOrEqualsOnlyExisting(qcp.queue.GetAllocatedResource()) {

pkg/scheduler/objects/quota_change_preemptor_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
6363
Resources: leafRes,
6464
}, parent, false, nil)
6565
assert.NilError(t, err)
66-
alreadyPreemptionRunning.MarkQuotaChangePreemptionRunning(true)
6766

6867
usageExceededMaxQueue, err := NewConfiguredQueue(configs.QueueConfig{
6968
Name: "leaf-usage-exceeded-max",
@@ -91,22 +90,26 @@ func TestQuotaChangeCheckPreconditions(t *testing.T) {
9190
testCases := []struct {
9291
name string
9392
queue *Queue
93+
preemptionRunning bool
9494
preconditionResult bool
9595
}{
96-
{"parent queue", parent, true},
97-
{"leaf queue", leaf, false},
98-
{"dynamic leaf queue", dynamicLeaf, false},
99-
{"leaf queue, already preemption process started or running", alreadyPreemptionRunning, false},
100-
{"leaf queue, usage exceeded max resources", usageExceededMaxQueue, true},
101-
{"leaf queue, usage equals max resources", usageEqualsMaxQueue, false},
102-
{"leaf queue, usage res not matching max resources", usageNotMatchingMaxQueue, false},
96+
{"parent queue", parent, false, true},
97+
{"leaf queue", leaf, false, false},
98+
{"dynamic leaf queue", dynamicLeaf, false, false},
99+
{"leaf queue, usage exceeded max resources", usageExceededMaxQueue, false, true},
100+
{"leaf queue, usage equals max resources", usageEqualsMaxQueue, false, false},
101+
{"leaf queue, already preemption process started or running", alreadyPreemptionRunning, true, false},
103102
}
104103
for _, tc := range testCases {
105104
t.Run(tc.name, func(t *testing.T) {
105+
tc.queue.MarkQuotaChangePreemptionRunning(tc.preemptionRunning)
106106
preemptor := NewQuotaChangePreemptor(tc.queue)
107107
assert.Equal(t, preemptor.CheckPreconditions(), tc.preconditionResult)
108108
})
109109
}
110+
// Since parent's leaf queue "leaf-already-preemption-running" is running, parent preconditions passed earlier should fail now
111+
preemptor := NewQuotaChangePreemptor(parent)
112+
assert.Equal(t, preemptor.CheckPreconditions(), false)
110113
}
111114

112115
func TestQuotaChangeGetPreemptableResource(t *testing.T) {

0 commit comments

Comments
 (0)