Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/27129.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug, previously patched incorrectly, where rescheduled allocations that could not be placed would later ignore their reschedule policy limits
```
6 changes: 2 additions & 4 deletions nomad/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,8 @@ func TestAuthenticate_mTLS(t *testing.T) {
must.NoError(t, err, must.Sprint("could not sign claims"))

planReq := &structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc1, alloc2},
Job: job,
},
AllocsUpdated: []*structs.Allocation{alloc1, alloc2},
Job: job,
}
_, _, err = leader.raftApply(structs.ApplyPlanResultsRequestType, planReq)
must.NoError(t, err)
Expand Down
8 changes: 3 additions & 5 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,9 @@ func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {

// Create a plan result and apply it with a later index
res := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
Job: job,
},
EvalID: eval.ID,
AllocsUpdated: []*structs.Allocation{alloc},
Job: job,
EvalID: eval.ID,
}
assert := assert.New(t)
err := state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res)
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt
return err
}

// Update any evals
// Update any evals that were added by the RPC handler
if len(req.Evals) > 0 {
if err := n.upsertEvals(msgType, index, req.Evals); err != nil {
n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err)
Expand Down
21 changes: 9 additions & 12 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,13 +1635,12 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
eval2.JobID = job2.ID

req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
},
Deployment: d,
EvalID: eval.ID,
NodePreemptions: []*structs.Allocation{alloc1, alloc2},
Job: job,
AllocsUpdated: []*structs.Allocation{alloc},
Deployment: d,
EvalID: eval.ID,
AllocsPreempted: []*structs.AllocationDiff{
alloc1.AllocationDiff(), alloc2.AllocationDiff()},
PreemptionEvals: []*structs.Evaluation{eval1, eval2},
}
buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
Expand Down Expand Up @@ -1703,11 +1702,9 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
evictAlloc.Job = nil
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
req2 := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{evictAlloc},
},
EvalID: eval.ID,
Job: job,
AllocsUpdated: []*structs.Allocation{evictAlloc},
EvalID: eval.ID,
}
buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
assert.Nil(err)
Expand Down
90 changes: 26 additions & 64 deletions nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap

// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: plan.Job,
},
Job: plan.Job,
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
IneligibleNodes: result.IneligibleNodes,
Expand All @@ -256,74 +254,38 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap

preemptedJobIDs := make(map[structs.NamespacedID]struct{})

if p.srv.peersCache.ServersMeetMinimumVersion(p.srv.Region(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))

for _, updateList := range result.NodeUpdate {
for _, stoppedAlloc := range updateList {
req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, unixNow))
}
}

for _, allocList := range result.NodeAllocation {
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.AllocsUpdated, unixNow)
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))

err := signAllocIdentities(p.srv.encrypter, plan.Job, req.AllocsUpdated, now)
if err != nil {
return nil, err
}

for _, preemptions := range result.NodePreemptions {
for _, preemptedAlloc := range preemptions {
req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, unixNow))

// Gather jobids to create follow up evals
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
}
for _, updateList := range result.NodeUpdate {
for _, stoppedAlloc := range updateList {
req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, unixNow))
}
} else {
// COMPAT 0.11: This branch is deprecated and will only be used to support
// application of older log entries. Expected to be removed in a future version.

// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)
}

// Initialize using the older log entry format for Alloc and NodePreemptions
req.Alloc = make([]*structs.Allocation, 0, minUpdates)
req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions))
for _, allocList := range result.NodeAllocation {
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
}

for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.AllocsUpdated, unixNow)

for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}
err := signAllocIdentities(p.srv.encrypter, plan.Job, req.AllocsUpdated, now)
if err != nil {
return nil, err
}

// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.Alloc, unixNow)
for _, preemptions := range result.NodePreemptions {
for _, preemptedAlloc := range preemptions {
req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, unixNow))

// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = unixNow
appendNamespacedJobID(preemptedJobIDs, alloc)
// Gather jobids to create follow up evals
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
}
}

Expand Down
80 changes: 0 additions & 80 deletions nomad/plan_normalization_test.go

This file was deleted.

10 changes: 4 additions & 6 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,10 @@ func TestEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {

msgType := structs.ApplyPlanResultsRequestType
req := &structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc, alloc2},
Job: job,
},
Deployment: d,
EvalID: eval.ID,
AllocsUpdated: []*structs.Allocation{alloc, alloc2},
Job: job,
Deployment: d,
EvalID: eval.ID,
}

must.NoError(t, s.UpsertPlanResults(msgType, 100, req))
Expand Down
35 changes: 9 additions & 26 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,6 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
return err
}

// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions)
if err != nil {
return err
}

txn := s.db.WriteTxnMsgT(msgType, index)
defer txn.Abort()

Expand Down Expand Up @@ -418,29 +412,17 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
}

numAllocs := 0
if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 {
// COMPAT 0.11: This branch will be removed, when Alloc is removed
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
addComputedAllocAttrs(results.Alloc, results.Job)
numAllocs = len(results.Alloc) + len(results.NodePreemptions)
} else {
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
addComputedAllocAttrs(results.AllocsUpdated, results.Job)
numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
}

allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)

// COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed
allocsToUpsert = append(allocsToUpsert, results.Alloc...)
allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
addComputedAllocAttrs(results.AllocsUpdated, results.Job)
addComputedAllocAttrs(allocsStopped, results.Job)
numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)

allocsToUpsert = append(allocsToUpsert, allocsStopped...)
allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)
allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
allocsToUpsert = append(allocsToUpsert, allocsStopped...)
allocsToUpsert = append(allocsToUpsert, allocsPreempted...)

// handle upgrade path
Expand Down Expand Up @@ -7376,6 +7358,7 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All
// If alloc is a stopped alloc
allocCopy.DesiredDescription = allocDiff.DesiredDescription
allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
allocCopy.AllocStates = allocDiff.AllocStates
if allocDiff.ClientStatus != "" {
allocCopy.ClientStatus = allocDiff.ClientStatus
}
Expand Down
4 changes: 1 addition & 3 deletions nomad/state/state_store_service_registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,9 +695,7 @@ func TestAlloc_ServiceRegistrationLifecycle(t *testing.T) {
index++
must.NoError(t, store.UpsertPlanResults(structs.MsgTypeTestSetup, index,
&structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
AllocsStopped: []*structs.AllocationDiff{&diff},
},
AllocsStopped: []*structs.AllocationDiff{&diff},
}))

iter, err = store.GetServiceRegistrationsByAllocID(nil, alloc1.ID)
Expand Down
Loading