Skip to content

Commit be469b3

Browse files
committed
Unified stopped-instance cleanup in syncLauncherInstances
Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent 48664fe commit be469b3

File tree

1 file changed

+47
-31
lines changed

1 file changed

+47
-31
lines changed

pkg/controller/dual-pods/inference-server.go

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ type nodeItem struct {
5757
NodeName string
5858
}
5959

60+
// launcherSyncResult is the value returned by syncLauncherInstances. It bundles
// the launcher's instance listing with the outcome of the stopped-instance
// cleanup attempted during that same sync.
type launcherSyncResult struct {
61+
instances *AllInstancesState // instance listing obtained from the launcher for this sync
62+
deletedStoppedInstanceIDs sets.Set[string] // IDs of stopped instances that the sync logged as deleted from the launcher
63+
failedStoppedInstanceErrs map[string]error // instance ID -> error from the failed attempt to delete that stopped instance
64+
}
65+
6066
func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
6167
logger := klog.FromContext(ctx).WithValues("node", ni.NodeName)
6268
ctx = klog.NewContext(ctx, logger)
@@ -101,7 +107,6 @@ func (item unboundLauncherPodItem) process(ctx context.Context, ctl *controller,
101107
}
102108

103109
// Sync launcher instances to keep internal state fresh and clean up stopped instances.
104-
// This is triggered by annotation changes from the sidecar notifier.
105110
_, syncErr, syncRetry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
106111
if syncErr != nil {
107112
if syncRetry {
@@ -365,32 +370,24 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
365370
// The sidecar notifier updates the Pod annotation when instance status changes,
366371
// which triggers reconciliation through the informer.
367372
if launcherBased && serverDat.InstanceID != "" && providingPod.Status.PodIP != "" {
368-
launcherBaseURL := fmt.Sprintf("http://%s:%d", providingPod.Status.PodIP, ctlrcommon.LauncherServicePort)
369-
lClient, err := NewLauncherClient(launcherBaseURL)
370-
if err != nil {
371-
return fmt.Errorf("failed to create launcher client for instance status check: %w", err), true
372-
}
373-
instState, err := lClient.GetInstanceState(ctx, serverDat.InstanceID)
374-
instanceStopped := false
375-
if err != nil {
376-
if IsLauncherNotFoundError(err) {
377-
// Instance is gone from the launcher entirely.
378-
logger.V(2).Info("Bound instance not found in launcher, treating as stopped", "instanceID", serverDat.InstanceID)
379-
instanceStopped = true
380-
} else {
381-
return fmt.Errorf("failed to get instance state from launcher: %w", err), true
373+
syncResult, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, providingPod)
374+
if err != nil || retry {
375+
if err != nil {
376+
return fmt.Errorf("failed to sync launcher instances for bound launcher Pod: %w", err), true
382377
}
383-
} else if instState.Status == InstanceStatusStopped {
384-
logger.V(2).Info("Bound instance is stopped", "instanceID", serverDat.InstanceID)
385-
instanceStopped = true
378+
return nil, true
386379
}
387-
if instanceStopped {
388-
// Clean up the stopped instance from the launcher.
389-
_, delErr := lClient.DeleteInstance(ctx, serverDat.InstanceID)
390-
if delErr != nil && !IsLauncherNotFoundError(delErr) {
391-
logger.V(3).Info("Failed to delete stopped instance from launcher", "instanceID", serverDat.InstanceID, "err", delErr)
392-
return ctl.ensureReqStatus(ctx, requestingPod, serverDat,
393-
fmt.Sprintf("failed to delete stopped instance %q from launcher: %s", serverDat.InstanceID, delErr.Error()))
380+
381+
_, instancePresent := findInstanceState(syncResult.instances.Instances, serverDat.InstanceID)
382+
if delErr, failedCleanup := syncResult.failedStoppedInstanceErrs[serverDat.InstanceID]; failedCleanup {
383+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat,
384+
fmt.Sprintf("failed to delete stopped instance %q from launcher: %s", serverDat.InstanceID, delErr.Error()))
385+
}
386+
if _, deletedStopped := syncResult.deletedStoppedInstanceIDs[serverDat.InstanceID]; deletedStopped || !instancePresent {
387+
if deletedStopped {
388+
logger.V(2).Info("Deleted stopped bound instance from launcher during sync", "instanceID", serverDat.InstanceID)
389+
} else {
390+
logger.V(2).Info("Bound instance not found in launcher after sync, treating as deleted", "instanceID", serverDat.InstanceID)
394391
}
395392
// Mark as sleeping so that ensureUnbound (called during requester deletion)
396393
// does not attempt to POST /sleep on the dead instance.
@@ -686,11 +683,12 @@ func (ctl *controller) selectBestLauncherPod(
686683
continue
687684
}
688685

689-
insts, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
686+
syncResult, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
690687
if err != nil || retry {
691688
somePodsNotReady = true
692689
continue
693690
}
691+
insts := syncResult.instances
694692

695693
// Check if this launcher has a sleeping instance matching the iscHash
696694
hasSleepingInstance := false
@@ -715,7 +713,7 @@ func (ctl *controller) selectBestLauncherPod(
715713
hasPortConflict = true
716714
break
717715
}
718-
if inst.InstanceID == iscHash {
716+
if inst.InstanceID == iscHash && inst.Status != InstanceStatusStopped {
719717
hasSleepingInstance = true
720718
}
721719
}
@@ -1376,10 +1374,19 @@ var coreScheme *k8sruntime.Scheme
13761374
var codecFactory k8sserializer.CodecFactory
13771375
var podDecoder k8sruntime.Decoder
13781376

1377+
// findInstanceState linearly searches insts for the entry whose InstanceID
// equals instanceID. On a match it returns a pointer to that slice element and
// true; if no entry matches (including a nil or empty slice) it returns nil and
// false.
func findInstanceState(insts []InstanceState, instanceID string) (*InstanceState, bool) {
1378+
for idx := range insts {
1379+
if insts[idx].InstanceID == instanceID {
1380+
return &insts[idx], true // pointer into the caller's slice element, not a copy
1381+
}
1382+
}
1383+
return nil, false
1384+
}
1385+
13791386
// syncLauncherInstances queries the launcher pod for its current instances,
13801387
// updates the controller's internal launcherData state, and returns the fresh
13811388
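// launcher response used for the update, wrapped in a launcherSyncResult along
// with the IDs of stopped instances whose cleanup succeeded and the deletion
// errors for those whose cleanup failed.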
1382-
func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeData, launcherPod *corev1.Pod) (*AllInstancesState, error, bool) {
1389+
func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeData, launcherPod *corev1.Pod) (*launcherSyncResult, error, bool) {
13831390
logger := klog.FromContext(ctx)
13841391

13851392
if launcherPod.Status.PodIP == "" || !utils.IsPodReady(launcherPod) {
@@ -1403,6 +1410,8 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
14031410
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
14041411
newInstances := make(map[string]time.Time)
14051412
remainingInstances := make([]InstanceState, 0, len(insts.Instances))
1413+
deletedStoppedInstanceIDs := sets.New[string]()
1414+
failedStoppedInstanceErrs := map[string]error{}
14061415
for _, inst := range insts.Instances {
14071416
if inst.Status == InstanceStatusStopped {
14081417
// Clean up stopped instances from the launcher.
@@ -1412,12 +1421,15 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
14121421
"instanceID", inst.InstanceID, "err", delErr)
14131422
// Deletion failed — the instance still occupies a slot in the launcher.
14141423
// Include it in remainingInstances so capacity accounting stays accurate.
1415-
remainingInstances = append(remainingInstances, inst)
1424+
failedStoppedInstanceErrs[inst.InstanceID] = delErr
14161425
} else {
14171426
logger.V(2).Info("Deleted stopped instance from launcher during sync",
14181427
"instanceID", inst.InstanceID)
1428+
deletedStoppedInstanceIDs.Insert(inst.InstanceID)
1429+
}
1430+
if delErr == nil || IsLauncherNotFoundError(delErr) {
1431+
continue
14191432
}
1420-
continue
14211433
}
14221434
remainingInstances = append(remainingInstances, inst)
14231435
if lastUsed, exists := launcherDat.Instances[inst.InstanceID]; exists {
@@ -1448,7 +1460,11 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
14481460
"runningInstances", insts.RunningInstances,
14491461
"instanceCount", len(newInstances))
14501462

1451-
return insts, nil, false
1463+
return &launcherSyncResult{
1464+
instances: insts,
1465+
deletedStoppedInstanceIDs: deletedStoppedInstanceIDs,
1466+
failedStoppedInstanceErrs: failedStoppedInstanceErrs,
1467+
}, nil, false
14521468
}
14531469

14541470
func init() {

0 commit comments

Comments
 (0)