Skip to content

Commit d0c1c33

Browse files
authored
[YUNIKORN-3190] Fix race condition occurring between released and preempted allocations (#1058) (#1059)
Closes: #1058 (cherry picked from commit 4027116) Signed-off-by: mani <manirajv06@gmail.com>
1 parent e295b9b commit d0c1c33

13 files changed

Lines changed: 528 additions & 113 deletions

pkg/common/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,5 @@ const (
4141
PreemptionDoesNotHelp = "Preemption does not help"
4242
NoVictimForRequiredNode = "No fit on required node, preemption does not help"
4343
PreemptionMaxAttemptsExhausted = "Preemption max attempts exhausted"
44+
PreemptionVictimsReleased = "Victims picked earlier were released at the final stage"
4445
)

pkg/log/logger.go

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,36 +54,37 @@ const (
5454

5555
// Defined loggers: when adding new loggers, ids must be sequential, and all must be added to the loggers slice in the same order
5656
var (
57-
Core = &LoggerHandle{id: 0, name: "core"}
58-
Test = &LoggerHandle{id: 1, name: "test"}
59-
Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
60-
Config = &LoggerHandle{id: 3, name: "core.config"}
61-
Entrypoint = &LoggerHandle{id: 4, name: "core.entrypoint"}
62-
Events = &LoggerHandle{id: 5, name: "core.events"}
63-
OpenTracing = &LoggerHandle{id: 6, name: "core.opentracing"}
64-
Resources = &LoggerHandle{id: 7, name: "core.resources"}
65-
REST = &LoggerHandle{id: 8, name: "core.rest"}
66-
RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
67-
RPC = &LoggerHandle{id: 10, name: "core.rpc"}
68-
Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
69-
Scheduler = &LoggerHandle{id: 12, name: "core.scheduler"}
70-
SchedAllocation = &LoggerHandle{id: 13, name: "core.scheduler.allocation"}
71-
SchedApplication = &LoggerHandle{id: 14, name: "core.scheduler.application"}
72-
SchedAppUsage = &LoggerHandle{id: 15, name: "core.scheduler.application.usage"}
73-
SchedContext = &LoggerHandle{id: 16, name: "core.scheduler.context"}
74-
SchedFSM = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
75-
SchedHealth = &LoggerHandle{id: 18, name: "core.scheduler.health"}
76-
SchedNode = &LoggerHandle{id: 19, name: "core.scheduler.node"}
77-
SchedPartition = &LoggerHandle{id: 20, name: "core.scheduler.partition"}
78-
SchedPreemption = &LoggerHandle{id: 21, name: "core.scheduler.preemption"}
79-
SchedQueue = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
80-
SchedReservation = &LoggerHandle{id: 23, name: "core.scheduler.reservation"}
81-
SchedUGM = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
82-
SchedNodesUsage = &LoggerHandle{id: 25, name: "core.scheduler.nodesusage"}
83-
Security = &LoggerHandle{id: 26, name: "core.security"}
84-
Utils = &LoggerHandle{id: 27, name: "core.utils"}
85-
Diagnostics = &LoggerHandle{id: 28, name: "core.diagnostics"}
86-
SchedQuotaChangePreemption = &LoggerHandle{id: 29, name: "core.scheduler.preemption.quotachange"}
57+
Core = &LoggerHandle{id: 0, name: "core"}
58+
Test = &LoggerHandle{id: 1, name: "test"}
59+
Deprecation = &LoggerHandle{id: 2, name: "deprecation"}
60+
Config = &LoggerHandle{id: 3, name: "core.config"}
61+
Entrypoint = &LoggerHandle{id: 4, name: "core.entrypoint"}
62+
Events = &LoggerHandle{id: 5, name: "core.events"}
63+
OpenTracing = &LoggerHandle{id: 6, name: "core.opentracing"}
64+
Resources = &LoggerHandle{id: 7, name: "core.resources"}
65+
REST = &LoggerHandle{id: 8, name: "core.rest"}
66+
RMProxy = &LoggerHandle{id: 9, name: "core.rmproxy"}
67+
RPC = &LoggerHandle{id: 10, name: "core.rpc"}
68+
Metrics = &LoggerHandle{id: 11, name: "core.metrics"}
69+
Scheduler = &LoggerHandle{id: 12, name: "core.scheduler"}
70+
SchedAllocation = &LoggerHandle{id: 13, name: "core.scheduler.allocation"}
71+
SchedApplication = &LoggerHandle{id: 14, name: "core.scheduler.application"}
72+
SchedAppUsage = &LoggerHandle{id: 15, name: "core.scheduler.application.usage"}
73+
SchedContext = &LoggerHandle{id: 16, name: "core.scheduler.context"}
74+
SchedFSM = &LoggerHandle{id: 17, name: "core.scheduler.fsm"}
75+
SchedHealth = &LoggerHandle{id: 18, name: "core.scheduler.health"}
76+
SchedNode = &LoggerHandle{id: 19, name: "core.scheduler.node"}
77+
SchedPartition = &LoggerHandle{id: 20, name: "core.scheduler.partition"}
78+
SchedPreemption = &LoggerHandle{id: 21, name: "core.scheduler.preemption"}
79+
SchedQueue = &LoggerHandle{id: 22, name: "core.scheduler.queue"}
80+
SchedReservation = &LoggerHandle{id: 23, name: "core.scheduler.reservation"}
81+
SchedUGM = &LoggerHandle{id: 24, name: "core.scheduler.ugm"}
82+
SchedNodesUsage = &LoggerHandle{id: 25, name: "core.scheduler.nodesusage"}
83+
Security = &LoggerHandle{id: 26, name: "core.security"}
84+
Utils = &LoggerHandle{id: 27, name: "core.utils"}
85+
Diagnostics = &LoggerHandle{id: 28, name: "core.diagnostics"}
86+
SchedQuotaChangePreemption = &LoggerHandle{id: 29, name: "core.scheduler.preemption.quotachange"}
87+
SchedRequiredNodePreemption = &LoggerHandle{id: 29, name: "core.scheduler.preemption.requirednode"}
8788
)
8889

8990
// this tracks all the known logger handles, used to preallocate the real logger instances when configuration changes

pkg/scheduler/objects/allocation.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package objects
2020

2121
import (
22+
"errors"
2223
"fmt"
2324
"strconv"
2425
"time"
@@ -294,10 +295,16 @@ func (a *Allocation) IsReleased() bool {
294295
}
295296

296297
// SetReleased updates the release status of the allocation.
297-
func (a *Allocation) SetReleased(released bool) {
298+
func (a *Allocation) SetReleased(released bool) error {
298299
a.Lock()
299300
defer a.Unlock()
301+
if released {
302+
if a.preempted {
303+
return errors.New("allocation is already preempted")
304+
}
305+
}
300306
a.released = released
307+
return nil
301308
}
302309

303310
// GetTagsClone returns the copy of the tags for this allocation.
@@ -348,10 +355,21 @@ func (a *Allocation) SetAllocatedResource(allocatedResource *resources.Resource)
348355
}
349356

350357
// MarkPreempted marks the allocation as preempted.
351-
func (a *Allocation) MarkPreempted() {
358+
func (a *Allocation) MarkPreempted() error {
352359
a.Lock()
353360
defer a.Unlock()
361+
if a.released {
362+
return errors.New("allocation is already released")
363+
}
354364
a.preempted = true
365+
return nil
366+
}
367+
368+
// MarkUnPreempted unmarks the allocation as preempted.
369+
func (a *Allocation) MarkUnPreempted() {
370+
a.Lock()
371+
defer a.Unlock()
372+
a.preempted = false
355373
}
356374

357375
// IsPreempted returns whether the allocation has been marked for preemption or not.

pkg/scheduler/objects/application.go

Lines changed: 107 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -330,15 +330,30 @@ func (sa *Application) timeoutStateTimer(expectedState string, event application
330330
zap.String("state", sa.stateMachine.Current()))
331331
// if the app is completing, but there are placeholders left, first do the cleanup
332332
if sa.IsCompleting() && !resources.IsZero(sa.allocatedPlaceholder) {
333+
replacing := 0
334+
preempted := 0
333335
var toRelease []*Allocation
334336
for _, alloc := range sa.getPlaceholderAllocations() {
335337
// skip over the allocations that are already marked for release
336338
if alloc.IsReleased() {
339+
replacing++
340+
continue
341+
}
342+
err := alloc.SetReleased(true)
343+
if err != nil {
344+
log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
345+
zap.String("applicationID", sa.ApplicationID),
346+
zap.String("allocationKey", alloc.GetAllocationKey()))
347+
preempted++
337348
continue
338349
}
339-
alloc.SetReleased(true)
340350
toRelease = append(toRelease, alloc)
341351
}
352+
log.Log(log.SchedApplication).Info("application is getting timed out, releasing allocated placeholders",
353+
zap.String("AppID", sa.ApplicationID),
354+
zap.Int("replaced", replacing),
355+
zap.Int("preempted", preempted),
356+
zap.Int("releasing", len(toRelease)))
342357
sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
343358
sa.clearStateTimer()
344359
} else {
@@ -395,19 +410,28 @@ func (sa *Application) timeoutPlaceholderProcessing() {
395410
// Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
396411
var toRelease []*Allocation
397412
replacing := 0
413+
preempted := 0
398414
for _, alloc := range sa.getPlaceholderAllocations() {
399415
// skip over the allocations that are already marked for release, they will be replaced soon
400416
if alloc.IsReleased() {
401417
replacing++
402418
continue
403419
}
404-
alloc.SetReleased(true)
420+
err := alloc.SetReleased(true)
421+
if err != nil {
422+
log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
423+
zap.String("applicationID", sa.ApplicationID),
424+
zap.String("allocationKey", alloc.GetAllocationKey()))
425+
preempted++
426+
continue
427+
}
405428
toRelease = append(toRelease, alloc)
406429
}
407430
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated placeholders",
408431
zap.String("AppID", sa.ApplicationID),
409-
zap.Int("placeholders being replaced", replacing),
410-
zap.Int("releasing placeholders", len(toRelease)))
432+
zap.Int("replaced", replacing),
433+
zap.Int("preempted", preempted),
434+
zap.Int("releasing", len(toRelease)))
411435
// trigger the release of the placeholders: accounting updates when the release is done
412436
sa.notifyRMAllocationReleased(toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
413437
} else {
@@ -425,22 +449,38 @@ func (sa *Application) timeoutPlaceholderProcessing() {
425449
}
426450
// all allocations are placeholders release them all
427451
var toRelease, pendingRelease []*Allocation
452+
preempted := 0
428453
for _, alloc := range sa.allocations {
429-
alloc.SetReleased(true)
454+
err := alloc.SetReleased(true)
455+
if err != nil {
456+
log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
457+
zap.String("applicationID", sa.ApplicationID),
458+
zap.String("allocationKey", alloc.GetAllocationKey()))
459+
preempted++
460+
continue
461+
}
430462
toRelease = append(toRelease, alloc)
431463
}
432464
// get all open requests and remove them all filter out already allocated as they are already released
433465
for _, alloc := range sa.requests {
434466
if !alloc.IsAllocated() {
435-
alloc.SetReleased(true)
467+
err := alloc.SetReleased(true)
468+
if err != nil {
469+
log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping release process",
470+
zap.String("applicationID", sa.ApplicationID),
471+
zap.String("allocationKey", alloc.GetAllocationKey()))
472+
preempted++
473+
continue
474+
}
436475
pendingRelease = append(pendingRelease, alloc)
437476
sa.placeholderData[alloc.taskGroupName].TimedOut++
438477
}
439478
}
440479
log.Log(log.SchedApplication).Info("Placeholder timeout, releasing allocated and pending placeholders",
441480
zap.String("AppID", sa.ApplicationID),
442-
zap.Int("releasing placeholders", len(toRelease)),
443-
zap.Int("pending placeholders", len(pendingRelease)),
481+
zap.Int("releasing", len(toRelease)),
482+
zap.Int("pending", len(pendingRelease)),
483+
zap.Int("preempted", preempted),
444484
zap.String("gang scheduling style", sa.gangSchedulingStyle))
445485
sa.removeAsksInternal("", si.EventRecord_REQUEST_TIMEOUT)
446486
// trigger the release of the allocated placeholders: accounting updates when the release is done
@@ -1202,9 +1242,15 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
12021242
zap.String("placeholderKey", ph.GetAllocationKey()),
12031243
zap.Stringer("placeholder resource", ph.GetAllocatedResource()))
12041244
// release the placeholder and tell the RM
1205-
ph.SetReleased(true)
1206-
sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
1207-
sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.GetAllocatedResource(), ph.GetAllocatedResource())
1245+
err := ph.SetReleased(true)
1246+
if err != nil {
1247+
log.Log(log.SchedApplication).Warn("allocation is already preempted, so skipping placeholder cancellation",
1248+
zap.String("applicationID", sa.ApplicationID),
1249+
zap.String("allocationKey", ph.GetAllocationKey()))
1250+
} else {
1251+
sa.notifyRMAllocationReleased([]*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
1252+
sa.appEvents.SendPlaceholderLargerEvent(ph.taskGroupName, sa.ApplicationID, ph.allocationKey, request.GetAllocatedResource(), ph.GetAllocatedResource())
1253+
}
12081254
continue
12091255
}
12101256
// placeholder is the same or larger continue processing and difference is handled when the placeholder
@@ -1228,7 +1274,25 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
12281274
// placeholder point to the real one in the releases list
12291275
ph.SetRelease(request)
12301276
// mark placeholder as released
1231-
ph.SetReleased(true)
1277+
err = ph.SetReleased(true)
1278+
if err != nil {
1279+
log.Log(log.SchedApplication).Warn("allocation is already preempted, so not proceeding further and reverting to old state",
1280+
zap.String("applicationID", sa.ApplicationID),
1281+
zap.String("allocationKey", ph.GetAllocationKey()))
1282+
1283+
// revert: allocate ask
1284+
_, err = sa.deallocateAsk(request)
1285+
if err != nil {
1286+
log.Log(log.SchedApplication).Warn("deallocation of ask failed unexpectedly",
1287+
zap.Error(err))
1288+
}
1289+
// revert: double link to make it easier to find
1290+
// alloc (the real one) releases points to the placeholder in the releases list
1291+
request.ClearRelease()
1292+
// revert: placeholder point to the real one in the releases list
1293+
ph.ClearRelease()
1294+
continue
1295+
}
12321296
// bind node here so it will be handled properly upon replacement
12331297
request.SetBindTime(time.Now())
12341298
request.SetNodeID(node.NodeID)
@@ -1285,7 +1349,34 @@ func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator,
12851349
// placeholder point to the real one in the releases list
12861350
phFit.SetRelease(reqFit)
12871351
// mark placeholder as released
1288-
phFit.SetReleased(true)
1352+
err = phFit.SetReleased(true)
1353+
1354+
if err != nil {
1355+
log.Log(log.SchedApplication).Warn("allocation is already preempted, so not proceeding further and reverting to old state",
1356+
zap.String("applicationID", sa.ApplicationID),
1357+
zap.String("allocationKey", phFit.GetAllocationKey()))
1358+
1359+
// revert: node allocation
1360+
if alloc := node.RemoveAllocation(reqFit.GetAllocationKey()); alloc != nil {
1361+
log.Log(log.SchedApplication).Debug("Reverting: Node removal failed unexpectedly",
1362+
zap.String("applicationID", sa.ApplicationID),
1363+
zap.Stringer("alloc", reqFit))
1364+
}
1365+
1366+
// revert: allocate ask
1367+
_, err = sa.deallocateAsk(reqFit)
1368+
if err != nil {
1369+
log.Log(log.SchedApplication).Warn("deallocation of ask failed unexpectedly",
1370+
zap.Error(err))
1371+
}
1372+
// revert: double link to make it easier to find
1373+
// alloc (the real one) releases points to the placeholder in the releases list
1374+
reqFit.ClearRelease()
1375+
// revert: placeholder point to the real one in the releases list
1376+
phFit.ClearRelease()
1377+
return false
1378+
}
1379+
12891380
// bind node here so it will be handled properly upon replacement
12901381
reqFit.SetBindTime(time.Now())
12911382
reqFit.SetNodeID(node.NodeID)
@@ -1340,7 +1431,9 @@ func (sa *Application) tryReservedAllocate(headRoom *resources.Resource, nodeIte
13401431
// Do we need a specific node?
13411432
if ask.GetRequiredNode() != "" {
13421433
if !reserve.node.CanAllocate(ask.GetAllocatedResource()) && !ask.HasTriggeredPreemption() {
1343-
sa.tryRequiredNodePreemption(reserve, ask)
1434+
// try preemption and see if we can free up resource
1435+
preemptor := NewRequiredNodePreemptor(reserve.node, ask, sa)
1436+
preemptor.tryPreemption()
13441437
continue
13451438
}
13461439
}
@@ -1403,46 +1496,6 @@ func (sa *Application) tryPreemption(headRoom *resources.Resource, preemptionDel
14031496
return preemptor.TryPreemption()
14041497
}
14051498

1406-
func (sa *Application) tryRequiredNodePreemption(reserve *reservation, ask *Allocation) bool {
1407-
// try preemption and see if we can free up resource
1408-
preemptor := NewRequiredNodePreemptor(reserve.node, ask)
1409-
result := preemptor.filterAllocations()
1410-
preemptor.sortAllocations()
1411-
1412-
// Are there any victims/asks to preempt?
1413-
victims := preemptor.GetVictims()
1414-
if len(victims) > 0 {
1415-
log.Log(log.SchedApplication).Info("Found victims for required node preemption",
1416-
zap.String("ds allocation key", ask.GetAllocationKey()),
1417-
zap.String("allocation name", ask.GetAllocationName()),
1418-
zap.Int("no.of victims", len(victims)))
1419-
for _, victim := range victims {
1420-
if victimQueue := sa.queue.GetQueueByAppID(victim.GetApplicationID()); victimQueue != nil {
1421-
victimQueue.IncPreemptingResource(victim.GetAllocatedResource())
1422-
}
1423-
victim.MarkPreempted()
1424-
victim.SendPreemptedBySchedulerEvent(ask.GetAllocationKey(), ask.GetApplicationID(), sa.ApplicationID)
1425-
}
1426-
ask.MarkTriggeredPreemption()
1427-
sa.notifyRMAllocationReleased(victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
1428-
"preempting allocations to free up resources to run daemon set ask: "+ask.GetAllocationKey())
1429-
return true
1430-
}
1431-
ask.LogAllocationFailure(common.NoVictimForRequiredNode, true)
1432-
ask.SendRequiredNodePreemptionFailedEvent(reserve.node.NodeID)
1433-
getRateLimitedReqNodeLog().Info("no victim found for required node preemption",
1434-
zap.String("allocation key", ask.GetAllocationKey()),
1435-
zap.String("allocation name", ask.GetAllocationName()),
1436-
zap.String("node", reserve.node.NodeID),
1437-
zap.Int("total allocations", result.totalAllocations),
1438-
zap.Int("requiredNode allocations", result.requiredNodeAllocations),
1439-
zap.Int("allocations already preempted", result.alreadyPreemptedAllocations),
1440-
zap.Int("higher priority allocations", result.higherPriorityAllocations),
1441-
zap.Int("allocations with non-matching resources", result.atLeastOneResNotMatched),
1442-
zap.Int("released placeholder allocations", result.releasedPhAllocations))
1443-
return false
1444-
}
1445-
14461499
// tryNodesNoReserve tries all the nodes for a reserved request that have not been tried yet.
14471500
// This should never result in a reservation as the allocation is already reserved
14481501
func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator, reservedNode string) *AllocationResult {
@@ -2219,13 +2272,6 @@ func getRateLimitedAppLog() *log.RateLimitedLogger {
22192272
return rateLimitedAppLog
22202273
}
22212274

2222-
func getRateLimitedReqNodeLog() *log.RateLimitedLogger {
2223-
initReqNodeLogOnce.Do(func() {
2224-
rateLimitedReqNodeLog = log.NewRateLimitedLogger(log.SchedApplication, time.Minute)
2225-
})
2226-
return rateLimitedReqNodeLog
2227-
}
2228-
22292275
func (sa *Application) updateRunnableStatus(runnableInQueue, runnableByUserLimit bool) {
22302276
sa.Lock()
22312277
defer sa.Unlock()

0 commit comments

Comments
 (0)