diff --git a/pkg/controller/dual-pods/controller.go b/pkg/controller/dual-pods/controller.go
index 76e27beb..eb78ddde 100644
--- a/pkg/controller/dual-pods/controller.go
+++ b/pkg/controller/dual-pods/controller.go
@@ -304,8 +304,9 @@ type serverData struct {
 	GPUIndices    []string
 	GPUIndicesStr *string
 
-	ProvidingPodName string
-	InstanceID       string // if provider launcher-based
+	ProvidingPodName     string
+	InstanceID           string // ISC hash; set when computed, independent of instance existence
+	InstanceKnownToExist bool   // meaningful only for launcher-based providers
 
 	ISCLabelKeys      []string // keys of ISC labels applied to providingPod
 	ISCAnnotationKeys []string // keys of ISC annotations applied to providingPod
@@ -817,4 +818,3 @@ func (ctl *controller) clearServerData(nodeDat *nodeData, uid apitypes.UID) {
 	defer ctl.mutex.Unlock()
 	delete(nodeDat.InferenceServers, uid)
 }
-
diff --git a/pkg/controller/dual-pods/inference-server.go b/pkg/controller/dual-pods/inference-server.go
index 99501b02..d3803f1d 100644
--- a/pkg/controller/dual-pods/inference-server.go
+++ b/pkg/controller/dual-pods/inference-server.go
@@ -42,8 +42,8 @@ import (
 	k8sruntime "k8s.io/apimachinery/pkg/runtime"
 	k8sserializer "k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/apimachinery/pkg/util/sets"
-	k8svalidation "k8s.io/apimachinery/pkg/util/validation"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	k8svalidation "k8s.io/apimachinery/pkg/util/validation"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/yaml"
@@ -70,9 +70,8 @@ type nodeItem struct {
 }
 
 type launcherSyncResult struct {
-	instances                 *AllInstancesState
-	deletedStoppedInstanceIDs sets.Set[string]
-	failedStoppedInstanceErrs map[string]error
+	instances          *AllInstancesState
+	stoppedInstanceIDs sets.Set[string] // bound instances found stopped (not deleted by sync)
 }
 
 func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
@@ -363,6 +362,19 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
 		}
 	}
 
+	var cfg *VllmConfig
+	var iscHash string
+	if launcherBased {
+		cfg, iscHash, err = ctl.configInferenceServer(isc, serverDat.GPUIDs)
+		if err != nil {
+			return fmt.Errorf("failed to configure inference server config: %w", err), true
+		}
+		if serverDat.InstanceID == "" {
+			serverDat.InstanceID = iscHash
+			serverDat.ServerPort = int16(isc.Spec.ModelServerConfig.Port)
+		}
+	}
+
 	// If there is already a bound server-providing Pod then ensure that it is awake,
 	// ensure status reported, and relay readiness if needed.
 	if providingPod != nil {
@@ -394,50 +406,77 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
 				return fmt.Errorf("unable to wake up server because port not known: %w", err), true
 			}
 		}
-		// For launcher-based providers, check whether the bound instance is still alive.
-		// The sidecar notifier updates the Pod annotation when instance status changes,
-		// which triggers reconciliation through the informer.
-		if launcherBased && serverDat.InstanceID != "" && providingPod.Status.PodIP != "" {
+		if launcherBased {
+			if providingPod.Status.PodIP == "" || !utils.IsPodReady(providingPod) {
+				logger.V(5).Info("Bound launcher pod not yet reachable, waiting", "podIP", providingPod.Status.PodIP, "ready", utils.IsPodReady(providingPod))
+				return nil, false
+			}
+
 			syncResult, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, providingPod)
 			if err != nil || retry {
 				if err != nil {
 					return fmt.Errorf("failed to sync launcher instances for bound launcher Pod: %w", err), retry
 				}
+				logger.V(5).Info("Launcher instance sync requested retry")
 				return nil, true
 			}
 			_, instancePresent := findInstanceState(syncResult.instances.Instances, serverDat.InstanceID)
-			if delErr, failedCleanup := syncResult.failedStoppedInstanceErrs[serverDat.InstanceID]; failedCleanup {
-				return fmt.Errorf("failed to delete stopped instance %q from launcher: %w", serverDat.InstanceID, delErr), true
-			}
-			if _, deletedStopped := syncResult.deletedStoppedInstanceIDs[serverDat.InstanceID]; deletedStopped || !instancePresent {
-				if deletedStopped {
-					logger.V(2).Info("Deleted stopped bound instance from launcher during sync")
-				} else {
-					logger.V(2).Info("Bound instance not found in launcher after sync, treating as deleted")
+			_, instanceStopped := syncResult.stoppedInstanceIDs[serverDat.InstanceID]
+
+			if instanceStopped || !instancePresent {
+				if instanceStopped || serverDat.InstanceKnownToExist {
+					// instanceStopped is an objective signal that the instance existed
+					// and died — no dependency on in-memory InstanceKnownToExist state.
+					// When !instancePresent && InstanceKnownToExist==true the instance vanished
+					// (e.g. launcher restart) — same treatment.
+					// Delete the requesting Pod first so the intent is durable in the
+					// Kubernetes API; the stopped vLLM instance is cleaned up by the
+					// next sync after the server data is removed.
+					if instanceStopped {
+						logger.V(2).Info("Bound instance found stopped on launcher")
+					} else {
+						logger.V(2).Info("Bound instance not found in launcher, treating as dead")
+					}
+					// Mark as sleeping so that ensureUnbound (called during requester deletion)
+					// does not attempt to POST /sleep on the dead instance.
+					serverDat.Sleeping = ptr.To(true)
+					err = podOps.Delete(ctx, requestingPod.Name, metav1.DeleteOptions{
+						PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
+						Preconditions:     &metav1.Preconditions{UID: &item.UID, ResourceVersion: &requestingPod.ResourceVersion}})
+					if err == nil {
+						logger.V(2).Info("Requested deletion of server-requesting Pod because bound instance stopped")
+					} else if apierrors.IsGone(err) || apierrors.IsNotFound(err) {
+						logger.V(5).Info("The server-requesting Pod is already gone")
+					} else {
+						return fmt.Errorf("failed to delete server-requesting Pod for stopped instance: %w", err), true
+					}
+					serverDat.RequesterDeleteRequested = true
+					return nil, false
 				}
-				// Mark as sleeping so that ensureUnbound (called during requester deletion)
-				// does not attempt to POST /sleep on the dead instance.
-				// The instance process is dead — this is not a real sleeping state,
-				// but it prevents ensureUnbound from hitting a dead endpoint and retrying forever.
-				serverDat.Sleeping = ptr.To(true)
-				// Delete the server-requesting Pod.
-				// This is analogous to the direct-provider case where a providing Pod's
-				// deletion is reflected to deletion of the server-requesting Pod.
-				// The ReplicaSet will recreate the requesting Pod, triggering a fresh bind.
-				err = podOps.Delete(ctx, requestingPod.Name, metav1.DeleteOptions{
-					PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
-					Preconditions:     &metav1.Preconditions{UID: &item.UID, ResourceVersion: &requestingPod.ResourceVersion}})
-				if err == nil {
-					logger.V(2).Info("Requested deletion of server-requesting Pod because bound instance stopped")
-				} else if apierrors.IsGone(err) || apierrors.IsNotFound(err) {
-					logger.V(5).Info("The server-requesting Pod is already gone")
-				} else {
-					return fmt.Errorf("failed to delete server-requesting Pod for stopped instance: %w", err), true
+				// InstanceKnownToExist is false and instance is absent (not stopped) —
+				// not yet created (bind-first path) or controller restarted and lost tracking.
+				// We just synced, so we know the instance is not on the launcher — create directly.
+				launcherBaseURL := fmt.Sprintf("http://%s:%d", providingPod.Status.PodIP, ctlrcommon.LauncherServicePort)
+				lClient, err := NewLauncherClient(launcherBaseURL)
+				if err != nil {
+					return err, true
+				}
+				result, err := lClient.CreateNamedInstance(ctx, serverDat.InstanceID, *cfg)
+				if err != nil {
+					return fmt.Errorf("failed to create vLLM instance %q: %w", serverDat.InstanceID, err), true
+				}
+				serverDat.InstanceKnownToExist = true
+				launcherDat := ctl.getLauncherData(nodeDat, providingPod.Name)
+				launcherDat.Instances[serverDat.InstanceID] = time.Now()
+				logger.V(5).Info("Created vLLM instance", "instance_id", result.InstanceID, "status", result.Status)
+				// If ISC tracking annotations are missing (pre-bound pod), propagate the ISC metadata.
+				if _, propagated := providingPod.Annotations[iscLabelKeysAnnotationKey]; !propagated {
+					return ctl.bind(ctx, serverDat, requestingPod, providingPod, &serverDat.InstanceID, int16(isc.Spec.ModelServerConfig.Port),
+						isc.Spec.ModelServerConfig.Labels, isc.Spec.ModelServerConfig.Annotations, true)
+				}
-				serverDat.RequesterDeleteRequested = true
-				return nil, false
 			}
+			serverDat.InstanceKnownToExist = true
 		}
 		if serverDat.Sleeping == nil {
 			sleeping, err := ctl.querySleeping(ctx, providingPod, serverPort)
@@ -524,7 +563,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
 			logger.V(2).Info("Unexpected: multiple sleeping Pods match; using the first", "requesterName", requestingPod.Name)
 		}
 		providingPod = sleepingAnys[0].(*corev1.Pod)
-		return ctl.bind(ctx, serverDat, requestingPod, providingPod, nil, -1, nil, nil)
+		return ctl.bind(ctx, serverDat, requestingPod, providingPod, nil, -1, nil, nil, false)
 	}
 
 	// What remains is to make a new server-providing Pod --- if the sleeper budget allows.
@@ -574,10 +613,6 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
 		return err, false
 	}
 
-	cfg, iscHash, err := ctl.configInferenceServer(isc, serverDat.GPUIDs)
-	if err != nil {
-		return fmt.Errorf("failed to configure inference server config: %w", err), true
-	}
 	desiredPort := isc.Spec.ModelServerConfig.Port
 	logger.V(5).Info("Nominal hash of InferenceServerConfig", "hash", iscHash)
 
@@ -599,43 +634,11 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
 			logger.V(5).Info("No suitable launcher Pod found with sleeping instance or necessary capacity")
 			// Fall through to create new launcher Pod
 		} else {
-			logger.V(5).Info("Selected launcher Pod", "name", launcherPod.Name, "hasSleepingInstance", hasSleepingInstance)
-			launcherIP := launcherPod.Status.PodIP
-			if launcherIP == "" {
-				return fmt.Errorf("launcher Pod %q has no IP assigned yet", launcherPod.Name), true
-			}
-
-			launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherIP, ctlrcommon.LauncherServicePort)
-			lClient, err := NewLauncherClient(launcherBaseURL)
-			if err != nil {
-				return err, true
-			}
-
-			launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
-
-			if hasSleepingInstance {
-				// Fast path: wake up existing sleeping instance
-				logger.V(5).Info("Waking up existing vLLM instance", "iscHash", iscHash)
-				err := ctl.wakeupInstance(ctx, lClient, iscHash, isc.Spec.ModelServerConfig.Port)
-				if err != nil {
-					return fmt.Errorf("wake up vLLM instance: %w", err), true
-				}
-				launcherDat.Instances[iscHash] = time.Now()
-				return ctl.bind(ctx, serverDat, requestingPod, launcherPod, &iscHash, int16(isc.Spec.ModelServerConfig.Port), isc.Spec.ModelServerConfig.Labels, isc.Spec.ModelServerConfig.Annotations)
-			} else {
-				// Slower path: create new instance in launcher with capacity
-				logger.V(5).Info("Creating new vLLM instance", "iscHash", iscHash)
-				result, err := lClient.CreateNamedInstance(ctx, iscHash, *cfg)
-				if err != nil {
-					return fmt.Errorf("create vLLM instance: %w", err), true
-				}
-				logger.V(5).Info("Created new vLLM instance",
-					"instance_id", result.InstanceID,
-					"status", result.Status,
-				)
-				launcherDat.Instances[iscHash] = time.Now()
-				return ctl.bind(ctx, serverDat, requestingPod, launcherPod, &iscHash, int16(isc.Spec.ModelServerConfig.Port), isc.Spec.ModelServerConfig.Labels, isc.Spec.ModelServerConfig.Annotations)
-			}
+			// Bind first, then rely on informer notification to trigger re-reconciliation.
+			// The "bound provider" path will handle instance creation/waking.
+			// This ensures the invariant: vllm awake implies provider Pod is bound.
+			logger.V(5).Info("Selected launcher Pod, binding first", "name", launcherPod.Name, "hasSleepingInstance", hasSleepingInstance)
+			return ctl.bind(ctx, serverDat, requestingPod, launcherPod, &iscHash, int16(isc.Spec.ModelServerConfig.Port), isc.Spec.ModelServerConfig.Labels, isc.Spec.ModelServerConfig.Annotations, true)
 		}
 	}
 	// Remains: Zero matching launcher Pods, or the matching launcher Pod cannot host more instances to fulfill the request.
@@ -647,6 +650,14 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
 	}
 
 	// Sleeper budget is met. Make a new launcher Pod.
+	// Bind at creation time so the launcher-populator cannot delete this pod
+	// while the vLLM instance is being set up.
+	desiredLauncherPod.Annotations = utils.MapSet(desiredLauncherPod.Annotations, requesterAnnotationKey, string(requestingPod.UID)+" "+requestingPod.Name)
+	desiredLauncherPod.Labels = utils.MapSet(desiredLauncherPod.Labels, api.DualLabelName, requestingPod.Name)
+	if !slices.Contains(desiredLauncherPod.Finalizers, providerFinalizer) {
+		desiredLauncherPod.Finalizers = append(desiredLauncherPod.Finalizers, providerFinalizer)
+	}
+
 	echo, err := podOps.Create(ctx, desiredLauncherPod, metav1.CreateOptions{})
 	if err != nil {
 		errMsg := err.Error()
@@ -825,17 +836,6 @@ func getVLLMInstancePort(inst InstanceState) (int32, error) {
 	return 0, fmt.Errorf("missing annotations[%s]", VllmConfigInferencePortAnnotationKey)
 }
 
-func (ctl *controller) wakeupInstance(ctx context.Context, lClient *LauncherClient, instanceID string, instancePort int32) error {
-	logger := klog.FromContext(ctx)
-	endpoint := lClient.baseURL.Hostname() + ":" + strconv.Itoa(int(instancePort))
-	err := doPost("http://" + endpoint + "/wake_up")
-	if err != nil {
-		return fmt.Errorf("failed to wake up vLLM instance %q (at %s): %w", instanceID, endpoint, err)
-	}
-	logger.V(2).Info("Woke up vLLM instance", "instanceID", instanceID, "endpoint", endpoint)
-	return nil
-}
-
 func (ctl *controller) ensureSleepingLabel(ctx context.Context, providingPod *corev1.Pod, desired bool) error {
 	logger := klog.FromContext(ctx)
 	desiredStr := strconv.FormatBool(desired)
@@ -930,7 +930,7 @@ func (ctl *controller) enforceSleeperBudget(ctx context.Context, serverDat *serv
 
 // Note: instPort is used only for launcher-based server-providing Pods.
 // instanceID is non-nil iff launcher-based
-func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requestingPod, providingPod *corev1.Pod, instanceID *string, instPort int16, iscLabels, iscAnnotations map[string]string) (error, bool) {
+func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requestingPod, providingPod *corev1.Pod, instanceID *string, instPort int16, iscLabels, iscAnnotations map[string]string, skipWake bool) (error, bool) {
 	logger := klog.FromContext(ctx)
 	providingPod = providingPod.DeepCopy()
 	providingPod.Annotations = utils.MapSet(providingPod.Annotations, requesterAnnotationKey, string(requestingPod.UID)+" "+requestingPod.Name)
@@ -1005,9 +1005,11 @@ func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requesti
 			return fmt.Errorf("unable to wake up server because port not known: %w", err), true
 		}
 	}
-	err = ctl.wakeSleeper(ctx, serverDat, requestingPod, providingPod, serverPort, "freshly-bound")
-	if err != nil {
-		return err, true
+	if !skipWake {
+		err = ctl.wakeSleeper(ctx, serverDat, requestingPod, providingPod, serverPort, "freshly-bound")
+		if err != nil {
+			return err, true
+		}
 	}
 	return ctl.ensureReqState(ctx, requestingPod, serverDat, !slices.Contains(requestingPod.Finalizers, requesterFinalizer), false)
 }
@@ -1301,6 +1303,8 @@ func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData,
 	}
 	serverDat.ProvidingPodName = ""
 	serverDat.ServerPort = -1
+	serverDat.InstanceID = ""
+	serverDat.InstanceKnownToExist = false
 	return nil
 }
 
@@ -1669,26 +1673,37 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
 	}
 
 	launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
+
+	boundInstanceIDs := sets.New[string]()
+	for _, sd := range nodeDat.InferenceServers {
+		if sd.ProvidingPodName == launcherPod.Name && sd.InstanceID != "" {
+			boundInstanceIDs.Insert(sd.InstanceID)
+		}
+	}
+
 	newInstances := make(map[string]time.Time)
 	remainingInstances := make([]InstanceState, 0, len(insts.Instances))
-	deletedStoppedInstanceIDs := sets.New[string]()
-	failedStoppedInstanceErrs := map[string]error{}
+	stoppedInstanceIDs := sets.New[string]()
 	runningCount := 0
 	for _, inst := range insts.Instances {
 		if inst.Status == InstanceStatusStopped {
-			// Clean up stopped instances from the launcher.
-			_, delErr := lClient.DeleteInstance(ctx, inst.InstanceID)
-			if delErr != nil && !IsInstanceNotFoundError(delErr) {
-				logger.V(3).Info("Failed to delete stopped instance from launcher during sync",
-					"instanceID", inst.InstanceID, "err", delErr)
-				// Deletion failed — the instance still occupies a slot in the launcher.
-				failedStoppedInstanceErrs[inst.InstanceID] = delErr
-			} else {
-				logger.V(2).Info("Deleted stopped instance from launcher during sync",
+			if boundInstanceIDs.Has(inst.InstanceID) {
+				// Bound stopped instance — defer deletion so the caller can
+				// delete the requesting Pod first (resolves create/delete ambiguity).
+				stoppedInstanceIDs.Insert(inst.InstanceID)
+				logger.V(2).Info("Found stopped bound instance, deferring cleanup",
 					"instanceID", inst.InstanceID)
-				deletedStoppedInstanceIDs.Insert(inst.InstanceID)
-				continue
+			} else {
+				_, delErr := lClient.DeleteInstance(ctx, inst.InstanceID)
+				if delErr != nil && !IsInstanceNotFoundError(delErr) {
+					logger.V(3).Info("Failed to delete stopped instance from launcher during sync",
+						"instanceID", inst.InstanceID, "err", delErr)
+				} else {
+					logger.V(2).Info("Deleted stopped instance from launcher during sync",
+						"instanceID", inst.InstanceID)
+				}
 			}
+			continue
 		}
 		remainingInstances = append(remainingInstances, inst)
 		if inst.Status == "running" {
@@ -1717,9 +1732,8 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
 		"instanceCount", len(newInstances))
 
 	return &launcherSyncResult{
-		instances:                 insts,
-		deletedStoppedInstanceIDs: deletedStoppedInstanceIDs,
-		failedStoppedInstanceErrs: failedStoppedInstanceErrs,
+		instances:          insts,
+		stoppedInstanceIDs: stoppedInstanceIDs,
	}, nil, false
 }
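Illustrative note (not part of the patch): the comments in the bound-provider path above describe a three-way decision. The sketch below restates that logic in condensed form under simplified, hypothetical names; decideBoundInstanceAction and the action constants are inventions for illustration only, and the real controller operates on serverData and the launcherSyncResult directly rather than through such a helper.

// Hypothetical, simplified restatement of the bound-instance handling above.
// instancePresent and instanceStopped come from the launcher sync result;
// knownToExist mirrors serverData.InstanceKnownToExist.
package sketch

type action int

const (
	proceed         action = iota // instance alive: fall through to sleep/wake handling
	deleteRequester               // stopped, or vanished while known to exist: delete the requesting Pod first
	createInstance                // never created (bind-first path) or tracking lost: create it on the launcher
)

func decideBoundInstanceAction(instancePresent, instanceStopped, knownToExist bool) action {
	if instancePresent && !instanceStopped {
		return proceed
	}
	if instanceStopped || knownToExist {
		// "Stopped" is an objective signal that the instance existed and died;
		// an instance that vanished while known to exist gets the same treatment.
		return deleteRequester
	}
	return createInstance
}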