Skip to content

Commit 491a0a0

Browse files
committed
Take all matching launcher Pods into consideration
Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent bd3cd51 commit 491a0a0

File tree

2 files changed

+159
-52
lines changed

2 files changed

+159
-52
lines changed

pkg/controller/dual-pods/inference-server.go

Lines changed: 141 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -460,70 +460,69 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
460460
return err, false
461461
}
462462

463+
cfg, iscHash, err := ctl.parseInferenceServerConfig(isc)
464+
if err != nil {
465+
return fmt.Errorf("parse inference server config: %w", err), true
466+
}
467+
logger.V(5).Info("Nominal hash of InferenceServerConfig", "hash", iscHash)
468+
463469
if len(launcherPodAnys) > 0 {
464470
// Multiple launcher Pods could exist for one LauncherConfig object on one node.
465-
// TODO(waltforme): The logic currently picks the first Pod but should consider all of them
466-
467-
// Note: Multiple vLLM instances could exist in one launcher Pod, but at most one instance
468-
// could be awake at a time.
469-
// TODO(waltforme): Revise accordingly.
470-
471-
launcherPod := launcherPodAnys[0].(*corev1.Pod)
472-
logger.V(5).Info("Found launcher Pod", "name", launcherPod.Name)
473-
launcherIP := launcherPod.Status.PodIP
474-
if launcherIP == "" {
475-
return fmt.Errorf("launcher Pod %q has no IP assigned yet", launcherPod.Name), true
476-
}
477-
478-
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherIP, ctlrcommon.LauncherServicePort)
479-
lClient, err := NewLauncherClient(launcherBaseURL)
480-
if err != nil {
481-
return err, true
482-
}
471+
// Select the best launcher Pod: prioritize those with sleeping instances (fast wake-up),
472+
// then those with capacity for new instances.
473+
// Note that multiple vLLM instances could exist in one launcher Pod, but at most one instance could be awake at a time.
483474

484-
insts, err := lClient.ListInstances(ctx)
475+
launcherPod, hasSleepingInstance, someNotReady, err := ctl.selectBestLauncherPod(ctx, launcherPodAnys, iscHash, nodeDat)
485476
if err != nil {
486477
return err, true
487478
}
488-
logger.V(5).Info("vLLM instance counts",
489-
"total_instances", insts.TotalInstances,
490-
"running_instances", insts.RunningInstances,
491-
)
492-
493-
cfg, iscHash, err := ctl.parseInferenceServerConfig(isc)
494-
if err != nil {
495-
return fmt.Errorf("parse inference server config: %w", err), true
479+
if someNotReady {
480+
logger.V(4).Info("Launcher Pods exist but some are not ready yet, will retry later")
481+
return nil, true
496482
}
497-
logger.V(5).Info("Nominal hash of InferenceServerConfig", "hash", iscHash)
498-
499-
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
500-
_, instExists := launcherDat.Instances[iscHash]
483+
if launcherPod == nil {
484+
logger.V(5).Info("No suitable launcher Pod found with sleeping instance or necessary capacity")
485+
// Fall through to create new launcher Pod
486+
} else {
487+
logger.V(5).Info("Selected launcher Pod", "name", launcherPod.Name, "hasSleepingInstance", hasSleepingInstance)
488+
launcherIP := launcherPod.Status.PodIP
489+
if launcherIP == "" {
490+
return fmt.Errorf("launcher Pod %q has no IP assigned yet", launcherPod.Name), true
491+
}
501492

502-
// if the matching launcher Pod hosts a matching instance, make sure that instance is awake, then bound the launcher Pod to the requester.
503-
// if the matching launcher Pod hosts zero instances, create an instance and bind the launcher Pod to the requester.
504-
if instExists {
505-
logger.V(5).Info("vLLM instance for InferenceServerConfig already exists", "iscHash", iscHash)
506-
err := ctl.wakeupInstance(ctx, lClient, iscHash, isc.Spec.ModelServerConfig.Port)
493+
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherIP, ctlrcommon.LauncherServicePort)
494+
lClient, err := NewLauncherClient(launcherBaseURL)
507495
if err != nil {
508-
return fmt.Errorf("wake up vLLM instance: %w", err), true
496+
return err, true
509497
}
510-
launcherDat.Instances[iscHash] = time.Now()
511-
// TODO(waltforme): the bind method may need more revision to fully handle launcher-based server providing Pods
512-
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true)
513-
} else if insts.TotalInstances == 0 {
514-
result, err := lClient.CreateNamedInstance(ctx, iscHash, *cfg)
515-
if err != nil {
516-
return fmt.Errorf("create vLLM instance: %w", err), true
498+
499+
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
500+
501+
if hasSleepingInstance {
502+
// Fast path: wake up existing sleeping instance
503+
logger.V(5).Info("Waking up existing vLLM instance", "iscHash", iscHash)
504+
err := ctl.wakeupInstance(ctx, lClient, iscHash, isc.Spec.ModelServerConfig.Port)
505+
if err != nil {
506+
return fmt.Errorf("wake up vLLM instance: %w", err), true
507+
}
508+
launcherDat.Instances[iscHash] = time.Now()
509+
// TODO(waltforme): the bind method may need more revision to fully handle launcher-based server providing Pods
510+
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true)
511+
} else {
512+
// Slower path: create new instance in launcher with capacity
513+
result, err := lClient.CreateNamedInstance(ctx, iscHash, *cfg)
514+
if err != nil {
515+
return fmt.Errorf("create vLLM instance: %w", err), true
516+
}
517+
logger.V(5).Info("Created new vLLM instance",
518+
"instance_id", result.InstanceID,
519+
"status", result.Status,
520+
)
521+
launcherDat.Instances[iscHash] = time.Now()
522+
// TODO(waltforme): the bind method may need more revision to fully handle launcher-based server providing Pods
523+
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true)
517524
}
518-
logger.V(5).Info("Created new vLLM instance",
519-
"instance_id", result.InstanceID,
520-
"status", result.Status,
521-
)
522-
launcherDat.Instances[iscHash] = time.Now()
523-
// TODO(waltforme): the bind method may need more revision to fully handle launcher-based server providing Pods
524-
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true)
525525
}
526-
527526
}
528527
// Remains: Zero matching launcher Pods, or the matching launcher Pod cannot host more instances to fulfill the request.
529528

@@ -553,6 +552,96 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
553552
return ctl.ensureReqStatus(ctx, requestingPod, serverDat)
554553
}
555554

555+
// selectBestLauncherPod evaluates all matching launcher Pods and selects the 'best' one for fulfilling a request.
556+
// Currently the definition of 'best' is radically simple:
557+
// Priority 1: Launcher with a sleeping instance matching iscHash (fastest - just wake up);
558+
// Priority 2: Launcher with capacity for a new instance (slower - need to create);
559+
// Otherwise, no launcher Pod is selected.
560+
// Returns (selectedPod, hasSleepingInstance, somePodsNotReady, error).
561+
// Returns (nil, false, false, nil) if no suitable launcher found and all pods are ready or failed.
562+
// Returns (nil, false, true, nil) if there are pods not ready yet - caller should retry later.
563+
func (ctl *controller) selectBestLauncherPod(
564+
ctx context.Context,
565+
launcherPodAnys []interface{},
566+
iscHash string,
567+
nodeDat *nodeData,
568+
) (*corev1.Pod, bool, bool, error) {
569+
logger := klog.FromContext(ctx)
570+
571+
var candidateWithCapacity *corev1.Pod
572+
var somePodsNotReady bool
573+
574+
for _, podAny := range launcherPodAnys {
575+
launcherPod := podAny.(*corev1.Pod)
576+
577+
if launcherPod.Status.Phase == corev1.PodFailed || launcherPod.DeletionTimestamp != nil {
578+
continue
579+
}
580+
581+
// Track pods that are not ready yet - we should give them time instead of
582+
// failing and creating new launcher Pods immediately.
583+
if launcherPod.Status.PodIP == "" || !utils.IsPodReady(launcherPod) {
584+
logger.V(5).Info("Launcher Pod not ready yet", "name", launcherPod.Name, "hasIP", launcherPod.Status.PodIP != "")
585+
somePodsNotReady = true
586+
continue
587+
}
588+
589+
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherPod.Status.PodIP, ctlrcommon.LauncherServicePort)
590+
lClient, err := NewLauncherClient(launcherBaseURL)
591+
if err != nil {
592+
logger.V(5).Info("Failed to create launcher client, skipping Pod", "name", launcherPod.Name, "err", err)
593+
continue
594+
}
595+
596+
// Query instances from this launcher
597+
insts, err := lClient.ListInstances(ctx)
598+
if err != nil {
599+
logger.V(5).Info("Failed to list instances from launcher, skipping Pod", "name", launcherPod.Name, "err", err)
600+
continue
601+
}
602+
603+
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
604+
605+
// Check if this launcher has a sleeping instance matching the iscHash
606+
if _, instExists := launcherDat.Instances[iscHash]; instExists {
607+
// Priority 1: Found a sleeping instance
608+
logger.V(5).Info("Found launcher with sleeping instance (fastest path)",
609+
"name", launcherPod.Name,
610+
"iscHash", iscHash,
611+
"totalInstances", insts.TotalInstances,
612+
"runningInstances", insts.RunningInstances)
613+
return launcherPod, true, false, nil
614+
}
615+
616+
// Check if this launcher has capacity for a new instance
617+
// A launcher has capacity if it has zero instances (can host at least one)
618+
if insts.TotalInstances == 0 && candidateWithCapacity == nil {
619+
// Priority 2: Has capacity for new instance
620+
logger.V(5).Info("Found launcher with capacity for new instance",
621+
"name", launcherPod.Name,
622+
"totalInstances", insts.TotalInstances)
623+
candidateWithCapacity = launcherPod
624+
// Don't return yet - keep looking for sleeping instances (higher priority)
625+
}
626+
}
627+
628+
// No sleeper but we found a launcher with capacity, use it
629+
if candidateWithCapacity != nil {
630+
logger.V(4).Info("Selected launcher with capacity (slower path)", "name", candidateWithCapacity.Name)
631+
return candidateWithCapacity, false, false, nil
632+
}
633+
634+
// Found sleeper nor capable ones, but there are pods not ready yet, signal caller to retry later
635+
if somePodsNotReady {
636+
logger.V(4).Info("Found launcher Pods not ready yet, will retry later")
637+
return nil, false, true, nil
638+
}
639+
640+
// No suitable launchers found
641+
logger.V(4).Info("No suitable launcher Pod found with sleeping instance or necessary capacity")
642+
return nil, false, false, nil
643+
}
644+
556645
func (ctl *controller) parseInferenceServerConfig(isc *fmav1alpha1.InferenceServerConfig) (*VllmConfig, string, error) {
557646
options := isc.Spec.ModelServerConfig.Options + " --port " + strconv.Itoa(int(isc.Spec.ModelServerConfig.Port))
558647
vllmCfg := VllmConfig{ // TODO(waltforme): update this when type VllmConfig is updated

pkg/controller/utils/pod-helper.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,24 @@ func BuildLauncherPodFromTemplate(template corev1.PodTemplateSpec, ns, nodeName,
192192
FailureThreshold: 3,
193193
}
194194

195+
// Set readiness probe to check if launcher can list instances.
196+
// This is necessary because otherwise the dual-pod controller will be confused when
197+
// the launcher Pod is reported as ready but refuses the request to list its vLLM instances.
198+
container.ReadinessProbe = &corev1.Probe{
199+
ProbeHandler: corev1.ProbeHandler{
200+
HTTPGet: &corev1.HTTPGetAction{
201+
Path: "/v2/vllm/instances",
202+
Port: intstr.FromInt(int(serverPort)),
203+
Scheme: corev1.URISchemeHTTP,
204+
},
205+
},
206+
InitialDelaySeconds: 2,
207+
PeriodSeconds: 5,
208+
TimeoutSeconds: 2,
209+
SuccessThreshold: 1,
210+
FailureThreshold: 3,
211+
}
212+
195213
// Remove nvidia.com/gpu from resource limits
196214
removeGPUResourceLimits(container)
197215

0 commit comments

Comments
 (0)