Skip to content

Commit ea150b3

Browse files
backport of commit ff40ce9 (#27892)
Co-authored-by: Juana De La Cuesta <mcj@delacuesta.co>
1 parent f4a672e commit ea150b3

17 files changed

Lines changed: 343 additions & 141 deletions

.changelog/27852.txt

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,3 @@
1+
```release-note:bug
2+
core: avoid setting job to dead while waiting for allocations to reschedule
3+
```

nomad/alloc_endpoint_test.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -620,9 +620,13 @@ func TestAllocEndpoint_List_Blocking(t *testing.T) {
620620
alloc2 := mock.Alloc()
621621
alloc2.ID = alloc.ID
622622
alloc2.ClientStatus = structs.AllocClientStatusRunning
623+
updateReq := structs.AllocUpdateRequest{
624+
Alloc: []*structs.Allocation{alloc2},
625+
}
626+
623627
time.AfterFunc(100*time.Millisecond, func() {
624628
state.UpsertJobSummary(3, mock.JobSummary(alloc2.JobID))
625-
if err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 5, []*structs.Allocation{alloc2}); err != nil {
629+
if err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 5, updateReq); err != nil {
626630
t.Fatalf("err: %v", err)
627631
}
628632
})

nomad/deployment_endpoint_test.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1758,7 +1758,11 @@ func TestDeploymentEndpoint_Allocations_Blocking(t *testing.T) {
17581758
a2.ClientStatus = structs.AllocClientStatusRunning
17591759
time.AfterFunc(100*time.Millisecond, func() {
17601760
assert.Nil(state.UpsertJobSummary(5, mock.JobSummary(a2.JobID)), "UpsertJobSummary")
1761-
assert.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 6, []*structs.Allocation{a2}), "updateAllocsFromClient")
1761+
1762+
req := structs.AllocUpdateRequest{
1763+
Alloc: []*structs.Allocation{a2},
1764+
}
1765+
assert.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 6, req), "updateAllocsFromClient")
17621766
})
17631767

17641768
req.MinQueryIndex = 4

nomad/deploymentwatcher/deployments_watcher_test.go

Lines changed: 37 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -998,7 +998,11 @@ func TestDeploymentWatcher_Watch_ProgressDeadline(t *testing.T) {
998998
Healthy: pointer.Of(false),
999999
Timestamp: now,
10001000
}
1001-
must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 100, []*structs.Allocation{a2}))
1001+
1002+
updateReq := structs.AllocUpdateRequest{
1003+
Alloc: []*structs.Allocation{a2},
1004+
}
1005+
must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 100, updateReq))
10021006

10031007
// Wait for the deployment to be failed
10041008
must.Wait(t, wait.InitialSuccess(wait.BoolFunc(func() bool {
@@ -1149,7 +1153,11 @@ func TestDeploymentWatcher_Watch_ProgressDeadline_Canaries(t *testing.T) {
11491153
Healthy: pointer.Of(true),
11501154
Timestamp: now,
11511155
}
1152-
must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2}))
1156+
1157+
updateReq := structs.AllocUpdateRequest{
1158+
Alloc: []*structs.Allocation{a2},
1159+
}
1160+
must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), updateReq))
11531161

11541162
// Wait for the deployment to cross the deadline
11551163
dout, err := m.state.DeploymentByID(nil, d.ID)
@@ -1320,8 +1328,11 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) {
13201328
Timestamp: now,
13211329
}
13221330

1323-
allocs = []*structs.Allocation{canary2}
1324-
err := m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs)
1331+
updateReq := structs.AllocUpdateRequest{
1332+
Alloc: []*structs.Allocation{canary2},
1333+
}
1334+
1335+
err := m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), updateReq)
13251336
must.NoError(t, err)
13261337

13271338
// wait for long enough to ensure we read deployment update channel
@@ -1339,8 +1350,11 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) {
13391350
Timestamp: now,
13401351
}
13411352

1342-
allocs = []*structs.Allocation{canary1}
1343-
err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs)
1353+
updateReq2 := structs.AllocUpdateRequest{
1354+
Alloc: []*structs.Allocation{canary1},
1355+
}
1356+
1357+
err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), updateReq2)
13441358
must.NoError(t, err)
13451359

13461360
// ensure progress_deadline has definitely expired
@@ -1406,8 +1420,11 @@ func TestDeploymentWatcher_ProgressDeadline_LatePromote(t *testing.T) {
14061420
Timestamp: now,
14071421
}
14081422

1409-
allocs = []*structs.Allocation{alloc1a, alloc1b}
1410-
err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), allocs)
1423+
updateReq3 := structs.AllocUpdateRequest{
1424+
Alloc: []*structs.Allocation{alloc1a, alloc1b},
1425+
}
1426+
1427+
err = m.state.UpdateAllocsFromClient(mtype, m.nextIndex(), updateReq3)
14111428
must.NoError(t, err)
14121429

14131430
// ensure any progress deadline has expired
@@ -1470,7 +1487,12 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) {
14701487
Healthy: pointer.Of(false),
14711488
Timestamp: time.Now(),
14721489
}
1473-
must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2}))
1490+
1491+
updateReq := structs.AllocUpdateRequest{
1492+
Alloc: []*structs.Allocation{a2},
1493+
}
1494+
1495+
must.NoError(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), updateReq))
14741496

14751497
// Wait for the alloc's DesiredState to set reschedule
14761498
must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error {
@@ -1528,7 +1550,12 @@ func TestDeploymentWatcher_Watch_FailEarly(t *testing.T) {
15281550
Healthy: pointer.Of(false),
15291551
Timestamp: now,
15301552
}
1531-
must.Nil(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2}))
1553+
1554+
updateReq := structs.AllocUpdateRequest{
1555+
Alloc: []*structs.Allocation{a2},
1556+
}
1557+
1558+
must.Nil(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), updateReq))
15321559

15331560
// Wait for the deployment to be failed
15341561
must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error {

nomad/drainer/watch_jobs_test.go

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -210,7 +210,10 @@ func TestDrainingJobWatcher_DrainJobs(t *testing.T) {
210210
a.ClientStatus = structs.AllocClientStatusComplete
211211
completeAllocs[i] = a
212212
}
213-
must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, completeAllocs))
213+
updateReq := structs.AllocUpdateRequest{
214+
Alloc: completeAllocs,
215+
}
216+
must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, updateReq))
214217
index++
215218

216219
// The drained allocs stopping cause migrations but no new drains

nomad/fsm.go

Lines changed: 7 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -665,6 +665,7 @@ func (n *nomadFSM) applyNodePoolDelete(msgType structs.MessageType, buf []byte,
665665

666666
func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index uint64) interface{} {
667667
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now())
668+
668669
var req structs.JobRegisterRequest
669670
if err := structs.Decode(buf, &req); err != nil {
670671
panic(fmt.Errorf("failed to decode request: %v", err))
@@ -937,6 +938,7 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu
937938

938939
func (n *nomadFSM) applyUpdateEval(msgType structs.MessageType, buf []byte, index uint64) interface{} {
939940
defer metrics.MeasureSince([]string{"nomad", "fsm", "update_eval"}, time.Now())
941+
940942
var req structs.EvalUpdateRequest
941943
if err := structs.Decode(buf, &req); err != nil {
942944
panic(fmt.Errorf("failed to decode request: %v", err))
@@ -1012,6 +1014,7 @@ func (n *nomadFSM) applyAllocUpdate(_ structs.MessageType, _ []byte, _ uint64) i
10121014

10131015
func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byte, index uint64) interface{} {
10141016
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_client_update"}, time.Now())
1017+
10151018
var req structs.AllocUpdateRequest
10161019
if err := structs.Decode(buf, &req); err != nil {
10171020
panic(fmt.Errorf("failed to decode request: %v", err))
@@ -1024,7 +1027,6 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt
10241027
ws := memdb.NewWatchSet()
10251028

10261029
followupEvalsToCancel := []string{}
1027-
10281030
// Updating the allocs with the job id and task group name
10291031
for _, alloc := range req.Alloc {
10301032
if existing, _ := n.state.AllocByID(ws, alloc.ID); existing != nil {
@@ -1041,19 +1043,13 @@ func (n *nomadFSM) applyAllocClientUpdate(msgType structs.MessageType, buf []byt
10411043
}
10421044
}
10431045

1044-
// Update all the client allocations
1045-
if err := n.state.UpdateAllocsFromClient(msgType, index, req.Alloc); err != nil {
1046+
if err := n.state.UpdateAllocsFromClient(msgType, index, req); err != nil {
10461047
n.logger.Error("UpdateAllocFromClient failed", "error", err)
10471048
return err
10481049
}
10491050

1050-
// Update any evals that were added by the RPC handler
1051-
if len(req.Evals) > 0 {
1052-
if err := n.upsertEvals(msgType, index, req.Evals); err != nil {
1053-
n.logger.Error("applyAllocClientUpdate failed to update evaluations", "error", err)
1054-
return err
1055-
}
1056-
}
1051+
// Enqueue any evals that were added by the RPC handler
1052+
n.handleUpsertedEvals(req.Evals)
10571053

10581054
// Unblock evals for the nodes computed node class if the client has
10591055
// finished running an allocation.
@@ -2137,6 +2133,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
21372133
Status: structs.EvalStatusPending,
21382134
AnnotatePlan: true,
21392135
}
2136+
21402137
// Ignore eval event creation during snapshot restore
21412138
snap.UpsertEvals(structs.IgnoreUnknownTypeFlag, 100, []*structs.Evaluation{eval})
21422139
// Create the scheduler and run it

nomad/job_endpoint_test.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7259,7 +7259,11 @@ func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) {
72597259
require.NoError(t, err)
72607260
nalloc = nalloc.Copy()
72617261
nalloc.ClientStatus = status
7262-
err = s1.State().UpdateAllocsFromClient(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{nalloc})
7262+
7263+
updateReq := structs.AllocUpdateRequest{
7264+
Alloc: []*structs.Allocation{nalloc},
7265+
}
7266+
err = s1.State().UpdateAllocsFromClient(structs.MsgTypeTestSetup, nextIdx, updateReq)
72637267
require.NoError(t, err)
72647268
}
72657269

nomad/node_endpoint_test.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3410,7 +3410,11 @@ func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
34103410
allocUpdate.ID = alloc.ID
34113411
allocUpdate.ClientStatus = structs.AllocClientStatusRunning
34123412
state.UpsertJobSummary(199, mock.JobSummary(allocUpdate.JobID))
3413-
err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 200, []*structs.Allocation{allocUpdate})
3413+
3414+
req := structs.AllocUpdateRequest{
3415+
Alloc: []*structs.Allocation{allocUpdate},
3416+
}
3417+
err := state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 200, req)
34143418
if err != nil {
34153419
t.Fatalf("err: %v", err)
34163420
}

0 commit comments

Comments
 (0)