From 79e48b6a19b6eb7759d86c75abfaee7ba0ced5d3 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Wed, 19 Nov 2025 16:49:55 -0500
Subject: [PATCH] scheduler: reschedule tracker dropped if follow-up fails
 placement

In #12319 we attempted to fix a bug where the reschedule tracker could be
dropped if the scheduler couldn't place the replacement allocation. This
would result in the eventual replacement being reschedulable more times than
the job author's policy intended.

In #5602 we introduced plan normalization, where we reduced the size of the
required Raft log entry for plan apply by dropping the job spec from the
plan. This required backwards compatibility shims that we intended to remove
in Nomad 0.11. It has long been impossible to upgrade to any currently
supported version of Nomad from that far back, so I attempted to remove these
backwards compatibility shims. Doing so uncovered that the #12319 fix was
incorrect: the scheduler test harness used the old code paths, which did not
normalize the plans. With normalized plans, we end up dropping the reschedule
tracker.

This changeset fixes the bug by ensuring that a rescheduled allocation that
cannot be placed is not marked with `DesiredStatus: stop`, to match the
behavior we see when an evaluation fires before the `reschedule.delay` window
expires. This ensures that the plan applier doesn't clear the reschedule
tracker because the allocation is terminal.

I've also removed the backwards compatibility shims and version checks for
plan normalization, and fixed a few incorrect test assertions revealed by
the fix.

Ref: https://github.com/hashicorp/nomad/pull/12319
Ref: https://github.com/hashicorp/nomad/pull/5602
---
 .changelog/27129.txt                          |  3 +
 nomad/acl_test.go                             |  6 +-
 nomad/eval_endpoint_test.go                   |  8 +-
 nomad/fsm.go                                  |  2 +-
 nomad/fsm_test.go                             | 21 ++---
 nomad/plan_apply.go                           | 90 ++++++-------------
 nomad/plan_normalization_test.go              | 80 -----------------
 nomad/state/events_test.go                    | 10 +--
 nomad/state/state_store.go                    | 35 ++------
 .../state_store_service_registration_test.go  |  4 +-
 nomad/state/state_store_test.go               | 59 +++++-------
 nomad/structs/structs.go                      | 49 +++++-----
 nomad/util.go                                 |  5 --
 nomad/worker.go                               |  9 +-
 scheduler/generic_sched.go                    | 23 ++++-
 scheduler/generic_sched_test.go               | 67 +++++++++-----
 scheduler/structs/structs.go                  | 59 ++++++------
 17 files changed, 197 insertions(+), 333 deletions(-)
 create mode 100644 .changelog/27129.txt
 delete mode 100644 nomad/plan_normalization_test.go

diff --git a/.changelog/27129.txt b/.changelog/27129.txt
new file mode 100644
index 00000000000..04a15b3bfab
--- /dev/null
+++ b/.changelog/27129.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed a bug, previously patched incorrectly, where rescheduled allocations that could not be placed would later ignore their reschedule policy limits
+```
diff --git a/nomad/acl_test.go b/nomad/acl_test.go
index 0c8e0ff00e8..771ce0a35e0 100644
--- a/nomad/acl_test.go
+++ b/nomad/acl_test.go
@@ -152,10 +152,8 @@ func TestAuthenticate_mTLS(t *testing.T) {
 	must.NoError(t, err, must.Sprint("could not sign claims"))
 	planReq := &structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc1, alloc2},
-			Job:   job,
-		},
+		AllocsUpdated: []*structs.Allocation{alloc1, alloc2},
+		Job:           job,
 	}
 	_, _, err = leader.raftApply(structs.ApplyPlanResultsRequestType, planReq)
 	must.NoError(t, err)
diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go
index 62e84036016..cd57a785ca4 100644
---
a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -397,11 +397,9 @@ func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) { // Create a plan result and apply it with a later index res := structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - Alloc: []*structs.Allocation{alloc}, - Job: job, - }, - EvalID: eval.ID, + AllocsUpdated: []*structs.Allocation{alloc}, + Job: job, + EvalID: eval.ID, } assert := assert.New(t) err := state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res) diff --git a/nomad/fsm.go b/nomad/fsm.go index dc80b544d2d..29fcab314ea 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -989,7 +989,7 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt return err } - // Update any evals + // Update any evals that were added by the RPC handler if len(req.Evals) > 0 { if err := n.upsertEvals(msgType, index, req.Evals); err != nil { n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index e76e3373052..678c8168ff2 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1635,13 +1635,12 @@ func TestFSM_ApplyPlanResults(t *testing.T) { eval2.JobID = job2.ID req := structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: job, - Alloc: []*structs.Allocation{alloc}, - }, - Deployment: d, - EvalID: eval.ID, - NodePreemptions: []*structs.Allocation{alloc1, alloc2}, + Job: job, + AllocsUpdated: []*structs.Allocation{alloc}, + Deployment: d, + EvalID: eval.ID, + AllocsPreempted: []*structs.AllocationDiff{ + alloc1.AllocationDiff(), alloc2.AllocationDiff()}, PreemptionEvals: []*structs.Evaluation{eval1, eval2}, } buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req) @@ -1703,11 +1702,9 @@ func TestFSM_ApplyPlanResults(t *testing.T) { evictAlloc.Job = nil evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict req2 := structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: job, - Alloc: []*structs.Allocation{evictAlloc}, - }, - EvalID: eval.ID, + Job: job, + AllocsUpdated: []*structs.Allocation{evictAlloc}, + EvalID: eval.ID, } buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2) assert.Nil(err) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index e8075b153db..9516ee83bc3 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -244,9 +244,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap // Setup the update request req := structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: plan.Job, - }, + Job: plan.Job, Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, IneligibleNodes: result.IneligibleNodes, @@ -256,74 +254,38 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap preemptedJobIDs := make(map[structs.NamespacedID]struct{}) - if p.srv.peersCache.ServersMeetMinimumVersion(p.srv.Region(), MinVersionPlanNormalization, true) { - // Initialize the allocs request using the new optimized log entry format. 
- // Determine the minimum number of updates, could be more if there - // are multiple updates per node - req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate)) - req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation)) - req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions)) - - for _, updateList := range result.NodeUpdate { - for _, stoppedAlloc := range updateList { - req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, unixNow)) - } - } - - for _, allocList := range result.NodeAllocation { - req.AllocsUpdated = append(req.AllocsUpdated, allocList...) - } - - // Set the time the alloc was applied for the first time. This can be used - // to approximate the scheduling time. - updateAllocTimestamps(req.AllocsUpdated, unixNow) + // Initialize the allocs request using the new optimized log entry format. + // Determine the minimum number of updates, could be more if there + // are multiple updates per node + req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate)) + req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation)) + req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions)) - err := signAllocIdentities(p.srv.encrypter, plan.Job, req.AllocsUpdated, now) - if err != nil { - return nil, err - } - - for _, preemptions := range result.NodePreemptions { - for _, preemptedAlloc := range preemptions { - req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, unixNow)) - - // Gather jobids to create follow up evals - appendNamespacedJobID(preemptedJobIDs, preemptedAlloc) - } + for _, updateList := range result.NodeUpdate { + for _, stoppedAlloc := range updateList { + req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, unixNow)) } - } else { - // COMPAT 0.11: This branch is deprecated and will only be used to support - // application of older log entries. Expected to be removed in a future version. - - // Determine the minimum number of updates, could be more if there - // are multiple updates per node - minUpdates := len(result.NodeUpdate) - minUpdates += len(result.NodeAllocation) + } - // Initialize using the older log entry format for Alloc and NodePreemptions - req.Alloc = make([]*structs.Allocation, 0, minUpdates) - req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions)) + for _, allocList := range result.NodeAllocation { + req.AllocsUpdated = append(req.AllocsUpdated, allocList...) + } - for _, updateList := range result.NodeUpdate { - req.Alloc = append(req.Alloc, updateList...) - } - for _, allocList := range result.NodeAllocation { - req.Alloc = append(req.Alloc, allocList...) - } + // Set the time the alloc was applied for the first time. This can be used + // to approximate the scheduling time. + updateAllocTimestamps(req.AllocsUpdated, unixNow) - for _, preemptions := range result.NodePreemptions { - req.NodePreemptions = append(req.NodePreemptions, preemptions...) - } + err := signAllocIdentities(p.srv.encrypter, plan.Job, req.AllocsUpdated, now) + if err != nil { + return nil, err + } - // Set the time the alloc was applied for the first time. This can be used - // to approximate the scheduling time. 
- updateAllocTimestamps(req.Alloc, unixNow) + for _, preemptions := range result.NodePreemptions { + for _, preemptedAlloc := range preemptions { + req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, unixNow)) - // Set modify time for preempted allocs if any - // Also gather jobids to create follow up evals - for _, alloc := range req.NodePreemptions { - alloc.ModifyTime = unixNow - appendNamespacedJobID(preemptedJobIDs, alloc) + // Gather jobids to create follow up evals + appendNamespacedJobID(preemptedJobIDs, preemptedAlloc) } } diff --git a/nomad/plan_normalization_test.go b/nomad/plan_normalization_test.go deleted file mode 100644 index fe43e072721..00000000000 --- a/nomad/plan_normalization_test.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) HashiCorp, Inc. -// SPDX-License-Identifier: BUSL-1.1 - -package nomad - -import ( - "bytes" - "testing" - "time" - - "github.com/hashicorp/go-msgpack/v2/codec" - "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/shoenig/test/must" -) - -// This test compares the size of the normalized + OmitEmpty raft plan log entry -// with the earlier denormalized log. -// -// Whenever this test is changed, care should be taken to ensure the older msgpack size -// is recalculated when new fields are introduced in ApplyPlanResultsRequest -// -// If you make an unrelated change that unexpectedly fails this test, -// consider adding omitempty to the struct you are modifying, e.g.: -// -// type NetworkResource struct { -// // msgpack omit empty fields during serialization -// _struct bool `codec:",omitempty"` // nolint: structcheck -func TestPlanNormalize(t *testing.T) { - ci.Parallel(t) - - // This size was calculated using the older ApplyPlanResultsRequest format, in which allocations - // didn't use OmitEmpty and only the job was normalized in the stopped and preempted allocs. - // The newer format uses OmitEmpty and uses a minimal set of fields for the diff of the - // stopped and preempted allocs. The file for the older format hasn't been checked in, because - // it's not a good idea to check-in a 20mb file to the git repo. 
- unoptimizedLogSize := 19460168 - - numUpdatedAllocs := 10000 - numStoppedAllocs := 8000 - numPreemptedAllocs := 2000 - mockAlloc := mock.Alloc() - mockAlloc.Job = nil - - mockUpdatedAllocSlice := make([]*structs.Allocation, numUpdatedAllocs) - for i := 0; i < numUpdatedAllocs; i++ { - mockUpdatedAllocSlice = append(mockUpdatedAllocSlice, mockAlloc) - } - - now := time.Now().UTC().UnixNano() - mockStoppedAllocSlice := make([]*structs.AllocationDiff, numStoppedAllocs) - for i := 0; i < numStoppedAllocs; i++ { - mockStoppedAllocSlice = append(mockStoppedAllocSlice, normalizeStoppedAlloc(mockAlloc, now)) - } - - mockPreemptionAllocSlice := make([]*structs.AllocationDiff, numPreemptedAllocs) - for i := 0; i < numPreemptedAllocs; i++ { - mockPreemptionAllocSlice = append(mockPreemptionAllocSlice, normalizePreemptedAlloc(mockAlloc, now)) - } - - // Create a plan result - applyPlanLogEntry := structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - AllocsUpdated: mockUpdatedAllocSlice, - AllocsStopped: mockStoppedAllocSlice, - }, - AllocsPreempted: mockPreemptionAllocSlice, - } - - handle := structs.MsgpackHandle - var buf bytes.Buffer - if err := codec.NewEncoder(&buf, handle).Encode(applyPlanLogEntry); err != nil { - t.Fatalf("Encoding failed: %v", err) - } - - optimizedLogSize := buf.Len() - ratio := float64(optimizedLogSize) / float64(unoptimizedLogSize) - must.Less(t, 0.6, ratio) -} diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 7559dff31ca..484e03ecf49 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -505,12 +505,10 @@ func TestEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) { msgType := structs.ApplyPlanResultsRequestType req := &structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - Alloc: []*structs.Allocation{alloc, alloc2}, - Job: job, - }, - Deployment: d, - EvalID: eval.ID, + AllocsUpdated: []*structs.Allocation{alloc, alloc2}, + Job: job, + Deployment: d, + EvalID: eval.ID, } must.NoError(t, s.UpsertPlanResults(msgType, 100, req)) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d3d2df60984..62f0c5ee973 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -374,12 +374,6 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64 return err } - // COMPAT 0.11: Remove this denormalization when NodePreemptions is removed - results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions) - if err != nil { - return err - } - txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() @@ -418,29 +412,17 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64 } numAllocs := 0 - if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 { - // COMPAT 0.11: This branch will be removed, when Alloc is removed - // Attach the job to all the allocations. It is pulled out in the payload to - // avoid the redundancy of encoding, but should be denormalized prior to - // being inserted into MemDB. - addComputedAllocAttrs(results.Alloc, results.Job) - numAllocs = len(results.Alloc) + len(results.NodePreemptions) - } else { - // Attach the job to all the allocations. It is pulled out in the payload to - // avoid the redundancy of encoding, but should be denormalized prior to - // being inserted into MemDB. 
- addComputedAllocAttrs(results.AllocsUpdated, results.Job) - numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted) - } - allocsToUpsert := make([]*structs.Allocation, 0, numAllocs) - - // COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed - allocsToUpsert = append(allocsToUpsert, results.Alloc...) - allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...) + // Attach the job to all the allocations. It is pulled out in the payload to + // avoid the redundancy of encoding, but should be denormalized prior to + // being inserted into MemDB. + addComputedAllocAttrs(results.AllocsUpdated, results.Job) + addComputedAllocAttrs(allocsStopped, results.Job) + numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted) - allocsToUpsert = append(allocsToUpsert, allocsStopped...) + allocsToUpsert := make([]*structs.Allocation, 0, numAllocs) allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...) + allocsToUpsert = append(allocsToUpsert, allocsStopped...) allocsToUpsert = append(allocsToUpsert, allocsPreempted...) // handle upgrade path @@ -7376,6 +7358,7 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All // If alloc is a stopped alloc allocCopy.DesiredDescription = allocDiff.DesiredDescription allocCopy.DesiredStatus = structs.AllocDesiredStatusStop + allocCopy.AllocStates = allocDiff.AllocStates if allocDiff.ClientStatus != "" { allocCopy.ClientStatus = allocDiff.ClientStatus } diff --git a/nomad/state/state_store_service_registration_test.go b/nomad/state/state_store_service_registration_test.go index e495d660254..660b60d3455 100644 --- a/nomad/state/state_store_service_registration_test.go +++ b/nomad/state/state_store_service_registration_test.go @@ -695,9 +695,7 @@ func TestAlloc_ServiceRegistrationLifecycle(t *testing.T) { index++ must.NoError(t, store.UpsertPlanResults(structs.MsgTypeTestSetup, index, &structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - AllocsStopped: []*structs.AllocationDiff{&diff}, - }, + AllocsStopped: []*structs.AllocationDiff{&diff}, })) iter, err = store.GetServiceRegistrationsByAllocID(nil, alloc1.ID) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index a744549550e..f6abf4831f0 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -106,7 +106,6 @@ func TestStateStore_Blocking_MinQuery(t *testing.T) { test.True(t, resp.(bool)) } -// COMPAT 0.11: Uses AllocUpdateRequest.Alloc // This test checks that: // 1) The job is denormalized // 2) Allocations are created @@ -128,11 +127,9 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing // Create a plan result res := structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - Alloc: []*structs.Allocation{alloc}, - Job: job, - }, - EvalID: eval.ID, + AllocsUpdated: []*structs.Allocation{alloc}, + Job: job, + EvalID: eval.ID, } err := state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res) @@ -198,11 +195,9 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) { // Create a plan result res := structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - AllocsUpdated: []*structs.Allocation{alloc}, - AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff}, - Job: mJob, - }, + AllocsUpdated: []*structs.Allocation{alloc}, + AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff}, 
+		Job:           mJob,
 		EvalID:          eval.ID,
 		AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff},
 	}
@@ -276,12 +271,10 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc, alloc2},
-			Job:   job,
-		},
-		Deployment: d,
-		EvalID:     eval.ID,
+		AllocsUpdated: []*structs.Allocation{alloc, alloc2},
+		Job:           job,
+		Deployment:    d,
+		EvalID:        eval.ID,
 	}
 	err := state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res)
@@ -318,12 +311,10 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
 	// Create another plan
 	res = structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{allocNew, allocNew2},
-			Job:   job,
-		},
-		Deployment: d2,
-		EvalID:     eval.ID,
+		AllocsUpdated: []*structs.Allocation{allocNew, allocNew2},
+		Job:           job,
+		Deployment:    d2,
+		EvalID:        eval.ID,
 	}
 	err = state.UpsertPlanResults(structs.MsgTypeTestSetup, 1001, &res)
@@ -382,12 +373,10 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc},
-			Job:   job,
-		},
+		AllocsUpdated: []*structs.Allocation{alloc},
+		Job:           job,
 		EvalID:          eval.ID,
-		NodePreemptions: []*structs.Allocation{minimalPreemptedAlloc},
+		AllocsPreempted: []*structs.AllocationDiff{minimalPreemptedAlloc.AllocationDiff()},
 		PreemptionEvals: []*structs.Evaluation{eval2},
 	}
@@ -462,10 +451,8 @@ func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) {
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc},
-			Job:   job,
-		},
+		AllocsUpdated: []*structs.Allocation{alloc},
+		Job:           job,
 		Deployment:        dnew,
 		DeploymentUpdates: []*structs.DeploymentStatusUpdate{update},
 		EvalID:            eval.ID,
@@ -527,11 +514,9 @@ func TestStateStore_UpsertPlanResults_AllocationResources(t *testing.T) {
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc},
-			Job:   job,
-		},
-		EvalID: eval.ID,
+		AllocsUpdated: []*structs.Allocation{alloc},
+		Job:           job,
+		EvalID:        eval.ID,
 	}
 	must.NoError(t, state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res))
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 615ea398d55..b4449c9c4ab 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1062,11 +1062,22 @@ type PlanRequest struct {
 }
 // ApplyPlanResultsRequest is used by the planner to apply a Raft transaction
-// committing the result of a plan.
+// committing the result of a plan, including assigning new allocations or
+// evicting existing ones.
 type ApplyPlanResultsRequest struct {
-	// AllocUpdateRequest holds the allocation updates to be made by the
-	// scheduler.
-	AllocUpdateRequest
+	// Allocations to stop. Contains only the diff, not the entire allocation
+	AllocsStopped []*AllocationDiff
+
+	// New or updated allocations
+	AllocsUpdated []*Allocation
+
+	// Evals is the list of new evaluations to create. Evals are valid only
+	// when used in the Raft RPC
+	Evals []*Evaluation
+
+	// Job is the shared parent job of the allocations. It is pulled out of the
+	// request sent over the wire from the scheduler to reduce payload size.
+ Job *Job // Deployment is the deployment created or updated as a result of a // scheduling event. @@ -1085,12 +1096,6 @@ type ApplyPlanResultsRequest struct { // the evaluation itself being updated. EvalID string - // COMPAT 0.11 - // NodePreemptions is a slice of allocations from other lower priority jobs - // that are preempted. Preempted allocations are marked as evicted. - // Deprecated: Replaced with AllocsPreempted which contains only the diff - NodePreemptions []*Allocation - // AllocsPreempted is a slice of allocation diffs from other lower priority jobs // that are preempted. Preempted allocations are marked as evicted. AllocsPreempted []*AllocationDiff @@ -1108,30 +1113,16 @@ type ApplyPlanResultsRequest struct { UpdatedAt int64 } -// AllocUpdateRequest is used to submit changes to allocations, either -// to cause evictions or to assign new allocations. Both can be done -// within a single transaction +// AllocUpdateRequest is used to update the server from the client. type AllocUpdateRequest struct { - // COMPAT 0.11 - // Alloc is the list of new allocations to assign - // Deprecated: Replaced with two separate slices, one containing stopped allocations - // and another containing updated allocations + // Alloc is the list of allocation updates from the client Alloc []*Allocation - // Allocations to stop. Contains only the diff, not the entire allocation - AllocsStopped []*AllocationDiff - - // New or updated allocations - AllocsUpdated []*Allocation - - // Evals is the list of new evaluations to create - // Evals are valid only when used in the Raft RPC + // Evals is the list of new evaluations to create; these are only added to + // the request object in the RPC handler so that we're writing them into the + // Raft log entry Evals []*Evaluation - // Job is the shared parent job of the allocations. - // It is pulled out since it is common to reduce payload size. 
- Job *Job - WriteRequest } diff --git a/nomad/util.go b/nomad/util.go index 365f83f5acd..4e0f02492a2 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -23,11 +23,6 @@ const ( deprecatedAPIMajorVersionStr = "1" ) -// MinVersionPlanNormalization is the minimum version to support the -// normalization of Plan in SubmitPlan, and the denormalization raft log entry committed -// in ApplyPlanResultsRequest -var MinVersionPlanNormalization = version.Must(version.NewVersion("0.9.2")) - // ensurePath is used to make sure a path exists func ensurePath(path string, dir bool) error { if !dir { diff --git a/nomad/worker.go b/nomad/worker.go index 467a2e7d1f6..d1ca16fc2ac 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -664,14 +664,7 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, sstructs.S plan.SnapshotIndex = w.snapshotIndex // Normalize stopped and preempted allocs before RPC - normalizePlan := w.srv.peersCache.ServersMeetMinimumVersion( - w.srv.Region(), - MinVersionPlanNormalization, - true, - ) - if normalizePlan { - plan.NormalizeAllocations() - } + plan.NormalizeAllocations() // Setup the request req := structs.PlanRequest{ diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index ea253ef7c80..032f35d2c90 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -707,10 +707,8 @@ func (s *GenericScheduler) computePlacements( // blocked eval without dropping the reschedule tracker if prevAllocation != nil { if missing.IsRescheduling() { - updatedPrevAllocation := prevAllocation.Copy() missing.SetPreviousAllocation(prevAllocation) - annotateRescheduleTracker(updatedPrevAllocation, structs.LastRescheduleFailedToPlace) - swapAllocInPlan(s.plan, prevAllocation, updatedPrevAllocation) + markFailedToReschedule(s.plan, prevAllocation, s.job) } } @@ -722,6 +720,25 @@ func (s *GenericScheduler) computePlacements( return nil } +// markFailedToReschedule takes a "previous" allocation that we were unable to +// reschedule and updates the plan to annotate its reschedule tracker and to +// move it out of the stop list and into the update list so that we don't drop +// tracking information in the plan applier +func markFailedToReschedule(plan *structs.Plan, original *structs.Allocation, job *structs.Job) { + updated := original.Copy() + annotateRescheduleTracker(updated, structs.LastRescheduleFailedToPlace) + + plan.PopUpdate(original) + nodeID := original.NodeID + for i, alloc := range plan.NodeAllocation[nodeID] { + if alloc.ID == original.ID { + plan.NodeAllocation[nodeID][i] = updated + return + } + } + plan.AppendAlloc(updated, job) +} + // swapAllocInPlan updates a plan to swap out an allocation that's already in // the plan with an updated definition of that allocation. The updated // definition should be a deep copy. diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 0b3914eaadf..b25a75b9391 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1009,14 +1009,14 @@ func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) { // Create 5 nodes in each datacenter. // Use two loops so nodes are separated by datacenter. 
nodes := []*structs.Node{} - for i := 0; i < 5; i++ { + for i := range 5 { node := mock.Node() node.Name = fmt.Sprintf("node-dc1-%d", i) node.Datacenter = "dc1" nodes = append(nodes, node) must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) } - for i := 0; i < 5; i++ { + for i := range 5 { node := mock.Node() node.Name = fmt.Sprintf("node-dc2-%d", i) node.Datacenter = "dc2" @@ -1045,7 +1045,7 @@ func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) { // Create allocs for this job version with one being a canary and another // marked as failed. allocs := []*structs.Allocation{} - for i := 0; i < 3; i++ { + for i := range 3 { alloc := mock.Alloc() alloc.Job = job1 alloc.JobID = job1.ID @@ -1099,10 +1099,15 @@ func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) { must.Len(t, 1, allocs) alloc := allocs[0] - must.SliceContains(t, alloc.Job.Datacenters, node.Datacenter, must.Sprintf( + + expect := "dc2" + if alloc.RescheduleTracker != nil { + expect = "dc1" + } + + must.Eq(t, expect, node.Datacenter, must.Sprintf( "alloc for job in datacenter %q placed in %q", - alloc.Job.Datacenters, - node.Datacenter, + expect, node.Datacenter, )) } } @@ -1143,14 +1148,14 @@ func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) { // Create 5 nodes in each node pool. // Use two loops so nodes are separated by node pool. nodes := []*structs.Node{} - for i := 0; i < 5; i++ { + for i := range 5 { node := mock.Node() node.Name = fmt.Sprintf("node-binpack-%d", i) node.NodePool = poolBinpack.Name nodes = append(nodes, node) must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) } - for i := 0; i < 5; i++ { + for i := range 5 { node := mock.Node() node.Name = fmt.Sprintf("node-spread-%d", i) node.NodePool = poolSpread.Name @@ -1179,7 +1184,7 @@ func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) { // Create allocs for this job version with one being a canary and another // marked as failed. 
allocs := []*structs.Allocation{} - for i := 0; i < 3; i++ { + for i := range 3 { alloc := mock.Alloc() alloc.Job = job1 alloc.JobID = job1.ID @@ -1233,10 +1238,15 @@ func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) { must.Len(t, 1, allocs) alloc := allocs[0] - must.Eq(t, alloc.Job.NodePool, node.NodePool, must.Sprintf( + + expect := poolSpread.Name + if alloc.RescheduleTracker != nil { + expect = poolBinpack.Name + } + + must.Eq(t, expect, node.NodePool, must.Sprintf( "alloc for job in node pool %q placed in node in node pool %q", - alloc.Job.NodePool, - node.NodePool, + expect, node.NodePool, )) } } @@ -3764,6 +3774,7 @@ func TestServiceSched_NodeDown(t *testing.T) { reschedule bool terminal bool lost bool + inplace bool }{ { name: "should stop is running should be lost", @@ -3800,6 +3811,7 @@ func TestServiceSched_NodeDown(t *testing.T) { desired: structs.AllocDesiredStatusRun, client: structs.AllocClientStatusFailed, reschedule: true, + inplace: true, }, { name: "should evict is running should be lost", @@ -3857,12 +3869,19 @@ func TestServiceSched_NodeDown(t *testing.T) { must.Len(t, 0, h.Plans, must.Sprint("expected no plan")) } else { must.Len(t, 1, h.Plans, must.Sprint("expected plan")) - plan := h.Plans[0] - out := plan.NodeUpdate[node.ID] - must.Len(t, 1, out) - outAlloc := out[0] + var outAlloc *structs.Allocation + if tc.inplace { + out := plan.NodeAllocation[node.ID] + must.Len(t, 1, out) + outAlloc = out[0] + } else { + out := plan.NodeUpdate[node.ID] + must.Len(t, 1, out) + outAlloc = out[0] + } + if tc.migrate { must.NotEq(t, structs.AllocClientStatusLost, outAlloc.ClientStatus) } else if tc.reschedule { @@ -3995,7 +4014,8 @@ func TestServiceSched_StopOnClientAfter(t *testing.T) { must.Eq(t, structs.AllocClientStatusLost, alloc.ClientStatus) // 1 if rescheduled, 2 for rescheduled later - test.Len(t, tc.expectedAllocStates, alloc.AllocStates) + test.Len(t, tc.expectedAllocStates, alloc.AllocStates, + test.Sprint("unexpected allocation states")) if tc.expectBlockedEval { must.Eq(t, structs.EvalStatusBlocked, followupEval.Status) @@ -4999,13 +5019,14 @@ func TestServiceSched_BlockedReschedule(t *testing.T) { // "use up" resources on the node so the follow-up will block node.NodeResources.Memory.MemoryMB = 200 + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Process the follow-up eval, which results in a stop but not a replacement must.NoError(t, h.Process(NewServiceScheduler, followupEval)) must.Len(t, 4, h.Plans) - must.MapLen(t, 1, h.Plans[3].NodeUpdate) // stop - must.MapLen(t, 0, h.Plans[3].NodeAllocation) // place + must.MapLen(t, 0, h.Plans[3].NodeUpdate) // stop + must.MapLen(t, 1, h.Plans[3].NodeAllocation) // update out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false) must.NoError(t, err) @@ -5028,10 +5049,14 @@ func TestServiceSched_BlockedReschedule(t *testing.T) { must.NoError(t, h.Process(NewServiceScheduler, blockedEval)) must.Len(t, 4, h.Plans, must.Sprint("expected no new plan")) - // bypass the timer check by setting the alloc's follow-up eval ID to be the - // blocked eval alloc, err = h.State.AllocByID(ws, replacementAllocID) must.NoError(t, err) + must.NotNil(t, alloc) + must.NotNil(t, alloc.RescheduleTracker) + must.Eq(t, structs.LastRescheduleFailedToPlace, alloc.RescheduleTracker.LastReschedule) + + // bypass the timer check by setting the alloc's follow-up eval ID to be the + // blocked eval alloc = alloc.Copy() alloc.FollowupEvalID = blockedEval.ID must.NoError(t, 
h.State.UpsertAllocs(structs.MsgTypeTestSetup, diff --git a/scheduler/structs/structs.go b/scheduler/structs/structs.go index 58bd8df2b03..7a53717129d 100644 --- a/scheduler/structs/structs.go +++ b/scheduler/structs/structs.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -66,7 +67,9 @@ func NewPlanWithStateAndIndex(state *state.StateStore, nextIndex uint64, servers return &PlanBuilder{State: state, nextIndex: nextIndex, serversMeetMinimumVersion: serversMeetMinimumVersion} } -// PlanBuilder is used to submit plans. +// PlanBuilder is used to create plans outside the usual scheduler worker flow, +// such as testing, recalculating queued allocs during snapshot restore in the +// FSM, or the online plans created in the Job.Plan RPC type PlanBuilder struct { State *state.StateStore @@ -111,7 +114,6 @@ func (p *PlanBuilder) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State result.NodePreemptions = plan.NodePreemptions result.AllocIndex = index - // Flatten evicts and allocs now := time.Now().UTC().UnixNano() allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation)) @@ -120,39 +122,37 @@ func (p *PlanBuilder) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State } updateCreateTimestamp(allocsUpdated, now) - // Setup the update request - req := structs.ApplyPlanResultsRequest{ - AllocUpdateRequest: structs.AllocUpdateRequest{ - Job: plan.Job, - }, - Deployment: plan.Deployment, - DeploymentUpdates: plan.DeploymentUpdates, - EvalID: plan.EvalID, - } - - var allocs []*structs.Allocation + snap, _ := p.State.Snapshot() - allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate)) + // make sure these are denormalized the same way they would be in the real + // plan applier + allocsStopped := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate)) for _, updateList := range plan.NodeUpdate { - allocsStopped = append(allocsStopped, updateList...) - } - allocs = append(allocs, allocsStopped...) - - allocs = append(allocs, allocsUpdated...) - updateCreateTimestamp(allocs, now) + stopped, _ := snap.DenormalizeAllocationSlice(updateList) + allocsStopped = append(allocsStopped, helper.ConvertSlice(stopped, + func(a *structs.Allocation) *structs.AllocationDiff { return a.AllocationDiff() })...) - req.Alloc = allocs + } - // Set modify time for preempted allocs and flatten them - var preemptedAllocs []*structs.Allocation - for _, preemptions := range result.NodePreemptions { - for _, alloc := range preemptions { - alloc.ModifyTime = now - preemptedAllocs = append(preemptedAllocs, alloc) - } + // make sure these are denormalized the same way they would be in the real + // plan applier + allocsPreempted := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions)) + for _, preemptionList := range result.NodePreemptions { + preemptions, _ := snap.DenormalizeAllocationSlice(preemptionList) + allocsPreempted = append(allocsPreempted, helper.ConvertSlice(preemptions, + func(a *structs.Allocation) *structs.AllocationDiff { return a.AllocationDiff() })...) 
} - req.NodePreemptions = preemptedAllocs + // Setup the update request + req := structs.ApplyPlanResultsRequest{ + AllocsStopped: allocsStopped, + AllocsUpdated: allocsUpdated, + AllocsPreempted: allocsPreempted, + Job: plan.Job, + Deployment: plan.Deployment, + DeploymentUpdates: plan.DeploymentUpdates, + EvalID: plan.EvalID, + } if p.noSubmit { return result, nil, nil @@ -170,6 +170,7 @@ func updateCreateTimestamp(allocations []*structs.Allocation, now int64) { if alloc.CreateTime == 0 { alloc.CreateTime = now } + alloc.ModifyTime = now } }
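
Reviewer note: the mechanism behind the fix is easiest to see in isolation. Plan
normalization round-trips only a small `AllocationDiff` for stopped allocations
through the Raft log, and the plan applier rehydrates the rest of the allocation
from the copy already in state, so anything the scheduler set on its in-memory
copy (here, the reschedule-tracker annotation) never survives the trip. The
sketch below uses toy stand-ins for `structs.Allocation` and
`structs.AllocationDiff`, not Nomad's real types, to show that lossy round-trip:

```go
// Toy model of plan normalization: only the diff fields of a stopped
// allocation cross the Raft log; the applier rebuilds the rest from state.
package main

import "fmt"

// alloc stands in for structs.Allocation with just the fields we need here.
type alloc struct {
	ID                string
	DesiredStatus     string
	RescheduleTracker string // stand-in for the real reschedule tracker
}

// allocDiff stands in for structs.AllocationDiff: the minimal fields the
// scheduler ships over Raft for a stopped allocation.
type allocDiff struct {
	ID string
}

// normalizeStopped mimics the scheduler side: everything except the diff
// fields is dropped before the plan result is written to the Raft log.
func normalizeStopped(a *alloc) *allocDiff {
	return &allocDiff{ID: a.ID}
}

// denormalize mimics the plan applier: it rehydrates the diff from the copy
// of the allocation already in state and marks it stopped. The scheduler's
// in-memory annotation never makes it here.
func denormalize(d *allocDiff, state map[string]*alloc) *alloc {
	out := *state[d.ID] // old copy from state, without the annotation
	out.DesiredStatus = "stop"
	return &out
}

func main() {
	state := map[string]*alloc{"a1": {ID: "a1", DesiredStatus: "run"}}

	// The scheduler annotates its copy, then puts the alloc in the stop list.
	annotated := *state["a1"]
	annotated.RescheduleTracker = "failed-to-place"
	diff := normalizeStopped(&annotated)

	applied := denormalize(diff, state)
	fmt.Printf("tracker after apply: %q\n", applied.RescheduleTracker) // "": dropped
}
```

This is why markFailedToReschedule moves the un-placeable allocation out of the
plan's NodeUpdate (stop) list and into the NodeAllocation (update) list: updated
allocations ship in full, so the annotated reschedule tracker reaches the plan
applier through the same path as any other in-place update.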