Skip to content

Commit 976a361

Browse files
authored
Consider port when selecting launcher (llm-d-incubation#396)
* Consider port when selecting launcher
* Correct the check for bound launcher
* Try to fix test
1 parent 55a64b8 commit 976a361

File tree

3 files changed

+118
-5
lines changed

3 files changed

+118
-5
lines changed

pkg/controller/dual-pods/inference-server.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
490490
if err != nil {
491491
return fmt.Errorf("failed to configure inference server config: %w", err), true
492492
}
493+
desiredPort := isc.Spec.ModelServerConfig.Port
493494
logger.V(5).Info("Nominal hash of InferenceServerConfig", "hash", iscHash)
494495

495496
if len(launcherPodAnys) > 0 {
@@ -498,7 +499,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
498499
// then those with capacity for new instances.
499500
// Note that multiple vLLM instances could exist in one launcher Pod, but at most one instance could be awake at a time.
500501

501-
launcherPod, hasSleepingInstance, someNotReady, err := ctl.selectBestLauncherPod(ctx, launcherPodAnys, iscHash, int(lc.Spec.MaxSleepingInstances), nodeDat)
502+
launcherPod, hasSleepingInstance, someNotReady, err := ctl.selectBestLauncherPod(ctx, launcherPodAnys, iscHash, desiredPort, int(lc.Spec.MaxSleepingInstances), nodeDat)
502503
if err != nil {
503504
return err, true
504505
}
@@ -591,6 +592,7 @@ func (ctl *controller) selectBestLauncherPod(
591592
ctx context.Context,
592593
launcherPodAnys []interface{},
593594
iscHash string,
595+
desiredPort int32,
594596
maxOthers int,
595597
nodeDat *nodeData,
596598
) (*corev1.Pod, bool, bool, error) {
@@ -605,6 +607,11 @@ func (ctl *controller) selectBestLauncherPod(
605607
if launcherPod.Status.Phase == corev1.PodFailed || launcherPod.DeletionTimestamp != nil {
606608
continue
607609
}
610+
requesterParts := strings.Split(launcherPod.Annotations[requesterAnnotationKey], " ")
611+
if len(requesterParts) == 2 {
612+
logger.V(5).Info("Launcher Pod already bound to another requester, skipping", "name", launcherPod.Name, "boundRequester", requesterParts[1])
613+
continue
614+
}
608615

609616
// Track pods that are not ready yet - we should give them time instead of
610617
// failing and creating new launcher Pods immediately.
@@ -622,12 +629,33 @@ func (ctl *controller) selectBestLauncherPod(
622629

623630
// Check if this launcher has a sleeping instance matching the iscHash
624631
hasSleepingInstance := false
632+
hasPortConflict := false
625633
for _, inst := range insts.Instances {
634+
instPort, err := getVLLMInstancePort(inst.Options)
635+
if err != nil {
636+
logger.V(5).Info("Skipping launcher Pod because an instance has unparseable options",
637+
"name", launcherPod.Name,
638+
"instanceID", inst.InstanceID,
639+
"options", inst.Options,
640+
"err", err)
641+
hasPortConflict = true
642+
break
643+
}
644+
if instPort == desiredPort && inst.InstanceID != iscHash {
645+
logger.V(5).Info("Skipping launcher Pod because a different instance already uses the desired port",
646+
"name", launcherPod.Name,
647+
"instanceID", inst.InstanceID,
648+
"port", desiredPort)
649+
hasPortConflict = true
650+
break
651+
}
626652
if inst.InstanceID == iscHash {
627653
hasSleepingInstance = true
628-
break
629654
}
630655
}
656+
if hasPortConflict {
657+
continue
658+
}
631659
if hasSleepingInstance {
632660
// Priority 1: Found a sleeping instance
633661
logger.V(5).Info("Found launcher with sleeping instance (fastest path)",
@@ -693,6 +721,30 @@ func (ctl *controller) configInferenceServer(isc *fmav1alpha1.InferenceServerCon
693721
return &vllmCfg, nominalHash, nil
694722
}
695723

724+
// getVLLMInstancePort extracts the serving port from a vLLM options string,
// whose flags are separated by whitespace. Both spellings are recognized:
// "--port 8000" (flag followed by a value token) and "--port=8000" (inline
// value); the first occurrence wins. An error is returned when the flag is
// absent, has no value token, or the value does not parse as a 32-bit int.
func getVLLMInstancePort(options string) (int32, error) {
	tokens := strings.Fields(options)
	// parse converts one raw flag value into an int32 port.
	parse := func(raw string) (int32, error) {
		n, err := strconv.ParseInt(raw, 10, 32)
		if err != nil {
			return 0, fmt.Errorf("parse --port value %q: %w", raw, err)
		}
		return int32(n), nil
	}
	for i := 0; i < len(tokens); i++ {
		switch {
		case tokens[i] == "--port":
			// Separate-token form: the value must follow as the next token.
			if i == len(tokens)-1 {
				return 0, fmt.Errorf("missing value for --port")
			}
			return parse(tokens[i+1])
		case strings.HasPrefix(tokens[i], "--port="):
			// Inline form: everything after "=" is the value.
			return parse(strings.TrimPrefix(tokens[i], "--port="))
		}
	}
	return 0, fmt.Errorf("missing --port in options %q", options)
}
747+
696748
func (ctl *controller) wakeupInstance(ctx context.Context, lClient *LauncherClient, instanceID string, instancePort int32) error {
697749
logger := klog.FromContext(ctx)
698750
endpoint := lClient.baseURL.Hostname() + ":" + strconv.Itoa(int(instancePort))

test/e2e/run-launcher-based.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,4 +167,5 @@ FMA_NAMESPACE=default \
167167
MKOBJS_SCRIPT=./test/e2e/mkobjs.sh \
168168
FMA_CHART_INSTANCE_NAME=fma \
169169
READY_TARGET=1 \
170+
E2E_PLATFORM=kind \
170171
./test/e2e/test-cases.sh

test/e2e/test-cases.sh

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# FMA_CHART_INSTANCE_NAME - Helm release name prefix (default: fma)
1414
# READY_TARGET - minimum ready launchers before proceeding (default: 2)
1515
# POLICIES_ENABLED - "true"/"false"; auto-detected if unset
16+
# E2E_PLATFORM - "openshift" or "kind" (default: openshift)
1617
# POLL_LIMIT_SECS - polling timeout seconds (default: 600)
1718
# FMA_DEBUG - "true" to enable shell tracing (set -x)
1819

@@ -27,6 +28,7 @@ fi
2728
POLL_LIMIT_SECS="${POLL_LIMIT_SECS:-600}"
2829
READY_TARGET="${READY_TARGET:-2}"
2930
FMA_CHART_INSTANCE_NAME="${FMA_CHART_INSTANCE_NAME:-fma}"
31+
E2E_PLATFORM="${E2E_PLATFORM:-openshift}"
3032

3133
NS="$FMA_NAMESPACE"
3234

@@ -154,13 +156,71 @@ if [ "$POLICIES_ENABLED" = true ]; then
154156
cheer CEL policy checks passed
155157
fi
156158

157-
# TODO: stop skipping once Issues 387 and 388 are resolved
158-
if [ "$FMA_NAMESPACE" != debug ]; then
159-
echo "Skipping the remaining test cases because of Issues 387 and 388" >&2
159+
# TODO: stop skipping once Issue 387 is resolved
160+
if [ "$E2E_PLATFORM" = "openshift" ]; then
161+
echo "Skipping the remaining test cases on OpenShift because Issue 387 is not resolved there yet" >&2
160162
cheer All launcher-based tests that are currently expected to pass on OpenShift have done so
161163
exit 0
162164
fi
163165

166+
# ---------------------------------------------------------------------------
167+
# Same-Node Port Collision
168+
# ---------------------------------------------------------------------------
169+
170+
intro_case Same-Node Port Collision Creates New Launcher
171+
172+
collision_inst="${instlb}-collision"
173+
collision_rs="my-request-collision-$instlb"
174+
175+
kubectl get rs "$rslb" -n "$NS" -o json \
176+
| jq \
177+
--arg collision_rs "$collision_rs" \
178+
--arg collision_inst "$collision_inst" \
179+
--arg testnode "$testnode" \
180+
--arg isc "$isc" \
181+
'
182+
.metadata.name = $collision_rs |
183+
del(.metadata.uid, .metadata.resourceVersion, .metadata.creationTimestamp, .metadata.annotations, .metadata.ownerReferences, .status) |
184+
.spec.replicas = 1 |
185+
.spec.selector.matchLabels.instance = $collision_inst |
186+
.spec.template.metadata.labels.instance = $collision_inst |
187+
.spec.template.spec.nodeSelector = {"kubernetes.io/hostname": $testnode} |
188+
.spec.template.metadata.annotations["dual-pods.llm-d.ai/inference-server-config"] = $isc
189+
' \
190+
| kubectl apply -n "$NS" -f -
191+
192+
expect "kubectl get pods -n $NS -o name -l app=dp-example,instance=$collision_inst | wc -l | grep -w 1"
193+
194+
collision_req=$(kubectl get pods -n "$NS" -o name -l app=dp-example,instance=$collision_inst | sed s%pod/%%)
195+
echo "Collision requester Pod is $collision_req"
196+
197+
expect '[ "$(kubectl get pod -n '"$NS"' '"$collision_req"' -o jsonpath={.spec.nodeName})" == "'"$testnode"'" ]'
198+
expect "kubectl get pods -n $NS -o name -l dual-pods.llm-d.ai/dual=$collision_req | wc -l | grep -w 1"
199+
200+
collision_launcher=$(kubectl get pods -n "$NS" -o name -l dual-pods.llm-d.ai/dual=$collision_req | sed s%pod/%%)
201+
echo "Collision launcher Pod is $collision_launcher"
202+
203+
[ "$collision_launcher" != "$launcherlb" ]
204+
205+
expect '[ "$(kubectl get pod -n '"$NS"' '"$collision_req"' -o jsonpath={.metadata.labels.dual-pods\\.llm-d\\.ai/dual})" == "'"$collision_launcher"'" ]'
206+
207+
date
208+
kubectl wait --for condition=Ready pod/$collision_req -n "$NS" --timeout=120s
209+
[ "$(kubectl get pod $collision_launcher -n "$NS" -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}')" = "True" ]
210+
211+
req_gpus=$(kubectl get pod "$reqlb" -n "$NS" -o jsonpath='{.metadata.annotations.dual-pods\.llm-d\.ai/accelerators}')
212+
collision_gpus=$(kubectl get pod "$collision_req" -n "$NS" -o jsonpath='{.metadata.annotations.dual-pods\.llm-d\.ai/accelerators}')
213+
[ -n "$req_gpus" ]
214+
[ -n "$collision_gpus" ]
215+
[ "$req_gpus" != "$collision_gpus" ]
216+
217+
kubectl delete rs "$collision_rs" -n "$NS" --wait=true
218+
expect "kubectl get pods -n $NS -o name -l app=dp-example,instance=$collision_inst | wc -l | grep -w 0"
219+
kubectl delete pod "$collision_launcher" -n "$NS" --wait=true
220+
expect '! kubectl get pods -n '"$NS"' -o name | grep -qw pod/'"$collision_launcher"
221+
222+
cheer Successful same-node collision handling
223+
164224
# ---------------------------------------------------------------------------
165225
# Instance Wake-up Fast Path
166226
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)