From 2f10a9631ef52460c87bbfb011cc2270915b4548 Mon Sep 17 00:00:00 2001 From: Jun Duan Date: Wed, 1 Apr 2026 13:00:52 -0400 Subject: [PATCH 1/3] Consider port when selecting launcher --- pkg/controller/dual-pods/inference-server.go | 55 ++++++++++++++++- test/e2e/run-launcher-based.sh | 1 + test/e2e/test-cases.sh | 64 +++++++++++++++++++- 3 files changed, 115 insertions(+), 5 deletions(-) diff --git a/pkg/controller/dual-pods/inference-server.go b/pkg/controller/dual-pods/inference-server.go index 9f54c7bb..7af3297f 100644 --- a/pkg/controller/dual-pods/inference-server.go +++ b/pkg/controller/dual-pods/inference-server.go @@ -490,6 +490,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat * if err != nil { return fmt.Errorf("failed to configure inference server config: %w", err), true } + desiredPort := isc.Spec.ModelServerConfig.Port logger.V(5).Info("Nominal hash of InferenceServerConfig", "hash", iscHash) if len(launcherPodAnys) > 0 { @@ -498,7 +499,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat * // then those with capacity for new instances. // Note that multiple vLLM instances could exist in one launcher Pod, but at most one instance could be awake at a time. 
- launcherPod, hasSleepingInstance, someNotReady, err := ctl.selectBestLauncherPod(ctx, launcherPodAnys, iscHash, int(lc.Spec.MaxSleepingInstances), nodeDat) + launcherPod, hasSleepingInstance, someNotReady, err := ctl.selectBestLauncherPod(ctx, launcherPodAnys, iscHash, desiredPort, int(lc.Spec.MaxSleepingInstances), nodeDat) if err != nil { return err, true } @@ -591,6 +592,7 @@ func (ctl *controller) selectBestLauncherPod( ctx context.Context, launcherPodAnys []interface{}, iscHash string, + desiredPort int32, maxOthers int, nodeDat *nodeData, ) (*corev1.Pod, bool, bool, error) { @@ -605,6 +607,10 @@ func (ctl *controller) selectBestLauncherPod( if launcherPod.Status.Phase == corev1.PodFailed || launcherPod.DeletionTimestamp != nil { continue } + if launcherPod.Labels[api.DualLabelName] != "" { + logger.V(5).Info("Launcher Pod already bound to another requester, skipping", "name", launcherPod.Name, "boundRequester", launcherPod.Labels[api.DualLabelName]) + continue + } // Track pods that are not ready yet - we should give them time instead of // failing and creating new launcher Pods immediately. 
@@ -622,12 +628,33 @@ func (ctl *controller) selectBestLauncherPod( // Check if this launcher has a sleeping instance matching the iscHash hasSleepingInstance := false + hasPortConflict := false for _, inst := range insts.Instances { + instPort, err := getVLLMInstancePort(inst.Options) + if err != nil { + logger.V(5).Info("Skipping launcher Pod because an instance has unparseable options", + "name", launcherPod.Name, + "instanceID", inst.InstanceID, + "options", inst.Options, + "err", err) + hasPortConflict = true + break + } + if instPort == desiredPort && inst.InstanceID != iscHash { + logger.V(5).Info("Skipping launcher Pod because a different instance already uses the desired port", + "name", launcherPod.Name, + "instanceID", inst.InstanceID, + "port", desiredPort) + hasPortConflict = true + break + } if inst.InstanceID == iscHash { hasSleepingInstance = true - break } } + if hasPortConflict { + continue + } if hasSleepingInstance { // Priority 1: Found a sleeping instance logger.V(5).Info("Found launcher with sleeping instance (fastest path)", @@ -693,6 +720,30 @@ func (ctl *controller) configInferenceServer(isc *fmav1alpha1.InferenceServerCon return &vllmCfg, nominalHash, nil } +func getVLLMInstancePort(options string) (int32, error) { + parts := strings.Fields(options) + for idx, part := range parts { + if part == "--port" { + if idx+1 >= len(parts) { + return 0, fmt.Errorf("missing value for --port") + } + port, err := strconv.ParseInt(parts[idx+1], 10, 32) + if err != nil { + return 0, fmt.Errorf("parse --port value %q: %w", parts[idx+1], err) + } + return int32(port), nil + } + if value, ok := strings.CutPrefix(part, "--port="); ok { + port, err := strconv.ParseInt(value, 10, 32) + if err != nil { + return 0, fmt.Errorf("parse --port value %q: %w", value, err) + } + return int32(port), nil + } + } + return 0, fmt.Errorf("missing --port in options %q", options) +} + func (ctl *controller) wakeupInstance(ctx context.Context, lClient *LauncherClient, 
instanceID string, instancePort int32) error { logger := klog.FromContext(ctx) endpoint := lClient.baseURL.Hostname() + ":" + strconv.Itoa(int(instancePort)) diff --git a/test/e2e/run-launcher-based.sh b/test/e2e/run-launcher-based.sh index 6febfd08..5e3c5b92 100755 --- a/test/e2e/run-launcher-based.sh +++ b/test/e2e/run-launcher-based.sh @@ -167,4 +167,5 @@ FMA_NAMESPACE=default \ MKOBJS_SCRIPT=./test/e2e/mkobjs.sh \ FMA_CHART_INSTANCE_NAME=fma \ READY_TARGET=1 \ +E2E_PLATFORM=kind \ ./test/e2e/test-cases.sh diff --git a/test/e2e/test-cases.sh b/test/e2e/test-cases.sh index 488914cc..3dd345db 100755 --- a/test/e2e/test-cases.sh +++ b/test/e2e/test-cases.sh @@ -13,6 +13,7 @@ # FMA_CHART_INSTANCE_NAME - Helm release name prefix (default: fma) # READY_TARGET - minimum ready launchers before proceeding (default: 2) # POLICIES_ENABLED - "true"/"false"; auto-detected if unset +# E2E_PLATFORM - "openshift" or "kind" (default: openshift) # POLL_LIMIT_SECS - polling timeout seconds (default: 600) # FMA_DEBUG - "true" to enable shell tracing (set -x) @@ -27,6 +28,7 @@ fi POLL_LIMIT_SECS="${POLL_LIMIT_SECS:-600}" READY_TARGET="${READY_TARGET:-2}" FMA_CHART_INSTANCE_NAME="${FMA_CHART_INSTANCE_NAME:-fma}" +E2E_PLATFORM="${E2E_PLATFORM:-openshift}" NS="$FMA_NAMESPACE" @@ -154,13 +156,69 @@ if [ "$POLICIES_ENABLED" = true ]; then cheer CEL policy checks passed fi -# TODO: stop skipping once Issues 387 and 388 are resolved -if [ "$FMA_NAMESPACE" != debug ]; then - echo "Skipping the remaining test cases because of Issues 387 and 388" >&2 +# TODO: stop skipping once Issue 387 is resolved +if [ "$E2E_PLATFORM" = "openshift" ]; then + echo "Skipping the remaining test cases on OpenShift because Issue 387 is not resolved there yet" >&2 cheer All launcher-based tests that are currently expected to pass on OpenShift have done so exit 0 fi +# --------------------------------------------------------------------------- +# Same-Node Port Collision +# 
--------------------------------------------------------------------------- + +intro_case Same-Node Port Collision Creates New Launcher + +collision_inst="${instlb}-collision" +collision_rs="my-request-collision-$instlb" + +kubectl get rs "$rslb" -n "$NS" -o json \ + | jq \ + --arg collision_rs "$collision_rs" \ + --arg collision_inst "$collision_inst" \ + --arg testnode "$testnode" \ + --arg isc "$isc" \ + ' + .metadata.name = $collision_rs | + del(.metadata.uid, .metadata.resourceVersion, .metadata.creationTimestamp, .metadata.annotations, .metadata.ownerReferences, .status) | + .spec.replicas = 1 | + .spec.selector.matchLabels.instance = $collision_inst | + .spec.template.metadata.labels.instance = $collision_inst | + .spec.template.spec.nodeSelector = {"kubernetes.io/hostname": $testnode} | + .spec.template.metadata.annotations["dual-pods.llm-d.ai/inference-server-config"] = $isc + ' \ + | kubectl apply -n "$NS" -f - + +expect "kubectl get pods -n $NS -o name -l app=dp-example,instance=$collision_inst | wc -l | grep -w 1" + +collision_req=$(kubectl get pods -n "$NS" -o name -l app=dp-example,instance=$collision_inst | sed s%pod/%%) +echo "Collision requester Pod is $collision_req" + +expect '[ "$(kubectl get pod -n '"$NS"' '"$collision_req"' -o jsonpath={.spec.nodeName})" == "'"$testnode"'" ]' +expect "kubectl get pods -n $NS -o name -l dual-pods.llm-d.ai/dual=$collision_req | wc -l | grep -w 1" + +collision_launcher=$(kubectl get pods -n "$NS" -o name -l dual-pods.llm-d.ai/dual=$collision_req | sed s%pod/%%) +echo "Collision launcher Pod is $collision_launcher" + +[ "$collision_launcher" != "$launcherlb" ] + +expect '[ "$(kubectl get pod -n '"$NS"' '"$collision_req"' -o jsonpath={.metadata.labels.dual-pods\\.llm-d\\.ai/dual})" == "'"$collision_launcher"'" ]' + +date +kubectl wait --for condition=Ready pod/$collision_req -n "$NS" --timeout=120s +[ "$(kubectl get pod $collision_launcher -n "$NS" -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}')" = 
"True" ] + +req_gpus=$(kubectl get pod "$reqlb" -n "$NS" -o jsonpath='{.metadata.annotations.dual-pods\.llm-d\.ai/accelerators}') +collision_gpus=$(kubectl get pod "$collision_req" -n "$NS" -o jsonpath='{.metadata.annotations.dual-pods\.llm-d\.ai/accelerators}') +[ -n "$req_gpus" ] +[ -n "$collision_gpus" ] +[ "$req_gpus" != "$collision_gpus" ] + +kubectl delete rs "$collision_rs" -n "$NS" --wait=true +expect "kubectl get pods -n $NS -o name -l app=dp-example,instance=$collision_inst | wc -l | grep -w 0" + +cheer Successful same-node collision handling + # --------------------------------------------------------------------------- # Instance Wake-up Fast Path # --------------------------------------------------------------------------- From 3f972366c42842044acd292aa0ecfc054afc8834 Mon Sep 17 00:00:00 2001 From: Jun Duan Date: Wed, 1 Apr 2026 13:50:38 -0400 Subject: [PATCH 2/3] Correct the check for bound launcher --- pkg/controller/dual-pods/inference-server.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/controller/dual-pods/inference-server.go b/pkg/controller/dual-pods/inference-server.go index 7af3297f..7a522db9 100644 --- a/pkg/controller/dual-pods/inference-server.go +++ b/pkg/controller/dual-pods/inference-server.go @@ -607,8 +607,9 @@ func (ctl *controller) selectBestLauncherPod( if launcherPod.Status.Phase == corev1.PodFailed || launcherPod.DeletionTimestamp != nil { continue } - if launcherPod.Labels[api.DualLabelName] != "" { - logger.V(5).Info("Launcher Pod already bound to another requester, skipping", "name", launcherPod.Name, "boundRequester", launcherPod.Labels[api.DualLabelName]) + requesterParts := strings.Split(launcherPod.Annotations[requesterAnnotationKey], " ") + if len(requesterParts) == 2 { + logger.V(5).Info("Launcher Pod already bound to another requester, skipping", "name", launcherPod.Name, "boundRequester", requesterParts[1]) continue } From 9c4f7c23bae607916a43774571ba50fb90bed67a Mon Sep 17 00:00:00 2001 
From: Jun Duan Date: Wed, 1 Apr 2026 14:08:48 -0400 Subject: [PATCH 3/3] Try to fix test --- test/e2e/test-cases.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/e2e/test-cases.sh b/test/e2e/test-cases.sh index 3dd345db..8b33021e 100755 --- a/test/e2e/test-cases.sh +++ b/test/e2e/test-cases.sh @@ -216,6 +216,8 @@ collision_gpus=$(kubectl get pod "$collision_req" -n "$NS" -o jsonpath='{.metada kubectl delete rs "$collision_rs" -n "$NS" --wait=true expect "kubectl get pods -n $NS -o name -l app=dp-example,instance=$collision_inst | wc -l | grep -w 0" +kubectl delete pod "$collision_launcher" -n "$NS" --wait=true +expect '! kubectl get pods -n '"$NS"' -o name | grep -qw pod/'"$collision_launcher" cheer Successful same-node collision handling