Skip to content

Commit b5cd296

Browse files
authored
React on externalized instance state change (#414)
* React on externalized instance state change
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Rename type launcherPodItem
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Don't delete requester if deletion of a stopped instance fails
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Unified stopped-instance cleanup in syncLauncherInstances
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* One TODO is done, another to be done by launcher populator
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Minor simplifications; some changes to comments
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Improve error handling for unboundLauncherPodItem.process
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Introduce IsInstanceNotFoundError
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Clean up logging for infSvrItem.process
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Retry when failed to delete a stopped instance
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
* Return retry rather than true when err is outstanding from syncLauncherInstances
  Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
---------
Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent 64262a5 commit b5cd296

4 files changed

Lines changed: 205 additions & 17 deletions

File tree

pkg/controller/dual-pods/controller.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ type infSvrItem struct {
332332
RequesterName string
333333
}
334334

335-
type launcherPodItem struct {
335+
type unboundLauncherPodItem struct {
336336
LauncherPodName string
337337
NodeName string
338338
}
@@ -347,6 +347,7 @@ const (
347347
infSvrItemBoundProvider infSvrItemType = "bound_provider"
348348
// infSvrItemUnboundLauncherBasedProvider is for a server-providing Pod that
349349
// is launcher-based and not bound to any server-requesting Pods.
350+
// Note that technically an unbound launcher-based server-providing Pod is not part of any inference server (yet/anymore).
350351
infSvrItemUnboundLauncherBasedProvider infSvrItemType = "unbound_launcher_based_provider"
351352
// infSvrItemDontCare is not a real infSvrItemType but only a placeholder
352353
// saying the corresponding infSvrItem is not relevant to the controller.
@@ -431,10 +432,10 @@ func (ctl *controller) OnAdd(obj any, isInInitialList bool) {
431432
return
432433
}
433434
nd := ctl.getNodeData(nodeName)
434-
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
435+
unboundLauncher := unboundLauncherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
435436
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of add",
436437
"nodeName", nodeName, "launcherPod", typed.Name, "isInInitialList", isInInitialList, "resourceVersion", typed.ResourceVersion)
437-
nd.add(launcherPodItem)
438+
nd.add(unboundLauncher)
438439
ctl.Queue.Add(nodeItem{nodeName})
439440
} else {
440441
nodeName := typed.Spec.NodeName
@@ -484,10 +485,10 @@ func (ctl *controller) OnUpdate(prev, obj any) {
484485
return
485486
}
486487
nd := ctl.getNodeData(nodeName)
487-
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
488+
unboundLauncher := unboundLauncherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
488489
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of update",
489490
"nodeName", nodeName, "launcherPod", typed.Name, "resourceVersion", typed.ResourceVersion)
490-
nd.add(launcherPodItem)
491+
nd.add(unboundLauncher)
491492
ctl.Queue.Add(nodeItem{nodeName})
492493
} else {
493494
nodeName := typed.Spec.NodeName
@@ -540,10 +541,10 @@ func (ctl *controller) OnDelete(obj any) {
540541
return
541542
}
542543
nd := ctl.getNodeData(nodeName)
543-
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
544+
unboundLauncher := unboundLauncherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
544545
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of delete",
545546
"nodeName", nodeName, "launcherPod", typed.Name, "resourceVersion", typed.ResourceVersion)
546-
nd.add(launcherPodItem)
547+
nd.add(unboundLauncher)
547548
ctl.Queue.Add(nodeItem{nodeName})
548549
} else {
549550
nodeName := typed.Spec.NodeName

pkg/controller/dual-pods/inference-server.go

Lines changed: 108 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ type nodeItem struct {
5757
NodeName string
5858
}
5959

60+
type launcherSyncResult struct {
61+
instances *AllInstancesState
62+
deletedStoppedInstanceIDs sets.Set[string]
63+
failedStoppedInstanceErrs map[string]error
64+
}
65+
6066
func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
6167
logger := klog.FromContext(ctx).WithValues("node", ni.NodeName)
6268
ctx = klog.NewContext(ctx, logger)
@@ -85,11 +91,11 @@ func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
8591
return nil, retries > 0
8692
}
8793

88-
func (item launcherPodItem) process(ctx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
94+
func (item unboundLauncherPodItem) process(ctx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
8995
logger := klog.FromContext(ctx).WithValues("launcherPod", item.LauncherPodName, "node", item.NodeName)
9096
ctx = klog.NewContext(ctx, logger)
9197

92-
_, err := ctl.podLister.Pods(ctl.namespace).Get(item.LauncherPodName)
98+
launcherPod, err := ctl.podLister.Pods(ctl.namespace).Get(item.LauncherPodName)
9399
if err != nil {
94100
if apierrors.IsNotFound(err) {
95101
logger.V(2).Info("Launcher pod deleted, cleaning up launcher data")
@@ -100,8 +106,15 @@ func (item launcherPodItem) process(ctx context.Context, ctl *controller, nodeDa
100106
return err, true
101107
}
102108

109+
// Sync launcher instances to keep internal state fresh and clean up stopped instances.
110+
_, syncErr, syncRetry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
111+
103112
ctl.enqueueUnboundInfSvrItemsOnNode(ctx, item.NodeName, fmt.Sprintf("launcher pod %s changed", item.LauncherPodName))
104-
return nil, false
113+
114+
if syncErr != nil {
115+
return fmt.Errorf("failed to sync launcher instances: %w", syncErr), syncRetry
116+
}
117+
return nil, syncRetry
105118
}
106119

107120
func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
@@ -350,6 +363,51 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
350363
return fmt.Errorf("unable to wake up server because port not known: %w", err), true
351364
}
352365
}
366+
// For launcher-based providers, check whether the bound instance is still alive.
367+
// The sidecar notifier updates the Pod annotation when instance status changes,
368+
// which triggers reconciliation through the informer.
369+
if launcherBased && serverDat.InstanceID != "" && providingPod.Status.PodIP != "" {
370+
syncResult, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, providingPod)
371+
if err != nil || retry {
372+
if err != nil {
373+
return fmt.Errorf("failed to sync launcher instances for bound launcher Pod: %w", err), retry
374+
}
375+
return nil, true
376+
}
377+
378+
_, instancePresent := findInstanceState(syncResult.instances.Instances, serverDat.InstanceID)
379+
if delErr, failedCleanup := syncResult.failedStoppedInstanceErrs[serverDat.InstanceID]; failedCleanup {
380+
return fmt.Errorf("failed to delete stopped instance %q from launcher: %w", serverDat.InstanceID, delErr), true
381+
}
382+
if _, deletedStopped := syncResult.deletedStoppedInstanceIDs[serverDat.InstanceID]; deletedStopped || !instancePresent {
383+
if deletedStopped {
384+
logger.V(2).Info("Deleted stopped bound instance from launcher during sync")
385+
} else {
386+
logger.V(2).Info("Bound instance not found in launcher after sync, treating as deleted")
387+
}
388+
// Mark as sleeping so that ensureUnbound (called during requester deletion)
389+
// does not attempt to POST /sleep on the dead instance.
390+
// The instance process is dead — this is not a real sleeping state,
391+
// but it prevents ensureUnbound from hitting a dead endpoint and retrying forever.
392+
serverDat.Sleeping = ptr.To(true)
393+
// Delete the server-requesting Pod.
394+
// This is analogous to the direct-provider case where a providing Pod's
395+
// deletion is reflected to deletion of the server-requesting Pod.
396+
// The ReplicaSet will recreate the requesting Pod, triggering a fresh bind.
397+
err = podOps.Delete(ctx, requestingPod.Name, metav1.DeleteOptions{
398+
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
399+
Preconditions: &metav1.Preconditions{UID: &item.UID, ResourceVersion: &requestingPod.ResourceVersion}})
400+
if err == nil {
401+
logger.V(2).Info("Requested deletion of server-requesting Pod because bound instance stopped")
402+
} else if apierrors.IsGone(err) || apierrors.IsNotFound(err) {
403+
logger.V(5).Info("The server-requesting Pod is already gone")
404+
} else {
405+
return fmt.Errorf("failed to delete server-requesting Pod for stopped instance: %w", err), true
406+
}
407+
serverDat.RequesterDeleteRequested = true
408+
return nil, false
409+
}
410+
}
353411
if serverDat.Sleeping == nil {
354412
sleeping, err := ctl.querySleeping(ctx, providingPod, serverPort)
355413
if err != nil {
@@ -469,7 +527,6 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
469527
lcName := isc.Spec.LauncherConfigName
470528
lc, err := ctl.lcLister.LauncherConfigs(ctl.namespace).Get(lcName)
471529
if err != nil {
472-
// TODO(waltforme): introduce the 'enqueue requesters by launcherconfigs' logic to the controller
473530
return ctl.ensureReqStatus(ctx, requestingPod, serverDat,
474531
fmt.Sprintf("failed to get LauncherConfig %q: %v", lcName, err),
475532
)
@@ -561,7 +618,6 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
561618
}
562619
// Sleeper budget is met. Make a new launcher Pod.
563620

564-
// TODO(waltforme): introduce the 'enqueue requesters by launcher pods' logic to the controller.
565621
echo, err := podOps.Create(ctx, desiredLauncherPod, metav1.CreateOptions{})
566622
if err != nil {
567623
errMsg := err.Error()
@@ -621,11 +677,12 @@ func (ctl *controller) selectBestLauncherPod(
621677
continue
622678
}
623679

624-
insts, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
680+
syncResult, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
625681
if err != nil || retry {
626682
somePodsNotReady = true
627683
continue
628684
}
685+
insts := syncResult.instances
629686

630687
// Check if this launcher has a sleeping instance matching the iscHash
631688
hasSleepingInstance := false
@@ -650,7 +707,7 @@ func (ctl *controller) selectBestLauncherPod(
650707
hasPortConflict = true
651708
break
652709
}
653-
if inst.InstanceID == iscHash {
710+
if inst.InstanceID == iscHash && inst.Status != InstanceStatusStopped {
654711
hasSleepingInstance = true
655712
}
656713
}
@@ -1324,10 +1381,19 @@ var coreScheme *k8sruntime.Scheme
13241381
var codecFactory k8sserializer.CodecFactory
13251382
var podDecoder k8sruntime.Decoder
13261383

1384+
func findInstanceState(insts []InstanceState, instanceID string) (*InstanceState, bool) {
1385+
for idx := range insts {
1386+
if insts[idx].InstanceID == instanceID {
1387+
return &insts[idx], true
1388+
}
1389+
}
1390+
return nil, false
1391+
}
1392+
13271393
// syncLauncherInstances queries the launcher pod for its current instances,
13281394
// updates the controller's internal launcherData state, and returns the fresh
13291395
// launcher response used for the update.
1330-
func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeData, launcherPod *corev1.Pod) (*AllInstancesState, error, bool) {
1396+
func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeData, launcherPod *corev1.Pod) (*launcherSyncResult, error, bool) {
13311397
logger := klog.FromContext(ctx)
13321398

13331399
if launcherPod.Status.PodIP == "" || !utils.IsPodReady(launcherPod) {
@@ -1350,14 +1416,43 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
13501416

13511417
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
13521418
newInstances := make(map[string]time.Time)
1419+
remainingInstances := make([]InstanceState, 0, len(insts.Instances))
1420+
deletedStoppedInstanceIDs := sets.New[string]()
1421+
failedStoppedInstanceErrs := map[string]error{}
1422+
runningCount := 0
13531423
for _, inst := range insts.Instances {
1424+
if inst.Status == InstanceStatusStopped {
1425+
// Clean up stopped instances from the launcher.
1426+
_, delErr := lClient.DeleteInstance(ctx, inst.InstanceID)
1427+
if delErr != nil && !IsInstanceNotFoundError(delErr) {
1428+
logger.V(3).Info("Failed to delete stopped instance from launcher during sync",
1429+
"instanceID", inst.InstanceID, "err", delErr)
1430+
// Deletion failed — the instance still occupies a slot in the launcher.
1431+
failedStoppedInstanceErrs[inst.InstanceID] = delErr
1432+
} else {
1433+
logger.V(2).Info("Deleted stopped instance from launcher during sync",
1434+
"instanceID", inst.InstanceID)
1435+
deletedStoppedInstanceIDs.Insert(inst.InstanceID)
1436+
continue
1437+
}
1438+
}
1439+
remainingInstances = append(remainingInstances, inst)
1440+
if inst.Status == "running" {
1441+
runningCount++
1442+
}
13541443
if lastUsed, exists := launcherDat.Instances[inst.InstanceID]; exists {
13551444
newInstances[inst.InstanceID] = lastUsed
13561445
} else {
13571446
newInstances[inst.InstanceID] = time.Now()
13581447
}
13591448
}
13601449

1450+
// Replace the returned instance list and counts with the filtered view
1451+
// so that callers (e.g. selectBestLauncherPod) see accurate capacity.
1452+
insts.Instances = remainingInstances
1453+
insts.TotalInstances = len(remainingInstances)
1454+
insts.RunningInstances = runningCount
1455+
13611456
launcherDat.Instances = newInstances
13621457
launcherDat.Accurate = true
13631458

@@ -1367,7 +1462,11 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
13671462
"runningInstances", insts.RunningInstances,
13681463
"instanceCount", len(newInstances))
13691464

1370-
return insts, nil, false
1465+
return &launcherSyncResult{
1466+
instances: insts,
1467+
deletedStoppedInstanceIDs: deletedStoppedInstanceIDs,
1468+
failedStoppedInstanceErrs: failedStoppedInstanceErrs,
1469+
}, nil, false
13711470
}
13721471

13731472
func init() {

pkg/controller/dual-pods/launcherclient.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
"encoding/json"
23+
"errors"
2324
"fmt"
2425
"io"
2526
"net/http"
@@ -34,9 +35,22 @@ type LauncherClient struct {
3435
httpClient *http.Client
3536
}
3637

38+
type launcherError struct {
39+
StatusCode int
40+
Body string
41+
}
42+
43+
func (e *launcherError) Error() string {
44+
return fmt.Sprintf("launcher error %d: %s", e.StatusCode, e.Body)
45+
}
46+
3747
const (
3848
VllmConfigISCNameAnnotationKey = "isc-name"
3949
VllmConfigInferencePortAnnotationKey = "inference-port"
50+
51+
// InstanceStatusStopped is the status value reported by the launcher
52+
// when a vLLM instance's process has terminated.
53+
InstanceStatusStopped = "stopped"
4054
)
4155

4256
func NewLauncherClient(baseURL string) (*LauncherClient, error) {
@@ -184,6 +198,13 @@ func (c *LauncherClient) create(
184198
return &out, nil
185199
}
186200

201+
// IsInstanceNotFoundError returns true when the launcher reports the instance
202+
// does not exist.
203+
func IsInstanceNotFoundError(err error) bool {
204+
var launcherErr *launcherError
205+
return errors.As(err, &launcherErr) && launcherErr.StatusCode == http.StatusNotFound
206+
}
207+
187208
func (c *LauncherClient) do(
188209
ctx context.Context,
189210
method string,
@@ -218,7 +239,7 @@ func (c *LauncherClient) do(
218239

219240
if resp.StatusCode >= 300 {
220241
b, _ := io.ReadAll(resp.Body)
221-
return fmt.Errorf("launcher error %d: %s", resp.StatusCode, string(b))
242+
return &launcherError{StatusCode: resp.StatusCode, Body: string(b)}
222243
}
223244

224245
if out != nil {

test/e2e/test-cases.sh

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,4 +499,71 @@ if [ "$E2E_PLATFORM" = "openshift" ]; then check_gpu_pin $req_after_delete; fi
499499

500500
cheer Successful unbound launcher deletion cleanup
501501

502+
# ---------------------------------------------------------------------------
503+
# Stopped Instance Recovery
504+
# ---------------------------------------------------------------------------
505+
506+
intro_case Stopped Instance Recovery
507+
508+
# This test verifies that the dual-pods controller detects a stopped vLLM
509+
# instance (via the sidecar notifier annotation) and deletes the server-
510+
# requesting Pod so that the ReplicaSet recreates it with a fresh instance.
511+
#
512+
# Starting state: $req_after_delete is bound to $launcher_after_delete, both Ready.
513+
514+
echo "Bound requester: $req_after_delete, launcher: $launcher_after_delete"
515+
req_uid_before=$(kubectl get pod $req_after_delete -n "$NS" -o jsonpath='{.metadata.uid}')
516+
517+
# Get the running instance ID from the launcher
518+
instance_id=$(kubectl exec -n "$NS" $launcher_after_delete -c inference-server -- python3 -c '
519+
import json, urllib.request
520+
resp = json.load(urllib.request.urlopen("http://127.0.0.1:8001/v2/vllm/instances"))
521+
for inst in resp["instances"]:
522+
if inst["status"] == "running":
523+
print(inst["instance_id"])
524+
break
525+
')
526+
echo "Running instance ID: $instance_id"
527+
[ -n "$instance_id" ]
528+
529+
# Delete the running instance from the launcher to simulate a crash.
530+
# The notifier sidecar will detect the change and update the Pod annotation.
531+
# The dual-pods controller will then re-sync the launcher's instances, find the bound instance missing, and delete the requester.
532+
kubectl exec -n "$NS" $launcher_after_delete -c inference-server -- python3 -c '
533+
import urllib.request
534+
req = urllib.request.Request(
535+
"http://127.0.0.1:8001/v2/vllm/instances/'"$instance_id"'",
536+
method="DELETE",
537+
)
538+
urllib.request.urlopen(req)
539+
print("Instance deleted from launcher")
540+
'
541+
542+
# Wait for the old requester Pod to be deleted (the dual-pods controller should do this)
543+
expect '[ "$(kubectl get pod -n '"$NS"' $req_after_delete -o jsonpath={.metadata.uid} 2>/dev/null)" != "$req_uid_before" ]'
544+
echo "Old requester $req_after_delete was deleted by the controller"
545+
546+
# Wait for the ReplicaSet to recreate a new requester Pod
547+
expect "kubectl get pods -n $NS -o name -l app=dp-example,instance=$inst | wc -l | grep -w 1"
548+
req_recovered=$(kubectl get pods -n "$NS" -o name -l app=dp-example,instance=$inst | sed s%pod/%%)
549+
echo "Recovered server-requesting Pod: $req_recovered"
550+
551+
# Wait for the new requester to be bound to the same launcher
552+
expect "kubectl get pods -n $NS -o name -l dual-pods.llm-d.ai/dual=$req_recovered | wc -l | grep -w 1"
553+
launcher_recovered=$(kubectl get pods -n "$NS" -o name -l dual-pods.llm-d.ai/dual=$req_recovered | sed s%pod/%%)
554+
echo "Recovered launcher: $launcher_recovered"
555+
[ "$launcher_recovered" == "$launcher_after_delete" ]
556+
557+
# Verify bidirectional binding
558+
expect '[ "$(kubectl get pod -n '"$NS"' $req_recovered -o jsonpath={.metadata.labels.dual-pods\\.llm-d\\.ai/dual})" == "$launcher_after_delete" ]'
559+
560+
# Wait for both to be ready
561+
date
562+
kubectl wait --for condition=Ready pod/$req_recovered -n "$NS" --timeout=120s
563+
[ "$(kubectl get pod $launcher_after_delete -n "$NS" -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}')" = "True" ]
564+
565+
if [ "$E2E_PLATFORM" = "openshift" ]; then check_gpu_pin $req_recovered; fi
566+
567+
cheer Successful stopped instance recovery
568+
502569
cheer All launcher-based tests passed

0 commit comments

Comments
 (0)