Skip to content

Commit 00a8c42

Browse files
Merge pull request #465 from gastownhall/fix/drained-wake-mode-invariant-complete
fix: honor wake_mode on drain-ack
2 parents b5d9963 + c596d2f commit 00a8c42

6 files changed

Lines changed: 406 additions & 4 deletions

File tree

cmd/gc/session_reconcile.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -753,8 +753,8 @@ func healState(session *beads.Bead, alive bool, store beads.Store, clk clock.Clo
753753
sleepReason := session.Metadata["sleep_reason"]
754754
isDraining := sleepReason == "idle" || sleepReason == "idle-timeout" ||
755755
sleepReason == "no-wake-reason" || sleepReason == "config-drift" ||
756-
sleepReason == "drained" || sleepReason == "user-hold" ||
757-
sleepReason == "wait-hold"
756+
sleepReason == "drained" ||
757+
sleepReason == "user-hold" || sleepReason == "wait-hold"
758758
if !isDraining && (prevState == "active" || prevState == "awake" || prevState == "creating") {
759759
batch["session_key"] = ""
760760
batch["started_config_hash"] = ""

cmd/gc/session_reconcile_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,7 @@ func TestHealState_ClearsStaleResumeMetadata(t *testing.T) {
12041204
name string
12051205
prevState string
12061206
sleepReason string
1207+
wakeMode string
12071208
sessionKey string
12081209
startedConfigHash string
12091210
wantKeyCleared bool
@@ -1294,6 +1295,17 @@ func TestHealState_ClearsStaleResumeMetadata(t *testing.T) {
12941295
name: "drained — resume metadata preserved",
12951296
prevState: "active",
12961297
sleepReason: "drained",
1298+
wakeMode: "resume",
1299+
sessionKey: "abc-123",
1300+
startedConfigHash: "hash-before",
1301+
wantKeyCleared: false,
1302+
wantStartedHashCleared: false,
1303+
},
1304+
{
1305+
name: "drained with wake_mode=fresh — resume metadata preserved (identity cleared at drain-ack/completeDrain)",
1306+
prevState: "active",
1307+
sleepReason: "drained",
1308+
wakeMode: "fresh",
12971309
sessionKey: "abc-123",
12981310
startedConfigHash: "hash-before",
12991311
wantKeyCleared: false,
@@ -1326,6 +1338,7 @@ func TestHealState_ClearsStaleResumeMetadata(t *testing.T) {
13261338
session := makeBead("b1", map[string]string{
13271339
"state": tt.prevState,
13281340
"sleep_reason": tt.sleepReason,
1341+
"wake_mode": tt.wakeMode,
13291342
"session_key": tt.sessionKey,
13301343
"started_config_hash": tt.startedConfigHash,
13311344
})

cmd/gc/session_reconciler.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,12 @@ func reconcileSessionBeadsTraced(
344344
if dops != nil {
345345
if acked, _ := dops.isDrainAcked(name); acked {
346346
_ = dops.clearDrain(name)
347+
stopped := !alive // already dead = effectively stopped
347348
if alive {
348349
if err := sp.Stop(name); err != nil {
349350
fmt.Fprintf(stderr, "session reconciler: stopping drain-acked %s: %v\n", name, err) //nolint:errcheck
350351
} else {
352+
stopped = true
351353
fmt.Fprintf(stdout, "Stopped drain-acked session '%s'\n", name) //nolint:errcheck
352354
}
353355
}
@@ -357,9 +359,24 @@ func reconcileSessionBeadsTraced(
357359
Subject: tp.DisplayName(),
358360
Message: "drain acknowledged by agent",
359361
})
360-
if store != nil && session.ID != "" {
361-
_ = store.SetMetadata(session.ID, "state", "drained")
362+
if stopped && store != nil && session.ID != "" {
363+
batch := map[string]string{
364+
"state": "drained",
365+
"last_woke_at": "",
366+
}
367+
if session.Metadata["wake_mode"] == "fresh" {
368+
batch["session_key"] = ""
369+
batch["started_config_hash"] = ""
370+
batch["continuation_reset_pending"] = "true"
371+
}
372+
_ = store.SetMetadataBatch(session.ID, batch)
362373
session.Metadata["state"] = "drained"
374+
session.Metadata["last_woke_at"] = ""
375+
if session.Metadata["wake_mode"] == "fresh" {
376+
session.Metadata["session_key"] = ""
377+
session.Metadata["started_config_hash"] = ""
378+
session.Metadata["continuation_reset_pending"] = "true"
379+
}
363380
}
364381
continue
365382
}

cmd/gc/session_reconciler_test.go

Lines changed: 302 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,308 @@ func TestReconcileSessionBeads_DrainAckKeepsBeadOpen(t *testing.T) {
278278
}
279279
}
280280

281+
func TestReconcileSessionBeads_DrainAckResumeModePreservesSessionIdentity(t *testing.T) {
282+
env := newReconcilerTestEnv()
283+
env.cfg = &config.City{
284+
Agents: []config.Agent{{Name: "worker"}},
285+
}
286+
env.addDesired("worker", "worker", true)
287+
session := env.createSessionBead("worker", "worker")
288+
env.markSessionActive(&session)
289+
env.setSessionMetadata(&session, map[string]string{
290+
"wake_mode": "resume",
291+
"session_key": "resume-key",
292+
"started_config_hash": "hash-before-drain",
293+
})
294+
295+
dops := newFakeDrainOps()
296+
if err := dops.setDrainAck("worker"); err != nil {
297+
t.Fatalf("setDrainAck: %v", err)
298+
}
299+
300+
woken := reconcileSessionBeads(
301+
context.Background(),
302+
[]beads.Bead{session},
303+
env.desiredState,
304+
map[string]bool{"worker": true},
305+
env.cfg,
306+
env.sp,
307+
env.store,
308+
dops,
309+
nil,
310+
nil,
311+
env.dt,
312+
nil,
313+
false,
314+
nil,
315+
"",
316+
nil,
317+
env.clk,
318+
env.rec,
319+
0,
320+
0,
321+
&env.stdout,
322+
&env.stderr,
323+
)
324+
if woken != 0 {
325+
t.Fatalf("woken = %d, want 0", woken)
326+
}
327+
328+
got, err := env.store.Get(session.ID)
329+
if err != nil {
330+
t.Fatalf("Get(%s): %v", session.ID, err)
331+
}
332+
if got.Metadata["state"] != "drained" {
333+
t.Fatalf("state = %q, want drained", got.Metadata["state"])
334+
}
335+
if got.Metadata["session_key"] != "resume-key" {
336+
t.Fatalf("session_key = %q, want preserved resume key", got.Metadata["session_key"])
337+
}
338+
if got.Metadata["started_config_hash"] != "hash-before-drain" {
339+
t.Fatalf("started_config_hash = %q, want preserved hash", got.Metadata["started_config_hash"])
340+
}
341+
if got.Metadata["continuation_reset_pending"] != "" {
342+
t.Fatalf("continuation_reset_pending = %q, want empty", got.Metadata["continuation_reset_pending"])
343+
}
344+
}
345+
346+
func TestReconcileSessionBeads_DrainAckFreshModeClearsSessionIdentity(t *testing.T) {
347+
env := newReconcilerTestEnv()
348+
env.cfg = &config.City{
349+
Agents: []config.Agent{{Name: "worker"}},
350+
}
351+
env.addDesired("worker", "worker", true)
352+
session := env.createSessionBead("worker", "worker")
353+
env.markSessionActive(&session)
354+
env.setSessionMetadata(&session, map[string]string{
355+
"wake_mode": "fresh",
356+
"session_key": "fresh-key",
357+
"started_config_hash": "hash-before-drain",
358+
})
359+
360+
dops := newFakeDrainOps()
361+
if err := dops.setDrainAck("worker"); err != nil {
362+
t.Fatalf("setDrainAck: %v", err)
363+
}
364+
365+
woken := reconcileSessionBeads(
366+
context.Background(),
367+
[]beads.Bead{session},
368+
env.desiredState,
369+
map[string]bool{"worker": true},
370+
env.cfg,
371+
env.sp,
372+
env.store,
373+
dops,
374+
nil,
375+
nil,
376+
env.dt,
377+
nil,
378+
false,
379+
nil,
380+
"",
381+
nil,
382+
env.clk,
383+
env.rec,
384+
0,
385+
0,
386+
&env.stdout,
387+
&env.stderr,
388+
)
389+
if woken != 0 {
390+
t.Fatalf("woken = %d, want 0", woken)
391+
}
392+
393+
got, err := env.store.Get(session.ID)
394+
if err != nil {
395+
t.Fatalf("Get(%s): %v", session.ID, err)
396+
}
397+
if got.Metadata["state"] != "drained" {
398+
t.Fatalf("state = %q, want drained", got.Metadata["state"])
399+
}
400+
if got.Metadata["session_key"] != "" {
401+
t.Fatalf("session_key = %q, want cleared for wake_mode=fresh", got.Metadata["session_key"])
402+
}
403+
if got.Metadata["started_config_hash"] != "" {
404+
t.Fatalf("started_config_hash = %q, want cleared for wake_mode=fresh", got.Metadata["started_config_hash"])
405+
}
406+
if got.Metadata["continuation_reset_pending"] != "true" {
407+
t.Fatalf("continuation_reset_pending = %q, want true", got.Metadata["continuation_reset_pending"])
408+
}
409+
}
410+
411+
// stopFailProvider wraps a Fake but makes Stop always fail.
412+
// The session remains running (IsRunning returns true).
413+
type stopFailProvider struct {
414+
*runtime.Fake
415+
}
416+
417+
func (p *stopFailProvider) Stop(_ string) error {
418+
return fmt.Errorf("stop failed: session unavailable")
419+
}
420+
421+
func TestReconcileSessionBeads_DrainAckStopFailurePreservesMetadata(t *testing.T) {
422+
env := newReconcilerTestEnv()
423+
env.cfg = &config.City{
424+
Agents: []config.Agent{{Name: "worker"}},
425+
}
426+
env.addDesired("worker", "worker", true)
427+
session := env.createSessionBead("worker", "worker")
428+
env.markSessionActive(&session)
429+
env.setSessionMetadata(&session, map[string]string{
430+
"wake_mode": "fresh",
431+
"session_key": "fresh-key",
432+
"started_config_hash": "hash-before-drain",
433+
"last_woke_at": env.clk.Now().Add(-5 * time.Second).UTC().Format(time.RFC3339),
434+
})
435+
436+
dops := newFakeDrainOps()
437+
if err := dops.setDrainAck("worker"); err != nil {
438+
t.Fatalf("setDrainAck: %v", err)
439+
}
440+
441+
// Wrap the real provider so Stop fails but IsRunning still returns true.
442+
failSp := &stopFailProvider{Fake: env.sp}
443+
444+
woken := reconcileSessionBeads(
445+
context.Background(),
446+
[]beads.Bead{session},
447+
env.desiredState,
448+
map[string]bool{"worker": true},
449+
env.cfg,
450+
failSp,
451+
env.store,
452+
dops,
453+
nil,
454+
nil,
455+
env.dt,
456+
nil,
457+
false,
458+
nil,
459+
"",
460+
nil,
461+
env.clk,
462+
env.rec,
463+
0,
464+
0,
465+
&env.stdout,
466+
&env.stderr,
467+
)
468+
if woken != 0 {
469+
t.Fatalf("woken = %d, want 0", woken)
470+
}
471+
472+
got, err := env.store.Get(session.ID)
473+
if err != nil {
474+
t.Fatalf("Get(%s): %v", session.ID, err)
475+
}
476+
// When Stop fails, metadata should NOT be updated — the session is still alive.
477+
if got.Metadata["state"] == "drained" {
478+
t.Fatalf("state should not be drained when stop failed")
479+
}
480+
if got.Metadata["last_woke_at"] == "" {
481+
t.Fatalf("last_woke_at should be preserved when stop failed")
482+
}
483+
if got.Metadata["session_key"] == "" {
484+
t.Fatalf("session_key should be preserved when stop failed")
485+
}
486+
}
487+
488+
func TestReconcileSessionBeads_DrainAckResumeModeNotClassifiedAsCrashNextTick(t *testing.T) {
489+
env := newReconcilerTestEnv()
490+
env.cfg = &config.City{
491+
Agents: []config.Agent{{Name: "worker"}},
492+
}
493+
env.addDesired("worker", "worker", true)
494+
session := env.createSessionBead("worker", "worker")
495+
env.markSessionActive(&session)
496+
env.setSessionMetadata(&session, map[string]string{
497+
"wake_mode": "resume",
498+
"session_key": "resume-key",
499+
"started_config_hash": "hash-before-drain",
500+
})
501+
502+
dops := newFakeDrainOps()
503+
if err := dops.setDrainAck("worker"); err != nil {
504+
t.Fatalf("setDrainAck: %v", err)
505+
}
506+
507+
woken := reconcileSessionBeads(
508+
context.Background(),
509+
[]beads.Bead{session},
510+
env.desiredState,
511+
map[string]bool{"worker": true},
512+
env.cfg,
513+
env.sp,
514+
env.store,
515+
dops,
516+
nil,
517+
nil,
518+
env.dt,
519+
nil,
520+
false,
521+
nil,
522+
"",
523+
nil,
524+
env.clk,
525+
env.rec,
526+
0,
527+
0,
528+
&env.stdout,
529+
&env.stderr,
530+
)
531+
if woken != 0 {
532+
t.Fatalf("woken = %d, want 0", woken)
533+
}
534+
535+
got, err := env.store.Get(session.ID)
536+
if err != nil {
537+
t.Fatalf("Get(%s) after drain-ack: %v", session.ID, err)
538+
}
539+
if got.Metadata["last_woke_at"] != "" {
540+
t.Fatalf("last_woke_at = %q, want cleared after drain-ack", got.Metadata["last_woke_at"])
541+
}
542+
543+
woken = reconcileSessionBeads(
544+
context.Background(),
545+
[]beads.Bead{got},
546+
env.desiredState,
547+
map[string]bool{"worker": true},
548+
env.cfg,
549+
env.sp,
550+
env.store,
551+
dops,
552+
nil,
553+
nil,
554+
env.dt,
555+
nil,
556+
false,
557+
nil,
558+
"",
559+
nil,
560+
env.clk,
561+
env.rec,
562+
0,
563+
0,
564+
&env.stdout,
565+
&env.stderr,
566+
)
567+
if woken != 0 {
568+
t.Fatalf("second tick woken = %d, want 0", woken)
569+
}
570+
571+
got, err = env.store.Get(session.ID)
572+
if err != nil {
573+
t.Fatalf("Get(%s) after second tick: %v", session.ID, err)
574+
}
575+
if got.Metadata["session_key"] != "resume-key" {
576+
t.Fatalf("session_key = %q after second tick, want preserved resume key", got.Metadata["session_key"])
577+
}
578+
if got.Metadata["wake_attempts"] != "" {
579+
t.Fatalf("wake_attempts = %q, want empty for intentional drain", got.Metadata["wake_attempts"])
580+
}
581+
}
582+
281583
func TestReconcileSessionBeads_DrainAckHonoredAfterSessionExit(t *testing.T) {
282584
env := newReconcilerTestEnv()
283585
env.cfg = &config.City{

0 commit comments

Comments
 (0)