Skip to content

Commit 0583aea

Browse files
committed
Improve selectBestLauncherPod
Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent a30bea4 commit 0583aea

File tree

1 file changed

+19
-29
lines changed

1 file changed

+19
-29
lines changed

pkg/controller/dual-pods/inference-server.go

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -612,31 +612,21 @@ func (ctl *controller) selectBestLauncherPod(
612612
continue
613613
}
614614

615-
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
616-
if !launcherDat.Accurate {
617-
err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
618-
if err != nil || retry {
619-
somePodsNotReady = true
620-
continue
621-
}
622-
}
623-
624-
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherPod.Status.PodIP, ctlrcommon.LauncherServicePort)
625-
lClient, err := NewLauncherClient(launcherBaseURL)
626-
if err != nil {
627-
logger.V(5).Info("Failed to create launcher client, skipping Pod", "name", launcherPod.Name, "err", err)
628-
continue
629-
}
630-
631-
// Query instances from this launcher.
632-
insts, err := lClient.ListInstances(ctx)
633-
if err != nil {
634-
logger.V(5).Info("Failed to list instances from launcher, skipping Pod", "name", launcherPod.Name, "err", err)
615+
insts, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
616+
if err != nil || retry {
617+
somePodsNotReady = true
635618
continue
636619
}
637620

638621
// Check if this launcher has a sleeping instance matching the iscHash
639-
if _, instExists := launcherDat.Instances[iscHash]; instExists {
622+
hasSleepingInstance := false
623+
for _, inst := range insts.Instances {
624+
if inst.InstanceID == iscHash {
625+
hasSleepingInstance = true
626+
break
627+
}
628+
}
629+
if hasSleepingInstance {
640630
// Priority 1: Found a sleeping instance
641631
logger.V(5).Info("Found launcher with sleeping instance (fastest path)",
642632
"name", launcherPod.Name,
@@ -1268,28 +1258,28 @@ var coreScheme *k8sruntime.Scheme
12681258
var codecFactory k8sserializer.CodecFactory
12691259
var podDecoder k8sruntime.Decoder
12701260

1271-
// syncLauncherInstances queries the launcher pod for its current instances
1272-
// and updates the controller's internal launcherData state.
1273-
// This is called for both bound and unbound launcher pods.
1274-
func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeData, launcherPod *corev1.Pod) (error, bool) {
1261+
// syncLauncherInstances queries the launcher pod for its current instances,
1262+
// updates the controller's internal launcherData state, and returns the fresh
1263+
// launcher response used for the update.
1264+
func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeData, launcherPod *corev1.Pod) (*AllInstancesStatus, error, bool) {
12751265
logger := klog.FromContext(ctx)
12761266

12771267
if launcherPod.Status.PodIP == "" || !utils.IsPodReady(launcherPod) {
12781268
logger.V(5).Info("Launcher pod not ready yet, waiting for another Pod event", "name", launcherPod.Name)
1279-
return nil, false
1269+
return nil, nil, true
12801270
}
12811271

12821272
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherPod.Status.PodIP, ctlrcommon.LauncherServicePort)
12831273
lClient, err := NewLauncherClient(launcherBaseURL)
12841274
if err != nil {
12851275
logger.Error(err, "Failed to create launcher client")
1286-
return err, true
1276+
return nil, err, true
12871277
}
12881278

12891279
insts, err := lClient.ListInstances(ctx)
12901280
if err != nil {
12911281
logger.Error(err, "Failed to list instances from launcher")
1292-
return err, true
1282+
return nil, err, true
12931283
}
12941284

12951285
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
@@ -1311,7 +1301,7 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
13111301
"runningInstances", insts.RunningInstances,
13121302
"instanceCount", len(newInstances))
13131303

1314-
return nil, false
1304+
return insts, nil, false
13151305
}
13161306

13171307
func init() {

0 commit comments

Comments
 (0)