Skip to content

Commit 95e3c24

Browse files
bill-ph and benben authored
Gracefully drain workers before shutdown (#690)
* Gracefully drain workers before shutdown
* Fix worker drain continuation leaks
* Fix drain token lifecycle edge cases
* fix(worker-drain): reap drain tokens stranded by GetFlightInfo with no DoGet

A GetFlightInfo* call takes a drain token and only the matching DoGet* returns it. If the DoGet never arrives (client cancels between the two RPCs while keeping the session), the token sits forever — only txns had an idle reaper. On SIGTERM the worker's drain then blocks until the full workerShutdownDrainTime (55m), after which the forced shutdown kills genuinely in-flight work.

Give query handles and metadata streams the same treatment txns already get: reapIdle (renamed from reapIdleTransactions) now also releases drain tokens older than handleIdleTimeout (10m):

- ad-hoc query handles (query-*): drop the whole stale handle.
- prepared handles (prep-*): never dropped (live until Close); only their stale pendingDrains are released.
- metadata streams (GetFlightInfoSchemas/Tables): stale tokens released.

An actively-streaming DoGet has already popped its handle/pendingDrain out of these maps, so anything left past the TTL was abandoned. WaitForDrain stays bounded regardless; the reaper turns "stall to timeout then kill" into "drain promptly".

Carries drainToken{finish, at} timestamps for pendingDrains and metadataDrains; QueryHandle gains Prepared + createdAt.

Also remove the dead drainZero reopen branch in beginDrainWork: drainZeroOpen only goes false while draining and draining never clears, so the branch was unreachable (replaced with an invariant comment).

Tests: TestReapIdleReleasesAbandonedHandleDrains asserts stale ad-hoc / prepared-pending / metadata tokens are released while a fresh handle and the prepared handle itself survive.

e2e: tests/e2e-mw-dev/harness.sh graceful_drain — a worker SIGTERM'd mid-query must finish the in-flight query correctly while its pod is Terminating, then retire cleanly (regression net for the drain protocol).

---------

Co-authored-by: Benjamin Knofe-Vider <benjamin.k@posthog.com>
1 parent d459229 commit 95e3c24

14 files changed

Lines changed: 2128 additions & 156 deletions

controlplane/k8s_pool.go

Lines changed: 216 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ const workerPodReadyTimeout = 5 * time.Minute
5252
// fast, bin-packed) shapes refilled. See TestWarmSpawnTimeoutExceedsPodReady.
5353
const warmSpawnReconcileTimeout = 6 * time.Minute
5454

55+
// workerTerminationGracePeriodSeconds is the termination grace period, in
// seconds (3600 = one hour), that spawnWorker sets on worker pods through
// PodSpec.TerminationGracePeriodSeconds.
// NOTE(review): presumably sized to outlast the worker's own shutdown drain
// window so an in-flight drain is not cut short by the kubelet — confirm.
const workerTerminationGracePeriodSeconds int64 = 3600
56+
5557
var errStaleRuntimeWorkerClaim = stderrors.New("stale runtime worker claim")
5658

5759
// K8sWorkerPool manages worker pods in Kubernetes.
@@ -725,11 +727,12 @@ func (p *K8sWorkerPool) spawnWorker(ctx context.Context, id int, image string, p
725727
Labels: podLabels,
726728
},
727729
Spec: corev1.PodSpec{
728-
RestartPolicy: corev1.RestartPolicyNever,
729-
ServiceAccountName: p.workerServiceAccountName(),
730-
AutomountServiceAccountToken: boolPtr(false),
731-
PriorityClassName: p.workerPriorityClassName,
732-
NodeSelector: p.nodeSelectorForProfile(profile),
730+
RestartPolicy: corev1.RestartPolicyNever,
731+
TerminationGracePeriodSeconds: int64Ptr(workerTerminationGracePeriodSeconds),
732+
ServiceAccountName: p.workerServiceAccountName(),
733+
AutomountServiceAccountToken: boolPtr(false),
734+
PriorityClassName: p.workerPriorityClassName,
735+
NodeSelector: p.nodeSelectorForProfile(profile),
733736
SecurityContext: &corev1.PodSecurityContext{
734737
RunAsNonRoot: boolPtr(true),
735738
RunAsUser: int64Ptr(1000),
@@ -2110,13 +2113,29 @@ func (p *K8sWorkerPool) checkReservedWorkerLiveness(ctx context.Context, worker
21102113
}
21112114
hctx, cancel := context.WithTimeout(ctx, 2*time.Second)
21122115
defer cancel()
2113-
_, err := doHealthCheckWithMetadata(hctx, worker.client, p.healthCheckPayloadForWorker(worker))
2114-
return err
2116+
result, err := doHealthCheckWithMetadata(hctx, worker.client, p.healthCheckPayloadForWorker(worker))
2117+
if err != nil {
2118+
return err
2119+
}
2120+
return validateReservedWorkerHealth(result)
21152121
}
21162122
}
21172123
return check(ctx, worker)
21182124
}
21192125

2126+
func validateReservedWorkerHealth(result *healthCheckResult) error {
2127+
if result == nil {
2128+
return fmt.Errorf("worker health check returned no result")
2129+
}
2130+
if !result.Healthy {
2131+
return fmt.Errorf("worker health check reported unhealthy")
2132+
}
2133+
if result.Draining {
2134+
return fmt.Errorf("worker is draining")
2135+
}
2136+
return nil
2137+
}
2138+
21202139
// SpawnMinWorkers pre-warms the pool with count workers.
21212140
func (p *K8sWorkerPool) SpawnMinWorkers(count int) error {
21222141
if count <= 0 {
@@ -2534,6 +2553,23 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat
25342553
delete(failures, lease)
25352554
mu.Unlock()
25362555

2556+
removedWorker, workerCount, replacementID, shouldReplenish, err := p.removeWorkerAfterDrainedLease(lease, LifecycleOriginWorkerDrain)
2557+
if err != nil {
2558+
slog.Error("K8s worker terminated after draining but retire CAS failed; leaving cleanup to retry.", "id", lease.workerID, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "error", err)
2559+
return
2560+
}
2561+
if removedWorker != nil {
2562+
observeControlPlaneWorkers(workerCount)
2563+
slog.Info("K8s worker terminated after draining.", "id", lease.workerID)
2564+
if removedWorker.client != nil {
2565+
_ = removedWorker.client.Close()
2566+
}
2567+
if shouldReplenish {
2568+
p.spawnWarmWorkerBackground(replacementID, p.workerImage)
2569+
}
2570+
return
2571+
}
2572+
25372573
lostDisposition, err := p.markWorkerLostForHealthLease(lease, LifecycleOriginInformerCrash)
25382574
if err != nil {
25392575
slog.Error("K8s worker terminated but lease validation failed; leaving cleanup to retry.", "id", lease.workerID, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "error", err)
@@ -2559,7 +2595,7 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat
25592595
}
25602596

25612597
p.mu.Lock()
2562-
removedWorker, workerCount, replacementID, shouldReplenish := p.removeWorkerAfterLostLeaseLocked(lease)
2598+
removedWorker, workerCount, replacementID, shouldReplenish = p.removeWorkerAfterLostLeaseLocked(lease)
25632599
p.mu.Unlock()
25642600
if removedWorker == nil {
25652601
return
@@ -2681,6 +2717,10 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat
26812717
delete(failures, lease)
26822718
mu.Unlock()
26832719

2720+
if hcResult != nil && hcResult.Draining {
2721+
p.markWorkerDrainingFromHealth(lease)
2722+
}
2723+
26842724
// Forward progress data to the control plane.
26852725
if onProgress != nil && hcResult != nil {
26862726
if sp := hcResult.toSessionProgress(); len(sp) > 0 {
@@ -3163,21 +3203,58 @@ func (p *K8sWorkerPool) cleanDeadWorkersLocked() {
31633203
deletePod := true
31643204
if p.runtimeStore != nil {
31653205
lease := p.workerLeaseSnapshot(w)
3166-
// cleanDeadWorkersLocked sweeps for pods whose informer
3167-
// fired w.done — cluster-driven termination (eviction,
3168-
// OOM, manual delete), not our health-check decision.
3169-
lostDisposition, err := p.markWorkerLostForHealthLease(lease, LifecycleOriginInformerCrash)
3170-
if err != nil {
3171-
slog.Warn("Clean dead worker: lease validation failed; leaving cleanup to retry.",
3172-
"id", id, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "error", err)
3173-
continue
3174-
}
3175-
switch lostDisposition {
3176-
case workerLostLeaseStale:
3206+
if p.workerMatchesLease(w, lease) && w.SharedState().NormalizedLifecycle() == WorkerLifecycleDraining {
3207+
lc := p.ensureLifecycle()
3208+
if lc != nil {
3209+
outcome, err := lc.RetireDrained(
3210+
configstore.NewWorkerLease(lease.workerID, lease.ownerCPInstanceID, lease.ownerEpoch, lease.image),
3211+
RetireReasonNormal,
3212+
LifecycleOriginInformerCrash,
3213+
)
3214+
if err != nil {
3215+
slog.Warn("Clean dead worker: retire draining CAS failed; leaving cleanup to retry.",
3216+
"id", id, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "error", err)
3217+
continue
3218+
}
3219+
if !outcome.Transitioned {
3220+
record, err := p.runtimeStore.GetWorkerRecord(lease.workerID)
3221+
if err != nil {
3222+
slog.Warn("Clean dead worker: retire draining verification failed; leaving cleanup to retry.",
3223+
"id", id, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "error", err)
3224+
continue
3225+
}
3226+
if record == nil || record.OwnerCPInstanceID != lease.ownerCPInstanceID ||
3227+
record.OwnerEpoch != lease.ownerEpoch || record.State != configstore.WorkerStateRetired {
3228+
continue
3229+
}
3230+
}
3231+
}
3232+
p.markWorkerRetiredInMemoryLocked(w)
3233+
delete(p.workers, id)
3234+
removedWorker = w
31773235
deletePod = false
3178-
removedWorker, _ = p.dropLocalWorkerIfSameLeaseLocked(lease)
3179-
case workerLostLeaseCurrent, workerLostLeaseAlreadyLost, workerLostLeaseRetry:
3180-
continue
3236+
if p.shouldReplenishWarmCapacityLocked() {
3237+
replacementID = p.allocateBackgroundSpawnIDLocked()
3238+
p.spawning++
3239+
shouldReplenish = true
3240+
}
3241+
} else {
3242+
// cleanDeadWorkersLocked sweeps for pods whose informer
3243+
// fired w.done — cluster-driven termination (eviction,
3244+
// OOM, manual delete), not our health-check decision.
3245+
lostDisposition, err := p.markWorkerLostForHealthLease(lease, LifecycleOriginInformerCrash)
3246+
if err != nil {
3247+
slog.Warn("Clean dead worker: lease validation failed; leaving cleanup to retry.",
3248+
"id", id, "owner_cp_instance_id", lease.ownerCPInstanceID, "owner_epoch", lease.ownerEpoch, "error", err)
3249+
continue
3250+
}
3251+
switch lostDisposition {
3252+
case workerLostLeaseStale:
3253+
deletePod = false
3254+
removedWorker, _ = p.dropLocalWorkerIfSameLeaseLocked(lease)
3255+
case workerLostLeaseCurrent, workerLostLeaseAlreadyLost, workerLostLeaseRetry:
3256+
continue
3257+
}
31813258
}
31823259
} else {
31833260
removedWorker, _, replacementID, shouldReplenish = p.removeWorkerLocked(id)
@@ -3396,6 +3473,123 @@ func (p *K8sWorkerPool) markWorkerLostIfCurrentLease(lease workerLeaseSnapshot,
33963473
return outcome.Transitioned, nil
33973474
}
33983475

3476+
// markWorkerDrainingFromHealth records that the worker identified by lease
// reported itself as draining in a health check.
//
// It first moves the worker's in-memory shared state to
// WorkerLifecycleDraining under p.mu, then, with the lock released, asks the
// lifecycle (when one is available) to persist the drain. If the durable
// state cannot be confirmed, a deferred rollback restores the previous
// in-memory state. The function returns nothing; failures are logged.
//
// It is a no-op when the lease is owned by a different control-plane
// instance, when the worker is no longer tracked or no longer matches the
// lease, or when the worker is already draining or retired.
func (p *K8sWorkerPool) markWorkerDrainingFromHealth(lease workerLeaseSnapshot) {
3477+
// Ignore leases whose owner is not this control-plane instance.
if lease.ownerCPInstanceID != p.cpInstanceID {
3478+
return
3479+
}
3480+
p.mu.Lock()
3481+
w, ok := p.workers[lease.workerID]
3482+
if !ok || !p.workerMatchesLease(w, lease) {
3483+
p.mu.Unlock()
3484+
return
3485+
}
3486+
// Keep the pre-transition state so the deferred rollback below can restore it.
previous := w.SharedState()
3487+
switch previous.NormalizedLifecycle() {
3488+
case WorkerLifecycleDraining, WorkerLifecycleRetired:
3489+
p.mu.Unlock()
3490+
return
3491+
}
3492+
next, err := previous.Transition(WorkerLifecycleDraining, previous.Assignment)
3493+
if err != nil {
3494+
p.mu.Unlock()
3495+
slog.Warn("Worker reported draining but local lifecycle transition failed.",
3496+
"worker_id", lease.workerID, "state", previous.NormalizedLifecycle(), "error", err)
3497+
return
3498+
}
3499+
// Apply the in-memory transition first; the lock is released before the
// durable call below.
w.sharedState = next
3500+
p.mu.Unlock()
3501+
3502+
// durableVerified stays false on every early return below, which makes the
// deferred function undo the in-memory transition.
durableVerified := false
3503+
defer func() {
3504+
if durableVerified {
3505+
return
3506+
}
3507+
p.mu.Lock()
3508+
defer p.mu.Unlock()
3509+
// Re-look the worker up: it may have been removed or replaced while the
// lock was not held.
w, ok := p.workers[lease.workerID]
3510+
if !ok || !p.workerMatchesLease(w, lease) {
3511+
return
3512+
}
3513+
// Only roll back if the worker is still in the draining state set above.
if w.SharedState().NormalizedLifecycle() == WorkerLifecycleDraining {
3514+
w.sharedState = previous
3515+
}
3516+
}()
3517+
3518+
// When ensureLifecycle returns nil there is no durable step and the
// in-memory transition is kept as-is.
lc := p.ensureLifecycle()
3519+
if lc != nil {
3520+
outcome, err := lc.Drain(
3521+
configstore.NewWorkerLease(lease.workerID, p.cpInstanceID, lease.ownerEpoch, lease.image),
3522+
LifecycleOriginWorkerDrain,
3523+
)
3524+
if err != nil {
3525+
slog.Warn("Worker reported draining but durable drain CAS failed; keeping local worker schedulable until retry.",
3526+
"worker_id", lease.workerID, "owner_epoch", lease.ownerEpoch, "error", err)
3527+
return
3528+
}
3529+
// The drain call did not transition the record: read it back and accept
// only if it is already in the draining state under this owner and epoch.
if !outcome.Transitioned {
3530+
record, err := p.runtimeStore.GetWorkerRecord(lease.workerID)
3531+
if err != nil {
3532+
slog.Warn("Worker reported draining but durable worker record could not be verified after CAS miss.",
3533+
"worker_id", lease.workerID, "owner_epoch", lease.ownerEpoch, "error", err)
3534+
return
3535+
}
3536+
if record == nil || record.OwnerCPInstanceID != p.cpInstanceID || record.OwnerEpoch != lease.ownerEpoch || record.State != configstore.WorkerStateDraining {
3537+
return
3538+
}
3539+
}
3540+
}
3541+
durableVerified = true
3542+
}
3543+
3544+
// removeWorkerAfterDrainedLease retires and removes a worker that is in the
// draining lifecycle state and still matches lease.
//
// It returns, in order: the removed worker, the number of workers left in
// p.workers after the removal, a replacement spawn ID, whether the caller
// should spawn a replacement, and an error. When a replacement is requested,
// p.spawning has already been incremented and the ID already allocated.
//
// All-zero results with a nil error mean nothing was removed: the worker is
// not tracked, does not match the lease, is not draining, or the durable
// record could not be confirmed as retired under this owner and epoch. A
// non-nil error comes from the lifecycle retire call or the record read-back.
//
// The function does not close the removed worker's client.
func (p *K8sWorkerPool) removeWorkerAfterDrainedLease(lease workerLeaseSnapshot, origin LifecycleOrigin) (*ManagedWorker, int, int, bool, error) {
3545+
// Precheck under the read lock before making the durable call.
p.mu.RLock()
3546+
w, ok := p.workers[lease.workerID]
3547+
if !ok || !p.workerMatchesLease(w, lease) || w.SharedState().NormalizedLifecycle() != WorkerLifecycleDraining {
3548+
p.mu.RUnlock()
3549+
return nil, 0, 0, false, nil
3550+
}
3551+
p.mu.RUnlock()
3552+
3553+
// When ensureLifecycle returns nil the durable step is skipped and only the
// in-memory removal below is performed.
lc := p.ensureLifecycle()
3554+
if lc != nil {
3555+
outcome, err := lc.RetireDrained(
3556+
configstore.NewWorkerLease(lease.workerID, p.cpInstanceID, lease.ownerEpoch, lease.image),
3557+
RetireReasonNormal,
3558+
origin,
3559+
)
3560+
if err != nil {
3561+
return nil, 0, 0, false, err
3562+
}
3563+
// The retire call did not transition the record: read it back and proceed
// only if it is already retired under this owner and epoch.
if !outcome.Transitioned {
3564+
record, err := p.runtimeStore.GetWorkerRecord(lease.workerID)
3565+
if err != nil {
3566+
return nil, 0, 0, false, err
3567+
}
3568+
if record == nil || record.OwnerCPInstanceID != p.cpInstanceID || record.OwnerEpoch != lease.ownerEpoch || record.State != configstore.WorkerStateRetired {
3569+
return nil, 0, 0, false, nil
3570+
}
3571+
}
3572+
}
3573+
3574+
p.mu.Lock()
3575+
defer p.mu.Unlock()
3576+
// Re-validate under the write lock: no lock was held during the durable
// call above, so the map entry may have changed.
current, ok := p.workers[lease.workerID]
3577+
if !ok || !p.workerMatchesLease(current, lease) || current.SharedState().NormalizedLifecycle() != WorkerLifecycleDraining {
3578+
return nil, 0, 0, false, nil
3579+
}
3580+
p.markWorkerRetiredInMemoryLocked(current)
3581+
delete(p.workers, current.ID)
3582+
// Count is taken after the deletion, so it excludes the removed worker.
workerCount := len(p.workers)
3583+
replacementID := 0
3584+
shouldReplenish := false
3585+
if p.shouldReplenishWarmCapacityLocked() {
3586+
replacementID = p.allocateBackgroundSpawnIDLocked()
3587+
// Count the pending replacement here, while the lock is held; the caller
// starts the actual spawn.
p.spawning++
3588+
shouldReplenish = true
3589+
}
3590+
return current, workerCount, replacementID, shouldReplenish, nil
3591+
}
3592+
33993593
func (p *K8sWorkerPool) removeWorkerAfterLostLeaseLocked(lease workerLeaseSnapshot) (*ManagedWorker, int, int, bool) {
34003594
current, ok := p.workers[lease.workerID]
34013595
if !ok || !p.workerMatchesLease(current, lease) {

0 commit comments

Comments
 (0)