diff --git a/.changelog/27129.txt b/.changelog/27129.txt
new file mode 100644
index 00000000000..04a15b3bfab
--- /dev/null
+++ b/.changelog/27129.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed a bug, previously patched incorrectly, where rescheduled allocations that could not be placed would later ignore their reschedule policy limits
+```
diff --git a/nomad/acl_test.go b/nomad/acl_test.go
index 0c8e0ff00e8..771ce0a35e0 100644
--- a/nomad/acl_test.go
+++ b/nomad/acl_test.go
@@ -152,10 +152,8 @@ func TestAuthenticate_mTLS(t *testing.T) {
 	must.NoError(t, err, must.Sprint("could not sign claims"))
 
 	planReq := &structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc1, alloc2},
-			Job:   job,
-		},
+		AllocsUpdated: []*structs.Allocation{alloc1, alloc2},
+		Job:           job,
 	}
 	_, _, err = leader.raftApply(structs.ApplyPlanResultsRequestType, planReq)
 	must.NoError(t, err)
diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go
index 62e84036016..cd57a785ca4 100644
--- a/nomad/eval_endpoint_test.go
+++ b/nomad/eval_endpoint_test.go
@@ -397,11 +397,9 @@ func TestEvalEndpoint_Dequeue_UpdateWaitIndex(t *testing.T) {
 
 	// Create a plan result and apply it with a later index
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc},
-			Job:   job,
-		},
-		EvalID: eval.ID,
+		AllocsUpdated: []*structs.Allocation{alloc},
+		Job:           job,
+		EvalID:        eval.ID,
 	}
 	assert := assert.New(t)
 	err := state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res)
diff --git a/nomad/fsm.go b/nomad/fsm.go
index dc80b544d2d..29fcab314ea 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -989,7 +989,7 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt
 		return err
 	}
 
-	// Update any evals
+	// Update any evals that were added by the RPC handler
 	if len(req.Evals) > 0 {
 		if err := n.upsertEvals(msgType, index, req.Evals); err != nil {
 			n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err)
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index e76e3373052..678c8168ff2 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -1635,13 +1635,12 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
 	eval2.JobID = job2.ID
 
 	req := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Job:   job,
-			Alloc: []*structs.Allocation{alloc},
-		},
-		Deployment:      d,
-		EvalID:          eval.ID,
-		NodePreemptions: []*structs.Allocation{alloc1, alloc2},
+		Job:           job,
+		AllocsUpdated: []*structs.Allocation{alloc},
+		Deployment:    d,
+		EvalID:        eval.ID,
+		AllocsPreempted: []*structs.AllocationDiff{
+			alloc1.AllocationDiff(), alloc2.AllocationDiff()},
 		PreemptionEvals: []*structs.Evaluation{eval1, eval2},
 	}
 	buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
@@ -1703,11 +1702,9 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
 	evictAlloc.Job = nil
 	evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
 	req2 := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Job:   job,
-			Alloc: []*structs.Allocation{evictAlloc},
-		},
-		EvalID: eval.ID,
+		Job:           job,
+		AllocsUpdated: []*structs.Allocation{evictAlloc},
+		EvalID:        eval.ID,
 	}
 	buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
 	assert.Nil(err)
diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index e8075b153db..9516ee83bc3 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -244,9 +244,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
 	// Setup the update request
 	req := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Job: plan.Job,
-		},
+		Job:               plan.Job,
 		Deployment:        result.Deployment,
 		DeploymentUpdates: result.DeploymentUpdates,
 		IneligibleNodes:   result.IneligibleNodes,
@@ -256,74 +254,38 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
 	preemptedJobIDs := make(map[structs.NamespacedID]struct{})
 
-	if p.srv.peersCache.ServersMeetMinimumVersion(p.srv.Region(), MinVersionPlanNormalization, true) {
-		// Initialize the allocs request using the new optimized log entry format.
-		// Determine the minimum number of updates, could be more if there
-		// are multiple updates per node
-		req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
-		req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
-		req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
-
-		for _, updateList := range result.NodeUpdate {
-			for _, stoppedAlloc := range updateList {
-				req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, unixNow))
-			}
-		}
-
-		for _, allocList := range result.NodeAllocation {
-			req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
-		}
-
-		// Set the time the alloc was applied for the first time. This can be used
-		// to approximate the scheduling time.
-		updateAllocTimestamps(req.AllocsUpdated, unixNow)
+	// Initialize the allocs request using the new optimized log entry format.
+	// Determine the minimum number of updates, could be more if there
+	// are multiple updates per node
+	req.AllocsStopped = make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
+	req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
+	req.AllocsPreempted = make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
 
-		err := signAllocIdentities(p.srv.encrypter, plan.Job, req.AllocsUpdated, now)
-		if err != nil {
-			return nil, err
-		}
-
-		for _, preemptions := range result.NodePreemptions {
-			for _, preemptedAlloc := range preemptions {
-				req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, unixNow))
-
-				// Gather jobids to create follow up evals
-				appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
-			}
+	for _, updateList := range result.NodeUpdate {
+		for _, stoppedAlloc := range updateList {
+			req.AllocsStopped = append(req.AllocsStopped, normalizeStoppedAlloc(stoppedAlloc, unixNow))
 		}
-	} else {
-		// COMPAT 0.11: This branch is deprecated and will only be used to support
-		// application of older log entries. Expected to be removed in a future version.
-
-		// Determine the minimum number of updates, could be more if there
-		// are multiple updates per node
-		minUpdates := len(result.NodeUpdate)
-		minUpdates += len(result.NodeAllocation)
+	}
 
-		// Initialize using the older log entry format for Alloc and NodePreemptions
-		req.Alloc = make([]*structs.Allocation, 0, minUpdates)
-		req.NodePreemptions = make([]*structs.Allocation, 0, len(result.NodePreemptions))
+	for _, allocList := range result.NodeAllocation {
+		req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
+	}
 
-		for _, updateList := range result.NodeUpdate {
-			req.Alloc = append(req.Alloc, updateList...)
-		}
-		for _, allocList := range result.NodeAllocation {
-			req.Alloc = append(req.Alloc, allocList...)
-		}
+	// Set the time the alloc was applied for the first time. This can be used
+	// to approximate the scheduling time.
+	updateAllocTimestamps(req.AllocsUpdated, unixNow)
 
-		for _, preemptions := range result.NodePreemptions {
-			req.NodePreemptions = append(req.NodePreemptions, preemptions...)
-		}
+	err := signAllocIdentities(p.srv.encrypter, plan.Job, req.AllocsUpdated, now)
+	if err != nil {
+		return nil, err
+	}
 
-		// Set the time the alloc was applied for the first time. This can be used
-		// to approximate the scheduling time.
-		updateAllocTimestamps(req.Alloc, unixNow)
+	for _, preemptions := range result.NodePreemptions {
+		for _, preemptedAlloc := range preemptions {
+			req.AllocsPreempted = append(req.AllocsPreempted, normalizePreemptedAlloc(preemptedAlloc, unixNow))
 
-		// Set modify time for preempted allocs if any
-		// Also gather jobids to create follow up evals
-		for _, alloc := range req.NodePreemptions {
-			alloc.ModifyTime = unixNow
-			appendNamespacedJobID(preemptedJobIDs, alloc)
+			// Gather jobids to create follow up evals
+			appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
 		}
 	}
diff --git a/nomad/plan_normalization_test.go b/nomad/plan_normalization_test.go
deleted file mode 100644
index fe43e072721..00000000000
--- a/nomad/plan_normalization_test.go
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright (c) HashiCorp, Inc.
-// SPDX-License-Identifier: BUSL-1.1
-
-package nomad
-
-import (
-	"bytes"
-	"testing"
-	"time"
-
-	"github.com/hashicorp/go-msgpack/v2/codec"
-	"github.com/hashicorp/nomad/ci"
-	"github.com/hashicorp/nomad/nomad/mock"
-	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/shoenig/test/must"
-)
-
-// This test compares the size of the normalized + OmitEmpty raft plan log entry
-// with the earlier denormalized log.
-//
-// Whenever this test is changed, care should be taken to ensure the older msgpack size
-// is recalculated when new fields are introduced in ApplyPlanResultsRequest
-//
-// If you make an unrelated change that unexpectedly fails this test,
-// consider adding omitempty to the struct you are modifying, e.g.:
-//
-//	type NetworkResource struct {
-//		// msgpack omit empty fields during serialization
-//		_struct bool `codec:",omitempty"` // nolint: structcheck
-func TestPlanNormalize(t *testing.T) {
-	ci.Parallel(t)
-
-	// This size was calculated using the older ApplyPlanResultsRequest format, in which allocations
-	// didn't use OmitEmpty and only the job was normalized in the stopped and preempted allocs.
-	// The newer format uses OmitEmpty and uses a minimal set of fields for the diff of the
-	// stopped and preempted allocs. The file for the older format hasn't been checked in, because
-	// it's not a good idea to check-in a 20mb file to the git repo.
-	unoptimizedLogSize := 19460168
-
-	numUpdatedAllocs := 10000
-	numStoppedAllocs := 8000
-	numPreemptedAllocs := 2000
-	mockAlloc := mock.Alloc()
-	mockAlloc.Job = nil
-
-	mockUpdatedAllocSlice := make([]*structs.Allocation, numUpdatedAllocs)
-	for i := 0; i < numUpdatedAllocs; i++ {
-		mockUpdatedAllocSlice = append(mockUpdatedAllocSlice, mockAlloc)
-	}
-
-	now := time.Now().UTC().UnixNano()
-	mockStoppedAllocSlice := make([]*structs.AllocationDiff, numStoppedAllocs)
-	for i := 0; i < numStoppedAllocs; i++ {
-		mockStoppedAllocSlice = append(mockStoppedAllocSlice, normalizeStoppedAlloc(mockAlloc, now))
-	}
-
-	mockPreemptionAllocSlice := make([]*structs.AllocationDiff, numPreemptedAllocs)
-	for i := 0; i < numPreemptedAllocs; i++ {
-		mockPreemptionAllocSlice = append(mockPreemptionAllocSlice, normalizePreemptedAlloc(mockAlloc, now))
-	}
-
-	// Create a plan result
-	applyPlanLogEntry := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			AllocsUpdated: mockUpdatedAllocSlice,
-			AllocsStopped: mockStoppedAllocSlice,
-		},
-		AllocsPreempted: mockPreemptionAllocSlice,
-	}
-
-	handle := structs.MsgpackHandle
-	var buf bytes.Buffer
-	if err := codec.NewEncoder(&buf, handle).Encode(applyPlanLogEntry); err != nil {
-		t.Fatalf("Encoding failed: %v", err)
-	}
-
-	optimizedLogSize := buf.Len()
-	ratio := float64(optimizedLogSize) / float64(unoptimizedLogSize)
-	must.Less(t, 0.6, ratio)
-}
diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go
index 7559dff31ca..484e03ecf49 100644
--- a/nomad/state/events_test.go
+++ b/nomad/state/events_test.go
@@ -505,12 +505,10 @@ func TestEventsFromChanges_ApplyPlanResultsRequestType(t *testing.T) {
 	msgType := structs.ApplyPlanResultsRequestType
 
 	req := &structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc, alloc2},
-			Job:   job,
-		},
-		Deployment: d,
-		EvalID:     eval.ID,
+		AllocsUpdated: []*structs.Allocation{alloc, alloc2},
+		Job:           job,
+		Deployment:    d,
+		EvalID:        eval.ID,
 	}
 
 	must.NoError(t, s.UpsertPlanResults(msgType, 100, req))
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index d3d2df60984..62f0c5ee973 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -374,12 +374,6 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
 		return err
 	}
 
-	// COMPAT 0.11: Remove this denormalization when NodePreemptions is removed
-	results.NodePreemptions, err = snapshot.DenormalizeAllocationSlice(results.NodePreemptions)
-	if err != nil {
-		return err
-	}
-
 	txn := s.db.WriteTxnMsgT(msgType, index)
 	defer txn.Abort()
 
@@ -418,29 +412,17 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
 	}
 
 	numAllocs := 0
-	if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 {
-		// COMPAT 0.11: This branch will be removed, when Alloc is removed
-		// Attach the job to all the allocations. It is pulled out in the payload to
-		// avoid the redundancy of encoding, but should be denormalized prior to
-		// being inserted into MemDB.
-		addComputedAllocAttrs(results.Alloc, results.Job)
-		numAllocs = len(results.Alloc) + len(results.NodePreemptions)
-	} else {
-		// Attach the job to all the allocations. It is pulled out in the payload to
-		// avoid the redundancy of encoding, but should be denormalized prior to
-		// being inserted into MemDB.
-		addComputedAllocAttrs(results.AllocsUpdated, results.Job)
-		numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
-	}
-	allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)
-
-	// COMPAT 0.11: Both these appends should be removed when Alloc and NodePreemptions are removed
-	allocsToUpsert = append(allocsToUpsert, results.Alloc...)
-	allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)
+	// Attach the job to all the allocations. It is pulled out in the payload to
+	// avoid the redundancy of encoding, but should be denormalized prior to
+	// being inserted into MemDB.
+	addComputedAllocAttrs(results.AllocsUpdated, results.Job)
+	addComputedAllocAttrs(allocsStopped, results.Job)
+	numAllocs = len(allocsStopped) + len(results.AllocsUpdated) + len(allocsPreempted)
 
-	allocsToUpsert = append(allocsToUpsert, allocsStopped...)
+	allocsToUpsert := make([]*structs.Allocation, 0, numAllocs)
 	allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
+	allocsToUpsert = append(allocsToUpsert, allocsStopped...)
 	allocsToUpsert = append(allocsToUpsert, allocsPreempted...)
 
 	// handle upgrade path
@@ -7376,6 +7358,7 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All
 			// If alloc is a stopped alloc
 			allocCopy.DesiredDescription = allocDiff.DesiredDescription
 			allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
+			allocCopy.AllocStates = allocDiff.AllocStates
 			if allocDiff.ClientStatus != "" {
 				allocCopy.ClientStatus = allocDiff.ClientStatus
 			}
diff --git a/nomad/state/state_store_service_registration_test.go b/nomad/state/state_store_service_registration_test.go
index e495d660254..660b60d3455 100644
--- a/nomad/state/state_store_service_registration_test.go
+++ b/nomad/state/state_store_service_registration_test.go
@@ -695,9 +695,7 @@ func TestAlloc_ServiceRegistrationLifecycle(t *testing.T) {
 	index++
 	must.NoError(t, store.UpsertPlanResults(structs.MsgTypeTestSetup, index, &structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			AllocsStopped: []*structs.AllocationDiff{&diff},
-		},
+		AllocsStopped: []*structs.AllocationDiff{&diff},
 	}))
 
 	iter, err = store.GetServiceRegistrationsByAllocID(nil, alloc1.ID)
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index a744549550e..f6abf4831f0 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -106,7 +106,6 @@ func TestStateStore_Blocking_MinQuery(t *testing.T) {
 	test.True(t, resp.(bool))
 }
 
-// COMPAT 0.11: Uses AllocUpdateRequest.Alloc
 // This test checks that:
 // 1) The job is denormalized
 // 2) Allocations are created
@@ -128,11 +127,9 @@ func TestStateStore_UpsertPlanResults_AllocationsCreated_Denormalized(t *testing
 
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc},
-			Job:   job,
-		},
-		EvalID: eval.ID,
+		AllocsUpdated: []*structs.Allocation{alloc},
+		Job:           job,
+		EvalID:        eval.ID,
 	}
 
 	err := state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res)
@@ -198,11 +195,9 @@ func TestStateStore_UpsertPlanResults_AllocationsDenormalized(t *testing.T) {
 
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			AllocsUpdated: []*structs.Allocation{alloc},
-			AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff},
-			Job:           mJob,
-		},
+		AllocsUpdated: []*structs.Allocation{alloc},
+		AllocsStopped: []*structs.AllocationDiff{stoppedAllocDiff},
+		Job:           mJob,
 		EvalID:          eval.ID,
 		AllocsPreempted: []*structs.AllocationDiff{preemptedAllocDiff},
 	}
@@ -276,12 +271,10 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
 
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc, alloc2},
-			Job:   job,
-		},
-		Deployment: d,
-		EvalID:     eval.ID,
+		AllocsUpdated: []*structs.Allocation{alloc, alloc2},
+		Job:           job,
+		Deployment:    d,
+		EvalID:        eval.ID,
 	}
 
 	err := state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res)
@@ -318,12 +311,10 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) {
 
 	// Create another plan
 	res = structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{allocNew, allocNew2},
-			Job:   job,
-		},
-		Deployment: d2,
-		EvalID:     eval.ID,
+		AllocsUpdated: []*structs.Allocation{allocNew, allocNew2},
+		Job:           job,
+		Deployment:    d2,
+		EvalID:        eval.ID,
 	}
 
 	err = state.UpsertPlanResults(structs.MsgTypeTestSetup, 1001, &res)
@@ -382,12 +373,10 @@ func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) {
 
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc},
-			Job:   job,
-		},
+		AllocsUpdated: []*structs.Allocation{alloc},
+		Job:           job,
 		EvalID:          eval.ID,
-		NodePreemptions: []*structs.Allocation{minimalPreemptedAlloc},
+		AllocsPreempted: []*structs.AllocationDiff{minimalPreemptedAlloc.AllocationDiff()},
 		PreemptionEvals: []*structs.Evaluation{eval2},
 	}
 
@@ -462,10 +451,8 @@ func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) {
 
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc},
-			Job:   job,
-		},
+		AllocsUpdated:     []*structs.Allocation{alloc},
+		Job:               job,
 		Deployment:        dnew,
 		DeploymentUpdates: []*structs.DeploymentStatusUpdate{update},
 		EvalID:            eval.ID,
@@ -527,11 +514,9 @@ func TestStateStore_UpsertPlanResults_AllocationResources(t *testing.T) {
 
 	// Create a plan result
 	res := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Alloc: []*structs.Allocation{alloc},
-			Job:   job,
-		},
-		EvalID: eval.ID,
+		AllocsUpdated: []*structs.Allocation{alloc},
+		Job:           job,
+		EvalID:        eval.ID,
 	}
 
 	must.NoError(t, state.UpsertPlanResults(structs.MsgTypeTestSetup, 1000, &res))
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 615ea398d55..b4449c9c4ab 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1062,11 +1062,22 @@ type PlanRequest struct {
 }
 
 // ApplyPlanResultsRequest is used by the planner to apply a Raft transaction
-// committing the result of a plan.
+// committing the result of a plan, including assigning new allocations or
+// evicting existing ones.
 type ApplyPlanResultsRequest struct {
-	// AllocUpdateRequest holds the allocation updates to be made by the
-	// scheduler.
-	AllocUpdateRequest
+	// Allocations to stop. Contains only the diff, not the entire allocation
+	AllocsStopped []*AllocationDiff
+
+	// New or updated allocations
+	AllocsUpdated []*Allocation
+
+	// Evals is the list of new evaluations to create. Evals are valid only
+	// when used in the Raft RPC
+	Evals []*Evaluation
+
+	// Job is the shared parent job of the allocations. It is pulled out of the
+	// request sent over the wire from the scheduler to reduce payload size.
+	Job *Job
 
 	// Deployment is the deployment created or updated as a result of a
 	// scheduling event.
@@ -1085,12 +1096,6 @@ type ApplyPlanResultsRequest struct {
 	// the evaluation itself being updated.
 	EvalID string
 
-	// COMPAT 0.11
-	// NodePreemptions is a slice of allocations from other lower priority jobs
-	// that are preempted. Preempted allocations are marked as evicted.
-	// Deprecated: Replaced with AllocsPreempted which contains only the diff
-	NodePreemptions []*Allocation
-
 	// AllocsPreempted is a slice of allocation diffs from other lower priority jobs
 	// that are preempted. Preempted allocations are marked as evicted.
 	AllocsPreempted []*AllocationDiff
@@ -1108,30 +1113,16 @@ type ApplyPlanResultsRequest struct {
 	UpdatedAt int64
 }
 
-// AllocUpdateRequest is used to submit changes to allocations, either
-// to cause evictions or to assign new allocations. Both can be done
-// within a single transaction
+// AllocUpdateRequest is used to update the server from the client.
 type AllocUpdateRequest struct {
-	// COMPAT 0.11
-	// Alloc is the list of new allocations to assign
-	// Deprecated: Replaced with two separate slices, one containing stopped allocations
-	// and another containing updated allocations
+	// Alloc is the list of allocation updates from the client
 	Alloc []*Allocation
 
-	// Allocations to stop. Contains only the diff, not the entire allocation
-	AllocsStopped []*AllocationDiff
-
-	// New or updated allocations
-	AllocsUpdated []*Allocation
-
-	// Evals is the list of new evaluations to create
-	// Evals are valid only when used in the Raft RPC
+	// Evals is the list of new evaluations to create; these are only added to
+	// the request object in the RPC handler so that we're writing them into the
+	// Raft log entry
 	Evals []*Evaluation
 
-	// Job is the shared parent job of the allocations.
-	// It is pulled out since it is common to reduce payload size.
-	Job *Job
-
 	WriteRequest
 }
diff --git a/nomad/util.go b/nomad/util.go
index 365f83f5acd..4e0f02492a2 100644
--- a/nomad/util.go
+++ b/nomad/util.go
@@ -23,11 +23,6 @@ const (
 	deprecatedAPIMajorVersionStr = "1"
 )
 
-// MinVersionPlanNormalization is the minimum version to support the
-// normalization of Plan in SubmitPlan, and the denormalization raft log entry committed
-// in ApplyPlanResultsRequest
-var MinVersionPlanNormalization = version.Must(version.NewVersion("0.9.2"))
-
 // ensurePath is used to make sure a path exists
 func ensurePath(path string, dir bool) error {
 	if !dir {
diff --git a/nomad/worker.go b/nomad/worker.go
index 467a2e7d1f6..d1ca16fc2ac 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -664,14 +664,7 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, sstructs.S
 	plan.SnapshotIndex = w.snapshotIndex
 
 	// Normalize stopped and preempted allocs before RPC
-	normalizePlan := w.srv.peersCache.ServersMeetMinimumVersion(
-		w.srv.Region(),
-		MinVersionPlanNormalization,
-		true,
-	)
-	if normalizePlan {
-		plan.NormalizeAllocations()
-	}
+	plan.NormalizeAllocations()
 
 	// Setup the request
 	req := structs.PlanRequest{
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index ea253ef7c80..032f35d2c90 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -707,10 +707,8 @@ func (s *GenericScheduler) computePlacements(
 			// blocked eval without dropping the reschedule tracker
 			if prevAllocation != nil {
 				if missing.IsRescheduling() {
-					updatedPrevAllocation := prevAllocation.Copy()
 					missing.SetPreviousAllocation(prevAllocation)
-					annotateRescheduleTracker(updatedPrevAllocation, structs.LastRescheduleFailedToPlace)
-					swapAllocInPlan(s.plan, prevAllocation, updatedPrevAllocation)
+					markFailedToReschedule(s.plan, prevAllocation, s.job)
 				}
 			}
 
@@ -722,6 +720,25 @@ func (s *GenericScheduler) computePlacements(
 	return nil
 }
 
+// markFailedToReschedule takes a "previous" allocation that we were unable to
+// reschedule and updates the plan to annotate its reschedule tracker and to
+// move it out of the stop list and into the update list so that we don't drop
+// tracking information in the plan applier
+func markFailedToReschedule(plan *structs.Plan, original *structs.Allocation, job *structs.Job) {
+	updated := original.Copy()
+	annotateRescheduleTracker(updated, structs.LastRescheduleFailedToPlace)
+
+	plan.PopUpdate(original)
+	nodeID := original.NodeID
+	for i, alloc := range plan.NodeAllocation[nodeID] {
+		if alloc.ID == original.ID {
+			plan.NodeAllocation[nodeID][i] = updated
+			return
+		}
+	}
+	plan.AppendAlloc(updated, job)
+}
+
 // swapAllocInPlan updates a plan to swap out an allocation that's already in
 // the plan with an updated definition of that allocation. The updated
 // definition should be a deep copy.
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 0b3914eaadf..b25a75b9391 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -1009,14 +1009,14 @@ func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) {
 	// Create 5 nodes in each datacenter.
 	// Use two loops so nodes are separated by datacenter.
 	nodes := []*structs.Node{}
-	for i := 0; i < 5; i++ {
+	for i := range 5 {
 		node := mock.Node()
 		node.Name = fmt.Sprintf("node-dc1-%d", i)
 		node.Datacenter = "dc1"
 		nodes = append(nodes, node)
 		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
 	}
-	for i := 0; i < 5; i++ {
+	for i := range 5 {
 		node := mock.Node()
 		node.Name = fmt.Sprintf("node-dc2-%d", i)
 		node.Datacenter = "dc2"
@@ -1045,7 +1045,7 @@ func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) {
 	// Create allocs for this job version with one being a canary and another
 	// marked as failed.
 	allocs := []*structs.Allocation{}
-	for i := 0; i < 3; i++ {
+	for i := range 3 {
 		alloc := mock.Alloc()
 		alloc.Job = job1
 		alloc.JobID = job1.ID
@@ -1099,10 +1099,15 @@ func TestServiceSched_JobRegister_Datacenter_Downgrade(t *testing.T) {
 		must.Len(t, 1, allocs)
 
 		alloc := allocs[0]
-		must.SliceContains(t, alloc.Job.Datacenters, node.Datacenter, must.Sprintf(
+
+		expect := "dc2"
+		if alloc.RescheduleTracker != nil {
+			expect = "dc1"
+		}
+
+		must.Eq(t, expect, node.Datacenter, must.Sprintf(
 			"alloc for job in datacenter %q placed in %q",
-			alloc.Job.Datacenters,
-			node.Datacenter,
+			expect, node.Datacenter,
 		))
 	}
 }
@@ -1143,14 +1148,14 @@ func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) {
 	// Create 5 nodes in each node pool.
 	// Use two loops so nodes are separated by node pool.
 	nodes := []*structs.Node{}
-	for i := 0; i < 5; i++ {
+	for i := range 5 {
 		node := mock.Node()
 		node.Name = fmt.Sprintf("node-binpack-%d", i)
 		node.NodePool = poolBinpack.Name
 		nodes = append(nodes, node)
 		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
 	}
-	for i := 0; i < 5; i++ {
+	for i := range 5 {
 		node := mock.Node()
 		node.Name = fmt.Sprintf("node-spread-%d", i)
 		node.NodePool = poolSpread.Name
@@ -1179,7 +1184,7 @@ func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) {
 	// Create allocs for this job version with one being a canary and another
 	// marked as failed.
 	allocs := []*structs.Allocation{}
-	for i := 0; i < 3; i++ {
+	for i := range 3 {
 		alloc := mock.Alloc()
 		alloc.Job = job1
 		alloc.JobID = job1.ID
@@ -1233,10 +1238,15 @@ func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) {
 		must.Len(t, 1, allocs)
 
 		alloc := allocs[0]
-		must.Eq(t, alloc.Job.NodePool, node.NodePool, must.Sprintf(
+
+		expect := poolSpread.Name
+		if alloc.RescheduleTracker != nil {
+			expect = poolBinpack.Name
+		}
+
+		must.Eq(t, expect, node.NodePool, must.Sprintf(
 			"alloc for job in node pool %q placed in node in node pool %q",
-			alloc.Job.NodePool,
-			node.NodePool,
+			expect, node.NodePool,
 		))
 	}
 }
@@ -3764,6 +3774,7 @@ func TestServiceSched_NodeDown(t *testing.T) {
 		reschedule bool
 		terminal   bool
 		lost       bool
+		inplace    bool
 	}{
 		{
 			name:    "should stop is running should be lost",
@@ -3800,6 +3811,7 @@ func TestServiceSched_NodeDown(t *testing.T) {
 			desired:    structs.AllocDesiredStatusRun,
 			client:     structs.AllocClientStatusFailed,
 			reschedule: true,
+			inplace:    true,
 		},
 		{
 			name:    "should evict is running should be lost",
@@ -3857,12 +3869,19 @@ func TestServiceSched_NodeDown(t *testing.T) {
 				must.Len(t, 0, h.Plans, must.Sprint("expected no plan"))
 			} else {
 				must.Len(t, 1, h.Plans, must.Sprint("expected plan"))
-
 				plan := h.Plans[0]
-				out := plan.NodeUpdate[node.ID]
-				must.Len(t, 1, out)
-				outAlloc := out[0]
+				var outAlloc *structs.Allocation
+				if tc.inplace {
+					out := plan.NodeAllocation[node.ID]
+					must.Len(t, 1, out)
+					outAlloc = out[0]
+				} else {
+					out := plan.NodeUpdate[node.ID]
+					must.Len(t, 1, out)
+					outAlloc = out[0]
+				}
+
 				if tc.migrate {
 					must.NotEq(t, structs.AllocClientStatusLost, outAlloc.ClientStatus)
 				} else if tc.reschedule {
@@ -3995,7 +4014,8 @@ func TestServiceSched_StopOnClientAfter(t *testing.T) {
 			must.Eq(t, structs.AllocClientStatusLost, alloc.ClientStatus)
 
 			// 1 if rescheduled, 2 for rescheduled later
-			test.Len(t, tc.expectedAllocStates, alloc.AllocStates)
+			test.Len(t, tc.expectedAllocStates, alloc.AllocStates,
+				test.Sprint("unexpected allocation states"))
 
 			if tc.expectBlockedEval {
 				must.Eq(t, structs.EvalStatusBlocked, followupEval.Status)
@@ -4999,13 +5019,14 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
 
 	// "use up" resources on the node so the follow-up will block
 	node.NodeResources.Memory.MemoryMB = 200
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
 
 	// Process the follow-up eval, which results in a stop but not a replacement
 	must.NoError(t, h.Process(NewServiceScheduler, followupEval))
 	must.Len(t, 4, h.Plans)
-	must.MapLen(t, 1, h.Plans[3].NodeUpdate)     // stop
-	must.MapLen(t, 0, h.Plans[3].NodeAllocation) // place
+	must.MapLen(t, 0, h.Plans[3].NodeUpdate)     // stop
+	must.MapLen(t, 1, h.Plans[3].NodeAllocation) // update
 
 	out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
 	must.NoError(t, err)
@@ -5028,10 +5049,14 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
 	must.NoError(t, h.Process(NewServiceScheduler, blockedEval))
 	must.Len(t, 4, h.Plans, must.Sprint("expected no new plan"))
 
-	// bypass the timer check by setting the alloc's follow-up eval ID to be the
-	// blocked eval
 	alloc, err = h.State.AllocByID(ws, replacementAllocID)
 	must.NoError(t, err)
+	must.NotNil(t, alloc)
+	must.NotNil(t, alloc.RescheduleTracker)
+	must.Eq(t, structs.LastRescheduleFailedToPlace, alloc.RescheduleTracker.LastReschedule)
+
+	// bypass the timer check by setting the alloc's follow-up eval ID to be the
+	// blocked eval
 	alloc = alloc.Copy()
 	alloc.FollowupEvalID = blockedEval.ID
 	must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup,
diff --git a/scheduler/structs/structs.go b/scheduler/structs/structs.go
index 58bd8df2b03..7a53717129d 100644
--- a/scheduler/structs/structs.go
+++ b/scheduler/structs/structs.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-version"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -66,7 +67,9 @@ func NewPlanWithStateAndIndex(state *state.StateStore, nextIndex uint64, servers
 	return &PlanBuilder{State: state, nextIndex: nextIndex, serversMeetMinimumVersion: serversMeetMinimumVersion}
 }
 
-// PlanBuilder is used to submit plans.
+// PlanBuilder is used to create plans outside the usual scheduler worker flow,
+// such as testing, recalculating queued allocs during snapshot restore in the
+// FSM, or the online plans created in the Job.Plan RPC
 type PlanBuilder struct {
 	State *state.StateStore
 
@@ -111,7 +114,6 @@ func (p *PlanBuilder) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State
 	result.NodePreemptions = plan.NodePreemptions
 	result.AllocIndex = index
 
-	// Flatten evicts and allocs
 	now := time.Now().UTC().UnixNano()
 
 	allocsUpdated := make([]*structs.Allocation, 0, len(result.NodeAllocation))
@@ -120,39 +122,37 @@ func (p *PlanBuilder) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State
 	}
 	updateCreateTimestamp(allocsUpdated, now)
 
-	// Setup the update request
-	req := structs.ApplyPlanResultsRequest{
-		AllocUpdateRequest: structs.AllocUpdateRequest{
-			Job: plan.Job,
-		},
-		Deployment:        plan.Deployment,
-		DeploymentUpdates: plan.DeploymentUpdates,
-		EvalID:            plan.EvalID,
-	}
-
-	var allocs []*structs.Allocation
+	snap, _ := p.State.Snapshot()
 
-	allocsStopped := make([]*structs.Allocation, 0, len(result.NodeUpdate))
+	// make sure these are denormalized the same way they would be in the real
+	// plan applier
+	allocsStopped := make([]*structs.AllocationDiff, 0, len(result.NodeUpdate))
 	for _, updateList := range plan.NodeUpdate {
-		allocsStopped = append(allocsStopped, updateList...)
-	}
-	allocs = append(allocs, allocsStopped...)
-
-	allocs = append(allocs, allocsUpdated...)
-	updateCreateTimestamp(allocs, now)
+		stopped, _ := snap.DenormalizeAllocationSlice(updateList)
+		allocsStopped = append(allocsStopped, helper.ConvertSlice(stopped,
+			func(a *structs.Allocation) *structs.AllocationDiff { return a.AllocationDiff() })...)
 
-	req.Alloc = allocs
+	}
 
-	// Set modify time for preempted allocs and flatten them
-	var preemptedAllocs []*structs.Allocation
-	for _, preemptions := range result.NodePreemptions {
-		for _, alloc := range preemptions {
-			alloc.ModifyTime = now
-			preemptedAllocs = append(preemptedAllocs, alloc)
-		}
+	// make sure these are denormalized the same way they would be in the real
+	// plan applier
+	allocsPreempted := make([]*structs.AllocationDiff, 0, len(result.NodePreemptions))
+	for _, preemptionList := range result.NodePreemptions {
+		preemptions, _ := snap.DenormalizeAllocationSlice(preemptionList)
+		allocsPreempted = append(allocsPreempted, helper.ConvertSlice(preemptions,
+			func(a *structs.Allocation) *structs.AllocationDiff { return a.AllocationDiff() })...)
 	}
-	req.NodePreemptions = preemptedAllocs
 
+	// Setup the update request
+	req := structs.ApplyPlanResultsRequest{
+		AllocsStopped:     allocsStopped,
+		AllocsUpdated:     allocsUpdated,
+		AllocsPreempted:   allocsPreempted,
+		Job:               plan.Job,
+		Deployment:        plan.Deployment,
+		DeploymentUpdates: plan.DeploymentUpdates,
+		EvalID:            plan.EvalID,
+	}
 
 	if p.noSubmit {
 		return result, nil, nil
@@ -170,6 +170,7 @@ func updateCreateTimestamp(allocations []*structs.Allocation, now int64) {
 		if alloc.CreateTime == 0 {
 			alloc.CreateTime = now
 		}
+		alloc.ModifyTime = now
 	}
 }