Skip to content

Commit 5407fbe

Browse files
authored
Retry duplicate runtime worker claims (#566)
1 parent 7350e5e commit 5407fbe

2 files changed

Lines changed: 11 additions & 27 deletions

File tree

controlplane/k8s_pool.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,16 +1578,9 @@ func (p *K8sWorkerPool) reserveClaimedWorker(ctx context.Context, claimed *confi
15781578
p.mu.Unlock()
15791579
return nil, fmt.Errorf("worker %d claimed with stale owner epoch %d behind current %d: %w", claimed.WorkerID, claimed.OwnerEpoch, currentEpoch, errStaleRuntimeWorkerClaim)
15801580
}
1581-
if isSameRuntimeWorkerClaim(worker, claimed, assignment) {
1582-
worker.reservedAt = time.Now()
1583-
observeWarmPoolLifecycleGauges(p.workers)
1581+
if isDuplicateRuntimeWorkerClaim(worker, claimed, assignment) {
15841582
p.mu.Unlock()
1585-
if err := p.checkReservedWorkerLiveness(ctx, worker); err != nil {
1586-
slog.Warn("Claimed worker failed liveness recheck.", "worker", worker.ID, "worker_pod", worker.PodName(), "error", err)
1587-
p.retireWorkerWithReason(worker.ID, RetireReasonCrash)
1588-
return nil, err
1589-
}
1590-
return worker, nil
1583+
return nil, fmt.Errorf("worker %d already has in-flight claim for owner epoch %d: %w", claimed.WorkerID, claimed.OwnerEpoch, errStaleRuntimeWorkerClaim)
15911584
}
15921585
worker.SetOwnerCPInstanceID(claimed.OwnerCPInstanceID)
15931586
worker.SetOwnerEpoch(claimed.OwnerEpoch)
@@ -1617,7 +1610,7 @@ func (p *K8sWorkerPool) reserveClaimedWorker(ctx context.Context, claimed *confi
16171610
return worker, nil
16181611
}
16191612

1620-
func isSameRuntimeWorkerClaim(worker *ManagedWorker, claimed *configstore.WorkerRecord, assignment *WorkerAssignment) bool {
1613+
func isDuplicateRuntimeWorkerClaim(worker *ManagedWorker, claimed *configstore.WorkerRecord, assignment *WorkerAssignment) bool {
16211614
if worker == nil || claimed == nil || assignment == nil {
16221615
return false
16231616
}

controlplane/k8s_pool_test.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1358,7 +1358,7 @@ func TestK8sPoolReserveSharedWorkerClaimsRuntimeWorkerAndAdoptsPod(t *testing.T)
13581358
}
13591359
}
13601360

1361-
func TestK8sPoolReserveClaimedWorkerIsIdempotentForSameActivatingClaim(t *testing.T) {
1361+
func TestK8sPoolReserveClaimedWorkerRejectsDuplicateActivatingClaim(t *testing.T) {
13621362
pool, _ := newTestK8sPool(t, 5)
13631363
assignment := &WorkerAssignment{OrgID: "analytics"}
13641364
worker := &ManagedWorker{ID: 44, podName: "duckgres-worker-test-cp-44", done: make(chan struct{})}
@@ -1371,32 +1371,23 @@ func TestK8sPoolReserveClaimedWorkerIsIdempotentForSameActivatingClaim(t *testin
13711371
t.Fatalf("SetSharedState: %v", err)
13721372
}
13731373
pool.workers[worker.ID] = worker
1374-
pool.healthCheckFunc = func(ctx context.Context, got *ManagedWorker) error {
1375-
if got.ID != worker.ID {
1376-
t.Fatalf("expected liveness check for worker %d, got %d", worker.ID, got.ID)
1377-
}
1378-
return nil
1379-
}
13801374

1381-
got, err := pool.reserveClaimedWorker(context.Background(), &configstore.WorkerRecord{
1375+
_, err := pool.reserveClaimedWorker(context.Background(), &configstore.WorkerRecord{
13821376
WorkerID: worker.ID,
13831377
PodName: worker.PodName(),
13841378
State: configstore.WorkerStateReserved,
13851379
OrgID: assignment.OrgID,
13861380
OwnerCPInstanceID: pool.cpInstanceID,
13871381
OwnerEpoch: 5,
13881382
}, assignment)
1389-
if err != nil {
1390-
t.Fatalf("reserveClaimedWorker: %v", err)
1391-
}
1392-
if got != worker {
1393-
t.Fatalf("expected same worker instance")
1383+
if !errors.Is(err, errStaleRuntimeWorkerClaim) {
1384+
t.Fatalf("expected stale claim error, got %v", err)
13941385
}
1395-
if got.SharedState().Lifecycle != WorkerLifecycleActivating {
1396-
t.Fatalf("expected lifecycle to remain activating, got %q", got.SharedState().Lifecycle)
1386+
if worker.SharedState().Lifecycle != WorkerLifecycleActivating {
1387+
t.Fatalf("expected lifecycle to remain activating, got %q", worker.SharedState().Lifecycle)
13971388
}
1398-
if got.OwnerEpoch() != 5 {
1399-
t.Fatalf("expected owner epoch 5, got %d", got.OwnerEpoch())
1389+
if worker.OwnerEpoch() != 5 {
1390+
t.Fatalf("expected owner epoch 5, got %d", worker.OwnerEpoch())
14001391
}
14011392
}
14021393

0 commit comments

Comments
 (0)