Consider port when selecting launcher #396
```diff
@@ -490,6 +490,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
 	if err != nil {
 		return fmt.Errorf("failed to configure inference server config: %w", err), true
 	}
+	desiredPort := isc.Spec.ModelServerConfig.Port
 	logger.V(5).Info("Nominal hash of InferenceServerConfig", "hash", iscHash)

 	if len(launcherPodAnys) > 0 {
@@ -498,7 +499,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
 		// then those with capacity for new instances.
 		// Note that multiple vLLM instances could exist in one launcher Pod, but at most one instance could be awake at a time.
-		launcherPod, hasSleepingInstance, someNotReady, err := ctl.selectBestLauncherPod(ctx, launcherPodAnys, iscHash, int(lc.Spec.MaxSleepingInstances), nodeDat)
+		launcherPod, hasSleepingInstance, someNotReady, err := ctl.selectBestLauncherPod(ctx, launcherPodAnys, iscHash, desiredPort, int(lc.Spec.MaxSleepingInstances), nodeDat)
 		if err != nil {
 			return err, true
 		}
@@ -591,6 +592,7 @@ func (ctl *controller) selectBestLauncherPod(
 	ctx context.Context,
 	launcherPodAnys []interface{},
 	iscHash string,
+	desiredPort int32,
 	maxOthers int,
 	nodeDat *nodeData,
 ) (*corev1.Pod, bool, bool, error) {
@@ -605,6 +607,10 @@ func (ctl *controller) selectBestLauncherPod(
 		if launcherPod.Status.Phase == corev1.PodFailed || launcherPod.DeletionTimestamp != nil {
 			continue
 		}
+		if launcherPod.Labels[api.DualLabelName] != "" {
+			logger.V(5).Info("Launcher Pod already bound to another requester, skipping", "name", launcherPod.Name, "boundRequester", launcherPod.Labels[api.DualLabelName])
+			continue
+		}

 		// Track pods that are not ready yet - we should give them time instead of
 		// failing and creating new launcher Pods immediately.
@@ -622,12 +628,33 @@ func (ctl *controller) selectBestLauncherPod(
 		// Check if this launcher has a sleeping instance matching the iscHash
 		hasSleepingInstance := false
+		hasPortConflict := false
 		for _, inst := range insts.Instances {
+			instPort, err := getVLLMInstancePort(inst.Options)
+			if err != nil {
+				logger.V(5).Info("Skipping launcher Pod because an instance has unparseable options",
+					"name", launcherPod.Name,
+					"instanceID", inst.InstanceID,
+					"options", inst.Options,
+					"err", err)
+				hasPortConflict = true
+				break
+			}
+			if instPort == desiredPort && inst.InstanceID != iscHash {
+				logger.V(5).Info("Skipping launcher Pod because a different instance already uses the desired port",
+					"name", launcherPod.Name,
+					"instanceID", inst.InstanceID,
+					"port", desiredPort)
+				hasPortConflict = true
+				break
+			}
 			if inst.InstanceID == iscHash {
				hasSleepingInstance = true
				break
			}
 		}
+		if hasPortConflict {
+			continue
+		}
 		if hasSleepingInstance {
 			// Priority 1: Found a sleeping instance
 			logger.V(5).Info("Found launcher with sleeping instance (fastest path)",
```
```diff
@@ -693,6 +720,30 @@ func (ctl *controller) configInferenceServer(isc *fmav1alpha1.InferenceServerCon
 	return &vllmCfg, nominalHash, nil
 }

+func getVLLMInstancePort(options string) (int32, error) {
+	parts := strings.Fields(options)
+	for idx, part := range parts {
+		if part == "--port" {
+			if idx+1 >= len(parts) {
+				return 0, fmt.Errorf("missing value for --port")
+			}
+			port, err := strconv.ParseInt(parts[idx+1], 10, 32)
+			if err != nil {
+				return 0, fmt.Errorf("parse --port value %q: %w", parts[idx+1], err)
+			}
+			return int32(port), nil
+		}
+		if value, ok := strings.CutPrefix(part, "--port="); ok {
+			port, err := strconv.ParseInt(value, 10, 32)
+			if err != nil {
+				return 0, fmt.Errorf("parse --port value %q: %w", value, err)
+			}
+			return int32(port), nil
+		}
+	}
+	return 0, fmt.Errorf("missing --port in options %q", options)
+}
+
 func (ctl *controller) wakeupInstance(ctx context.Context, lClient *LauncherClient, instanceID string, instancePort int32) error {
 	logger := klog.FromContext(ctx)
 	endpoint := lClient.baseURL.Hostname() + ":" + strconv.Itoa(int(instancePort))
```
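Taken together, `selectBestLauncherPod` now skips any launcher Pod whose resident instances either claim the desired port under a different hash or carry options it cannot parse, and the controller falls through to creating a fresh launcher on the same node. A table-driven sketch of how the new helper behaves (not part of the PR; the option strings are invented, and the package name is an assumption — the test would sit next to `getVLLMInstancePort`):

```go
package controller // assumption: whatever package defines getVLLMInstancePort

import "testing"

func TestGetVLLMInstancePort(t *testing.T) {
	cases := []struct {
		options string
		want    int32
		wantErr bool
	}{
		{"--model m --port 8000", 8000, false},       // space-separated form
		{"--port=8001 --enforce-eager", 8001, false}, // --port=N form
		{"--port", 0, true},                          // flag present, value missing
		{"--port abc", 0, true},                      // non-numeric value
		{"--model m", 0, true},                       // no --port at all
	}
	for _, c := range cases {
		got, err := getVLLMInstancePort(c.options)
		if (err != nil) != c.wantErr || got != c.want {
			t.Errorf("getVLLMInstancePort(%q) = %d, %v; want %d, wantErr=%v",
				c.options, got, err, c.want, c.wantErr)
		}
	}
}
```

Note the deliberate asymmetry: a parse failure surfaces as an error, which the caller treats like a port conflict, so a Pod with malformed instance options is skipped rather than reused.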
The PR also updates the e2e test script:

```diff
@@ -13,6 +13,7 @@
 # FMA_CHART_INSTANCE_NAME - Helm release name prefix (default: fma)
 # READY_TARGET - minimum ready launchers before proceeding (default: 2)
 # POLICIES_ENABLED - "true"/"false"; auto-detected if unset
+# E2E_PLATFORM - "openshift" or "kind" (default: openshift)
 # POLL_LIMIT_SECS - polling timeout seconds (default: 600)
 # FMA_DEBUG - "true" to enable shell tracing (set -x)
@@ -27,6 +28,7 @@ fi
 POLL_LIMIT_SECS="${POLL_LIMIT_SECS:-600}"
 READY_TARGET="${READY_TARGET:-2}"
 FMA_CHART_INSTANCE_NAME="${FMA_CHART_INSTANCE_NAME:-fma}"
+E2E_PLATFORM="${E2E_PLATFORM:-openshift}"

 NS="$FMA_NAMESPACE"
```
```diff
@@ -154,13 +156,69 @@ if [ "$POLICIES_ENABLED" = true ]; then
   cheer CEL policy checks passed
 fi

-# TODO: stop skipping once Issues 387 and 388 are resolved
-if [ "$FMA_NAMESPACE" != debug ]; then
-  echo "Skipping the remaining test cases because of Issues 387 and 388" >&2
+# TODO: stop skipping once Issue 387 is resolved
+if [ "$E2E_PLATFORM" = "openshift" ]; then
```
**Collaborator:** Doesn't this PR resolve Issue #387?

**Author:** No. Another PR will address the GPU allocation on OpenShift using one of the ideas proposed in #387.
```diff
+  echo "Skipping the remaining test cases on OpenShift because Issue 387 is not resolved there yet" >&2
+  cheer All launcher-based tests that are currently expected to pass on OpenShift have done so
   exit 0
 fi
```
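With the platform switch in place, the new collision test can be exercised on a kind cluster; a hypothetical invocation (the script path below is a placeholder, not the repo's actual file name):

```bash
# Hypothetical invocation; substitute the actual e2e script path.
E2E_PLATFORM=kind ./hack/e2e.sh
```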
```diff
+# ---------------------------------------------------------------------------
+# Same-Node Port Collision
+# ---------------------------------------------------------------------------
```
**Collaborator:** This test case needs comments: either or both of (a) an outline for the test case as a whole and (b) comments on individual steps, like those in the other test cases.
```diff
+intro_case Same-Node Port Collision Creates New Launcher
+
+collision_inst="${instlb}-collision"
```
**Collaborator:** Add this, collision_req, and collision_launcher to the things dumped in the EXIT trap.
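A sketch of what that could look like, assuming the script installs a single dump function on EXIT (the function name and its existing contents are assumptions, not code from this repo):

```bash
# Hypothetical sketch: extend the existing EXIT trap to also dump the
# collision objects; the function name and prior contents are assumed.
dump_on_exit() {
    # ... existing dumps of the long-lived test objects ...
    kubectl get rs "$collision_rs" -n "$NS" -o yaml || true
    for pod in "$collision_req" "$collision_launcher"; do
        [ -n "${pod:-}" ] && kubectl describe pod "$pod" -n "$NS" || true
    done
}
trap dump_on_exit EXIT
```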
```diff
+collision_rs="my-request-collision-$instlb"
```

**Collaborator:** This hackery is pretty complicated. It would be better if there were a simpler technique. Why not just scale the existing ReplicaSet to 2?

**Author:** Scaling to 2 was the first thing tried, and it failed: it makes the rest of the test nondeterministic. For example, after this test case scales the ReplicaSet back down to 1, which Pod stays (and which gets deleted) is uncertain.

```diff
+kubectl get rs "$rslb" -n "$NS" -o json \
+  | jq \
+    --arg collision_rs "$collision_rs" \
+    --arg collision_inst "$collision_inst" \
+    --arg testnode "$testnode" \
+    --arg isc "$isc" \
+    '
+    .metadata.name = $collision_rs |
+    del(.metadata.uid, .metadata.resourceVersion, .metadata.creationTimestamp, .metadata.annotations, .metadata.ownerReferences, .status) |
+    .spec.replicas = 1 |
+    .spec.selector.matchLabels.instance = $collision_inst |
+    .spec.template.metadata.labels.instance = $collision_inst |
+    .spec.template.spec.nodeSelector = {"kubernetes.io/hostname": $testnode} |
+    .spec.template.metadata.annotations["dual-pods.llm-d.ai/inference-server-config"] = $isc
+    ' \
+  | kubectl apply -n "$NS" -f -
```
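In other words, the jq program clones the existing ReplicaSet under a new name, strips the server-populated metadata, gives the clone's Pods a distinct instance label, pins them to the same node via a nodeSelector, and requests the same InferenceServerConfig, so satisfying the new requester forces the controller to resolve a port collision on that node.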
```diff
+expect "kubectl get pods -n $NS -o name -l app=dp-example,instance=$collision_inst | wc -l | grep -w 1"
+
+collision_req=$(kubectl get pods -n "$NS" -o name -l app=dp-example,instance=$collision_inst | sed s%pod/%%)
+echo "Collision requester Pod is $collision_req"
+
+expect '[ "$(kubectl get pod -n '"$NS"' '"$collision_req"' -o jsonpath={.spec.nodeName})" == "'"$testnode"'" ]'
+expect "kubectl get pods -n $NS -o name -l dual-pods.llm-d.ai/dual=$collision_req | wc -l | grep -w 1"
+
+collision_launcher=$(kubectl get pods -n "$NS" -o name -l dual-pods.llm-d.ai/dual=$collision_req | sed s%pod/%%)
+echo "Collision launcher Pod is $collision_launcher"
+
+[ "$collision_launcher" != "$launcherlb" ]
+
+expect '[ "$(kubectl get pod -n '"$NS"' '"$collision_req"' -o jsonpath={.metadata.labels.dual-pods\\.llm-d\\.ai/dual})" == "'"$collision_launcher"'" ]'
+
+date
+kubectl wait --for condition=Ready pod/$collision_req -n "$NS" --timeout=120s
+[ "$(kubectl get pod $collision_launcher -n "$NS" -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}')" = "True" ]
+
+req_gpus=$(kubectl get pod "$reqlb" -n "$NS" -o jsonpath='{.metadata.annotations.dual-pods\.llm-d\.ai/accelerators}')
+collision_gpus=$(kubectl get pod "$collision_req" -n "$NS" -o jsonpath='{.metadata.annotations.dual-pods\.llm-d\.ai/accelerators}')
+[ -n "$req_gpus" ]
+[ -n "$collision_gpus" ]
+[ "$req_gpus" != "$collision_gpus" ]
+
+kubectl delete rs "$collision_rs" -n "$NS" --wait=true
+expect "kubectl get pods -n $NS -o name -l app=dp-example,instance=$collision_inst | wc -l | grep -w 0"
+
+cheer Successful same-node collision handling

 # ---------------------------------------------------------------------------
 # Instance Wake-up Fast Path
 # ---------------------------------------------------------------------------
```