Skip to content

Commit 0ff763b

Browse files
committed
Better handle the inference server port of launcher-hosted vLLM instances
Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent 491a0a0 commit 0ff763b

File tree

2 files changed

+39
-18
lines changed

2 files changed

+39
-18
lines changed

pkg/controller/dual-pods/inference-server.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
409409
logger.V(2).Info("Unexpected: multiple sleeping Pods match; using the first", "requesterName", requestingPod.Name)
410410
}
411411
providingPod = sleepingAnys[0].(*corev1.Pod)
412-
return ctl.bind(ctx, serverDat, requestingPod, providingPod, false)
412+
return ctl.bind(ctx, serverDat, requestingPod, providingPod, false, -1)
413413
}
414414
// What remains is to make a new server-providing Pod --- if the sleeper budget allows.
415415

@@ -507,7 +507,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
507507
}
508508
launcherDat.Instances[iscHash] = time.Now()
509509
// TODO(waltforme): the bind method may need more revision to fully handle launcher-based server providing Pods
510-
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true)
510+
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true, int16(isc.Spec.ModelServerConfig.Port))
511511
} else {
512512
// Slower path: create new instance in launcher with capacity
513513
result, err := lClient.CreateNamedInstance(ctx, iscHash, *cfg)
@@ -520,7 +520,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
520520
)
521521
launcherDat.Instances[iscHash] = time.Now()
522522
// TODO(waltforme): the bind method may need more revision to fully handle launcher-based server providing Pods
523-
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true)
523+
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true, int16(isc.Spec.ModelServerConfig.Port))
524524
}
525525
}
526526
}
@@ -768,7 +768,8 @@ func (ctl *controller) enforceSleeperBudget(ctx context.Context, serverDat *serv
768768
return nil, len(gonerNames) > 0
769769
}
770770

771-
func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requestingPod, providingPod *corev1.Pod, launcherBased bool) (error, bool) {
771+
// Note: instPort is used only for launcher-based server-providing Pods.
772+
func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requestingPod, providingPod *corev1.Pod, launcherBased bool, instPort int16) (error, bool) {
772773
logger := klog.FromContext(ctx)
773774
providingPod = providingPod.DeepCopy()
774775
providingPod.Annotations[requesterAnnotationKey] = string(requestingPod.UID) + " " + requestingPod.Name
@@ -783,9 +784,20 @@ func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requesti
783784
}
784785
serverDat.ProvidingPodName = providingPod.Name
785786
logger.V(2).Info("Bound server-providing Pod", "name", providingPod.Name, "node", requestingPod.Spec.NodeName, "gpus", serverDat.GPUIndicesStr, "newResourceVersion", echo.ResourceVersion)
786-
_, serverPort, err := utils.GetInferenceServerPort(providingPod, launcherBased)
787-
if err != nil { // Impossible, because such a providingPod would never be created by this controller
788-
return fmt.Errorf("unable to wake up server because port not known: %w", err), true
787+
var serverPort int16
788+
if launcherBased {
789+
serverPort = instPort
790+
} else {
791+
_, serverPort, err = utils.GetInferenceServerPort(providingPod, false)
792+
if err != nil { // Impossible, because such a providingPod would never be created by this controller
793+
return fmt.Errorf("unable to wake up server because port not known: %w", err), true
794+
}
795+
}
796+
// For launcher-based server-providing Pods, ServerPort is written when binding.
797+
// For direct server-providing Pods, ServerPort is written (earlier) when
798+
// constructing the server-providing Pod's spec in getNominalServerProvidingPod.
799+
if launcherBased {
800+
serverDat.ServerPort = serverPort
789801
}
790802
err = ctl.wakeSleeper(ctx, serverDat, requestingPod, providingPod, serverPort)
791803
if err != nil {
@@ -900,11 +912,16 @@ func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData,
900912
// If providingPod is stale then the update will fail.
901913
if (serverDat.Sleeping == nil || !*(serverDat.Sleeping)) && providingPod.Status.PodIP != "" { // need to put to sleep
902914
serverPort := serverDat.ServerPort
903-
if serverDat.NominalProvidingPod == nil {
904-
var err error
905-
_, serverPort, err = utils.GetInferenceServerPort(providingPod, launcherBased)
906-
if err != nil { // Impossible, because such a providingPod would never be created by this controller
907-
return fmt.Errorf("unable to put server to sleep because port not known: %w", err)
915+
// TODO(waltforme): Is serverPort always set correctly for launcher-based server-providing Pods upon unbinding?
916+
// E.g. What if requestingPod is deleted during a crash and restart of the dual-pods controller?
917+
// To find the port in this case, I think the best effort is to recompute the hash of every InferenceServerConfig object and try to match.
918+
if !launcherBased {
919+
if serverDat.NominalProvidingPod == nil {
920+
var err error
921+
_, serverPort, err = utils.GetInferenceServerPort(providingPod, false)
922+
if err != nil { // Impossible, because such a providingPod would never be created by this controller
923+
return fmt.Errorf("unable to put server to sleep because port not known: %w", err)
924+
}
908925
}
909926
}
910927
sleepURL := fmt.Sprintf("http://%s:%d/sleep", providingPod.Status.PodIP, serverPort)
@@ -947,6 +964,7 @@ func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData,
947964
logger.V(3).Info("Server-providing Pod remains unbound", "name", providingPod.Name, "resourceVersion", providingPod.ResourceVersion)
948965
}
949966
serverDat.ProvidingPodName = ""
967+
serverDat.ServerPort = -1
950968
return nil
951969
}
952970

pkg/controller/utils/pod-helper.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ func removeVolumeMount(ctr *corev1.Container, volumeName string) {
8686
}
8787

8888
// GetInferenceServerPort, given a server-providing Pod and whether it is launcher-based,
89-
// returns (containerIndex int, port int16, err error)
89+
// returns (containerIndex int, inferenceServerPort int16, err error)
90+
// For direct server-providing pods, the inference server port is identified from readinessProbe.
91+
// For launcher-based server-providing pods, the inference server port can't be identified from the launcher pod,
92+
// so we return a dummy value -1.
9093
func GetInferenceServerPort(pod *corev1.Pod, launcherBased bool) (int, int16, error) {
9194
// identify the inference server container
9295
cIdx := slices.IndexFunc(pod.Spec.Containers, func(c corev1.Container) bool {
@@ -96,9 +99,9 @@ func GetInferenceServerPort(pod *corev1.Pod, launcherBased bool) (int, int16, er
9699
return 0, 0, fmt.Errorf("container %q not found", api.InferenceServerContainerName)
97100
}
98101

99-
// for launcher-based server-providing pod, the port is predefined
102+
// for launcher-based server-providing pod, return a dummy value
100103
if launcherBased {
101-
return cIdx, common.LauncherServicePort, nil
104+
return cIdx, -1, nil
102105
}
103106

104107
// for direct server-providing pod, identify the port from readinessProbe
@@ -167,7 +170,7 @@ func BuildLauncherPodFromTemplate(template corev1.PodTemplateSpec, ns, nodeName,
167170
}
168171
pod.Annotations = MapSet(pod.Annotations, string(common.LauncherConfigHashAnnotationKey), nominalHash)
169172

170-
cIdx, serverPort, err := GetInferenceServerPort(pod, true)
173+
cIdx, _, err := GetInferenceServerPort(pod, true)
171174
if err != nil {
172175
return nil, err
173176
}
@@ -181,7 +184,7 @@ func BuildLauncherPodFromTemplate(template corev1.PodTemplateSpec, ns, nodeName,
181184
ProbeHandler: corev1.ProbeHandler{
182185
HTTPGet: &corev1.HTTPGetAction{
183186
Path: "/health",
184-
Port: intstr.FromInt(int(serverPort)),
187+
Port: intstr.FromInt(common.LauncherServicePort),
185188
Scheme: corev1.URISchemeHTTP,
186189
},
187190
},
@@ -199,7 +202,7 @@ func BuildLauncherPodFromTemplate(template corev1.PodTemplateSpec, ns, nodeName,
199202
ProbeHandler: corev1.ProbeHandler{
200203
HTTPGet: &corev1.HTTPGetAction{
201204
Path: "/v2/vllm/instances",
202-
Port: intstr.FromInt(int(serverPort)),
205+
Port: intstr.FromInt(common.LauncherServicePort),
203206
Scheme: corev1.URISchemeHTTP,
204207
},
205208
},

0 commit comments

Comments
 (0)