Skip to content

Commit 79e48b6

Browse files
committed
scheduler: reschedule tracker dropped if follow-up fails placement
In #12319 we attempted to fix a bug where the reschedule tracker could be dropped if the scheduler couldn't place the replacement allocation. This would result in the eventual replacement being reschedulable more times than the job author's policy intended. In #5602 we introduced plan normalization, where we reduced the size of the required Raft log entry for plan apply by dropping the job spec from the plan. This required backwards compatibility shims that we intended to remove in Nomad 0.11. It has long been impossible to upgrade to any currently supported version of Nomad from that far back, so I attempted to remove these backwards compatibility shims. But doing so uncovered that the #12319 fix was incorrect. The scheduler test harness used the old code paths, which did not normalize the plans. With normalized plans, we end up dropping the reschedule tracker. This changeset fixes the bug by ensuring that a rescheduled allocation that cannot be placed is not marked with `DesiredStatus: stop`, to match the behavior we see when an evaluation fires before the `reschedule.delay` window expires. This ensures that the plan applier doesn't consider the allocation terminal and clear its reschedule tracker. I've also removed the backwards compatibility shims and version checks for plan normalization, and fixed a few incorrect test assertions revealed by the fix. Ref: #12319 Ref: #5602
1 parent 121a33a commit 79e48b6

17 files changed

+197
-333
lines changed

.changelog/27129.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
scheduler: Fixed a bug, previously patched incorrectly, where rescheduled allocations that could not be placed would later ignore their reschedule policy limits
3+
```

nomad/acl_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,8 @@ func TestAuthenticate_mTLS(t *testing.T) {
152152
must.NoError(t, err, must.Sprint("could not sign claims"))
153153

154154
planReq := &structs.ApplyPlanResultsRequest{
155-
AllocUpdateRequest: structs.AllocUpdateRequest{
156-
Alloc: []*structs.Allocation{alloc1, alloc2},
157-
Job: job,
158-
},
155+
AllocsUpdated: []*structs.Allocation{alloc1, alloc2},
156+
Job: job,
159157
}
160158
_, _, err = leader.raftApply(structs.ApplyPlanResultsRequestType, planReq)
161159
must.NoError(t, err)

nomad/eval_endpoint_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -397,11 +397,9 @@ func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
397397

398398
// Create a plan result and apply it with a later index
399399
res := structs.ApplyPlanResultsRequest{
400-
AllocUpdateRequest: structs.AllocUpdateRequest{
401-
Alloc: []*structs.Allocation{alloc},
402-
Job: job,
403-
},
404-
EvalID: eval.ID,
400+
AllocsUpdated: []*structs.Allocation{alloc},
401+
Job: job,
402+
EvalID: eval.ID,
405403
}
406404
assert := assert.New(t)
407405
err := state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res)

nomad/fsm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,7 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt
989989
return err
990990
}
991991

992-
// Update any evals
992+
// Update any evals that were added by the RPC handler
993993
if len(req.Evals) > 0 {
994994
if err := n.upsertEvals(msgType, index, req.Evals); err != nil {
995995
n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err)

nomad/fsm_test.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1635,13 +1635,12 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
16351635
eval2.JobID = job2.ID
16361636

16371637
req := structs.ApplyPlanResultsRequest{
1638-
AllocUpdateRequest: structs.AllocUpdateRequest{
1639-
Job: job,
1640-
Alloc: []*structs.Allocation{alloc},
1641-
},
1642-
Deployment: d,
1643-
EvalID: eval.ID,
1644-
NodePreemptions: []*structs.Allocation{alloc1, alloc2},
1638+
Job: job,
1639+
AllocsUpdated: []*structs.Allocation{alloc},
1640+
Deployment: d,
1641+
EvalID: eval.ID,
1642+
AllocsPreempted: []*structs.AllocationDiff{
1643+
alloc1.AllocationDiff(), alloc2.AllocationDiff()},
16451644
PreemptionEvals: []*structs.Evaluation{eval1, eval2},
16461645
}
16471646
buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
@@ -1703,11 +1702,9 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
17031702
evictAlloc.Job = nil
17041703
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
17051704
req2 := structs.ApplyPlanResultsRequest{
1706-
AllocUpdateRequest: structs.AllocUpdateRequest{
1707-
Job: job,
1708-
Alloc: []*structs.Allocation{evictAlloc},
1709-
},
1710-
EvalID: eval.ID,
1705+
Job: job,
1706+
AllocsUpdated: []*structs.Allocation{evictAlloc},
1707+
EvalID: eval.ID,
17111708
}
17121709
buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
17131710
assert.Nil(err)

nomad/plan_apply.go

Lines changed: 26 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
244244

245245
// Setup the update request
246246
req := structs.ApplyPlanResultsRequest{
247-
AllocUpdateRequest: structs.AllocUpdateRequest{
248-
Job: plan.Job,
249-
},
247+
Job: plan.Job,
250248
Deployment: result.Deployment,
251249
DeploymentUpdates: result.DeploymentUpdates,
252250
IneligibleNodes: result.IneligibleNodes,
@@ -256,74 +254,38 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
256254

257255
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
258256

259-
if p.srv.peersCache.ServersMeetMinimumVersion(p.srv.Region(), MinVersionPlanNormalization, true) {
260-
// Initialize the allocs request using the new optimized log entry format.
261-
// Determine the minimum number of updates, could be more if there
262-
// are multiple updates per node
263-
req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
264-
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
265-
req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
266-
267-
for _, updateList := range result.NodeUpdate {
268-
for _, stoppedAlloc := range updateList {
269-
req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, unixNow))
270-
}
271-
}
272-
273-
for _, allocList := range result.NodeAllocation {
274-
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
275-
}
276-
277-
// Set the time the alloc was applied for the first time. This can be used
278-
// to approximate the scheduling time.
279-
updateAllocTimestamps(req.AllocsUpdated, unixNow)
257+
// Initialize the allocs request using the new optimized log entry format.
258+
// Determine the minimum number of updates, could be more if there
259+
// are multiple updates per node
260+
req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
261+
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
262+
req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
280263

281-
err := signAllocIdentities(p.srv.encrypter, plan.Job, req.AllocsUpdated, now)
282-
if err != nil {
283-
return nil, err
284-
}
285-
286-
for _, preemptions := range result.NodePreemptions {
287-
for _, preemptedAlloc := range preemptions {
288-
req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, unixNow))
289-
290-
// Gather jobids to create follow up evals
291-
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
292-
}
264+
for _, updateList := range result.NodeUpdate {
265+
for _, stoppedAlloc := range updateList {
266+
req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, unixNow))
293267
}
294-
} else {
295-
// COMPAT 0.11: This branch is deprecated and will only be used to support
296-
// application of older log entries. Expected to be removed in a future version.
297-
298-
// Determine the minimum number of updates, could be more if there
299-
// are multiple updates per node
300-
minUpdates := len(result.NodeUpdate)
301-
minUpdates += len(result.NodeAllocation)
268+
}
302269

303-
// Initialize using the older log entry format for Alloc and NodePreemptions
304-
req.Alloc = make([]*structs.Allocation, 0, minUpdates)
305-
req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions))
270+
for _, allocList := range result.NodeAllocation {
271+
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
272+
}
306273

307-
for _, updateList := range result.NodeUpdate {
308-
req.Alloc = append(req.Alloc, updateList...)
309-
}
310-
for _, allocList := range result.NodeAllocation {
311-
req.Alloc = append(req.Alloc, allocList...)
312-
}
274+
// Set the time the alloc was applied for the first time. This can be used
275+
// to approximate the scheduling time.
276+
updateAllocTimestamps(req.AllocsUpdated, unixNow)
313277

314-
for _, preemptions := range result.NodePreemptions {
315-
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
316-
}
278+
err := signAllocIdentities(p.srv.encrypter, plan.Job, req.AllocsUpdated, now)
279+
if err != nil {
280+
return nil, err
281+
}
317282

318-
// Set the time the alloc was applied for the first time. This can be used
319-
// to approximate the scheduling time.
320-
updateAllocTimestamps(req.Alloc, unixNow)
283+
for _, preemptions := range result.NodePreemptions {
284+
for _, preemptedAlloc := range preemptions {
285+
req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, unixNow))
321286

322-
// Set modify time for preempted allocs if any
323-
// Also gather jobids to create follow up evals
324-
for _, alloc := range req.NodePreemptions {
325-
alloc.ModifyTime = unixNow
326-
appendNamespacedJobID(preemptedJobIDs, alloc)
287+
// Gather jobids to create follow up evals
288+
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
327289
}
328290
}
329291

nomad/plan_normalization_test.go

Lines changed: 0 additions & 80 deletions
This file was deleted.

nomad/state/events_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -505,12 +505,10 @@ func TestEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {
505505

506506
msgType := structs.ApplyPlanResultsRequestType
507507
req := &structs.ApplyPlanResultsRequest{
508-
AllocUpdateRequest: structs.AllocUpdateRequest{
509-
Alloc: []*structs.Allocation{alloc, alloc2},
510-
Job: job,
511-
},
512-
Deployment: d,
513-
EvalID: eval.ID,
508+
AllocsUpdated: []*structs.Allocation{alloc, alloc2},
509+
Job: job,
510+
Deployment: d,
511+
EvalID: eval.ID,
514512
}
515513

516514
must.NoError(t, s.UpsertPlanResults(msgType, 100, req))

nomad/state/state_store.go

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -374,12 +374,6 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
374374
return err
375375
}
376376

377-
// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
378-
results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions)
379-
if err != nil {
380-
return err
381-
}
382-
383377
txn := s.db.WriteTxnMsgT(msgType, index)
384378
defer txn.Abort()
385379

@@ -418,29 +412,17 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
418412
}
419413

420414
numAllocs := 0
421-
if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 {
422-
// COMPAT 0.11: This branch will be removed, when Alloc is removed
423-
// Attach the job to all the allocations. It is pulled out in the payload to
424-
// avoid the redundancy of encoding, but should be denormalized prior to
425-
// being inserted into MemDB.
426-
addComputedAllocAttrs(results.Alloc, results.Job)
427-
numAllocs = len(results.Alloc) + len(results.NodePreemptions)
428-
} else {
429-
// Attach the job to all the allocations. It is pulled out in the payload to
430-
// avoid the redundancy of encoding, but should be denormalized prior to
431-
// being inserted into MemDB.
432-
addComputedAllocAttrs(results.AllocsUpdated, results.Job)
433-
numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
434-
}
435415

436-
allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)
437-
438-
// COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed
439-
allocsToUpsert = append(allocsToUpsert, results.Alloc...)
440-
allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)
416+
// Attach the job to all the allocations. It is pulled out in the payload to
417+
// avoid the redundancy of encoding, but should be denormalized prior to
418+
// being inserted into MemDB.
419+
addComputedAllocAttrs(results.AllocsUpdated, results.Job)
420+
addComputedAllocAttrs(allocsStopped, results.Job)
421+
numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
441422

442-
allocsToUpsert = append(allocsToUpsert, allocsStopped...)
423+
allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)
443424
allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
425+
allocsToUpsert = append(allocsToUpsert, allocsStopped...)
444426
allocsToUpsert = append(allocsToUpsert, allocsPreempted...)
445427

446428
// handle upgrade path
@@ -7376,6 +7358,7 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All
73767358
// If alloc is a stopped alloc
73777359
allocCopy.DesiredDescription = allocDiff.DesiredDescription
73787360
allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
7361+
allocCopy.AllocStates = allocDiff.AllocStates
73797362
if allocDiff.ClientStatus != "" {
73807363
allocCopy.ClientStatus = allocDiff.ClientStatus
73817364
}

nomad/state/state_store_service_registration_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -695,9 +695,7 @@ func TestAlloc_ServiceRegistrationLifecycle(t *testing.T) {
695695
index++
696696
must.NoError(t, store.UpsertPlanResults(structs.MsgTypeTestSetup, index,
697697
&structs.ApplyPlanResultsRequest{
698-
AllocUpdateRequest: structs.AllocUpdateRequest{
699-
AllocsStopped: []*structs.AllocationDiff{&diff},
700-
},
698+
AllocsStopped: []*structs.AllocationDiff{&diff},
701699
}))
702700

703701
iter, err = store.GetServiceRegistrationsByAllocID(nil, alloc1.ID)

0 commit comments

Comments
 (0)