6 changes: 3 additions & 3 deletions pkg/controller/dual-pods/controller.go
@@ -304,8 +304,9 @@ type serverData struct {
GPUIndices []string
GPUIndicesStr *string

ProvidingPodName string
InstanceID string // if provider launcher-based
ProvidingPodName string
InstanceID string // ISC hash; set when computed, independent of instance existence
InstanceKnownToExist bool // meaningful only for launcher-based providers
ISCLabelKeys []string // keys of ISC labels applied to providingPod
ISCAnnotationKeys []string // keys of ISC annotations applied to providingPod
Comment on lines 310 to 311 (Collaborator):
BTW, the code that recovers these after a controller crash and restart is duplicated (it appears in two places). A later PR could refactor to remove the duplication.
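For illustration only, a minimal sketch of what a shared recovery helper could look like; the helper name, the second annotation key, and the comma-separated encoding are assumptions, not code from this PR:

// recoverISCTracking is a hypothetical shared helper for the logic that today
// appears in two places. Annotation key names and encoding are assumed.
func recoverISCTracking(serverDat *serverData, providingPod *corev1.Pod) {
    if keys, ok := providingPod.Annotations[iscLabelKeysAnnotationKey]; ok && keys != "" {
        serverDat.ISCLabelKeys = strings.Split(keys, ",")
    }
    if keys, ok := providingPod.Annotations[iscAnnotationKeysAnnotationKey]; ok && keys != "" {
        serverDat.ISCAnnotationKeys = strings.Split(keys, ",")
    }
}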


@@ -817,4 +818,3 @@ func (ctl *controller) clearServerData(nodeDat *nodeData, uid apitypes.UID) {
defer ctl.mutex.Unlock()
delete(nodeDat.InferenceServers, uid)
}

232 changes: 123 additions & 109 deletions pkg/controller/dual-pods/inference-server.go
@@ -42,8 +42,8 @@ import (
k8sruntime "k8s.io/apimachinery/pkg/runtime"
k8sserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
k8svalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/strategicpatch"
k8svalidation "k8s.io/apimachinery/pkg/util/validation"
@waltforme (Collaborator, Author) commented on Apr 21, 2026:
Some linter suggested this reordering.

Collaborator:
I am surprised that this import misordering even got merged. I thought we had tooling that both prevents and checks for that. Worth an Issue.

If this fix were a separate PR then I would have already merged it.

Collaborator:
I opened #447 for the tooling issue.

"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
@@ -70,9 +70,8 @@ type nodeItem struct {
}

type launcherSyncResult struct {
instances *AllInstancesState
deletedStoppedInstanceIDs sets.Set[string]
failedStoppedInstanceErrs map[string]error
instances *AllInstancesState
stoppedInstanceIDs sets.Set[string] // bound instances found stopped (not deleted by sync)
}

func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
@@ -363,6 +362,19 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
}
}

var cfg *VllmConfig
var iscHash string
if launcherBased {
cfg, iscHash, err = ctl.configInferenceServer(isc, serverDat.GPUIDs)
if err != nil {
return fmt.Errorf("failed to configure inference server config: %w", err), true
}
if serverDat.InstanceID == "" {
serverDat.InstanceID = iscHash
serverDat.ServerPort = int16(isc.Spec.ModelServerConfig.Port)
Comment on lines +373 to +374 (Collaborator):
Why not save cfg in the serverDat too? That would reduce the calls to ctl.configInferenceServer from once per infSvrItem.process invocation to once per inference server.
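As a rough sketch of that suggestion (the cached field and its name are assumed, not part of this PR):

// Hypothetical caching of the computed config on serverData (field CachedCfg assumed):
if launcherBased && serverDat.CachedCfg == nil {
    cfg, iscHash, err := ctl.configInferenceServer(isc, serverDat.GPUIDs)
    if err != nil {
        return fmt.Errorf("failed to configure inference server: %w", err), true
    }
    serverDat.CachedCfg = cfg
    if serverDat.InstanceID == "" {
        serverDat.InstanceID = iscHash
        serverDat.ServerPort = int16(isc.Spec.ModelServerConfig.Port)
    }
}
// Later uses of cfg in this function would read serverDat.CachedCfg instead.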

}
}

// If there is already a bound server-providing Pod then ensure that it is awake,
// ensure status reported, and relay readiness if needed.
if providingPod != nil {
@@ -394,50 +406,77 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
return fmt.Errorf("unable to wake up server because port not known: %w", err), true
}
}
// For launcher-based providers, check whether the bound instance is still alive.
// The sidecar notifier updates the Pod annotation when instance status changes,
// which triggers reconciliation through the informer.
if launcherBased && serverDat.InstanceID != "" && providingPod.Status.PodIP != "" {
if launcherBased {
if providingPod.Status.PodIP == "" || !utils.IsPodReady(providingPod) {
logger.V(5).Info("Bound launcher pod not yet reachable, waiting", "podIP", providingPod.Status.PodIP, "ready", utils.IsPodReady(providingPod))
return nil, false
MikeSpreitzer marked this conversation as resolved.
}

syncResult, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, providingPod)
if err != nil || retry {
if err != nil {
return fmt.Errorf("failed to sync launcher instances for bound launcher Pod: %w", err), retry
}
logger.V(5).Info("Launcher instance sync requested retry")
return nil, true
}

_, instancePresent := findInstanceState(syncResult.instances.Instances, serverDat.InstanceID)
if delErr, failedCleanup := syncResult.failedStoppedInstanceErrs[serverDat.InstanceID]; failedCleanup {
return fmt.Errorf("failed to delete stopped instance %q from launcher: %w", serverDat.InstanceID, delErr), true
}
if _, deletedStopped := syncResult.deletedStoppedInstanceIDs[serverDat.InstanceID]; deletedStopped || !instancePresent {
if deletedStopped {
logger.V(2).Info("Deleted stopped bound instance from launcher during sync")
} else {
logger.V(2).Info("Bound instance not found in launcher after sync, treating as deleted")
_, instanceStopped := syncResult.stoppedInstanceIDs[serverDat.InstanceID]

if instanceStopped || !instancePresent {
if instanceStopped || serverDat.InstanceKnownToExist {
// instanceStopped is an objective signal that the instance existed
// and died — no dependency on in-memory InstanceKnownToExist state.
// When !instancePresent && InstanceKnownToExist==true the instance vanished
// (e.g. launcher restart) — same treatment.
// Delete the requesting Pod first so the intent is durable in the
// Kubernetes API; the stopped vLLM instance is cleaned up by the
// next sync after the server data is removed.
if instanceStopped {
logger.V(2).Info("Bound instance found stopped on launcher")
} else {
logger.V(2).Info("Bound instance not found in launcher, treating as dead")
}
// Mark as sleeping so that ensureUnbound (called during requester deletion)
// does not attempt to POST /sleep on the dead instance.
serverDat.Sleeping = ptr.To(true)
@MikeSpreitzer (Collaborator) commented on Apr 26, 2026:
This is a sign of cruftiness. What this is really trying to say is that this vllm instance no longer exists, and now there is a way to say that (serverDat.InstanceExists != nil && !(*serverDat.InstanceExists)). Can be addressed in a follow-on PR.
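A hedged sketch of that follow-on idea (the pointer-shaped field is assumed; this PR adds InstanceKnownToExist bool rather than a pointer):

// Hypothetical: inside ensureUnbound, skip the sleep request when the instance
// is known to be gone, instead of callers faking Sleeping = true.
instanceGone := serverDat.InstanceExists != nil && !*serverDat.InstanceExists
if !instanceGone {
    // ... existing POST /sleep path ...
}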

err = podOps.Delete(ctx, requestingPod.Name, metav1.DeleteOptions{
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
Preconditions: &metav1.Preconditions{UID: &item.UID, ResourceVersion: &requestingPod.ResourceVersion}})
if err == nil {
logger.V(2).Info("Requested deletion of server-requesting Pod because bound instance stopped")
} else if apierrors.IsGone(err) || apierrors.IsNotFound(err) {
logger.V(5).Info("The server-requesting Pod is already gone")
} else {
return fmt.Errorf("failed to delete server-requesting Pod for stopped instance: %w", err), true
}
serverDat.RequesterDeleteRequested = true
return nil, false
}
// Mark as sleeping so that ensureUnbound (called during requester deletion)
// does not attempt to POST /sleep on the dead instance.
// The instance process is dead — this is not a real sleeping state,
// but it prevents ensureUnbound from hitting a dead endpoint and retrying forever.
serverDat.Sleeping = ptr.To(true)
// Delete the server-requesting Pod.
// This is analogous to the direct-provider case where a providing Pod's
// deletion is reflected to deletion of the server-requesting Pod.
// The ReplicaSet will recreate the requesting Pod, triggering a fresh bind.
err = podOps.Delete(ctx, requestingPod.Name, metav1.DeleteOptions{
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
Preconditions: &metav1.Preconditions{UID: &item.UID, ResourceVersion: &requestingPod.ResourceVersion}})
if err == nil {
logger.V(2).Info("Requested deletion of server-requesting Pod because bound instance stopped")
} else if apierrors.IsGone(err) || apierrors.IsNotFound(err) {
logger.V(5).Info("The server-requesting Pod is already gone")
} else {
return fmt.Errorf("failed to delete server-requesting Pod for stopped instance: %w", err), true
// InstanceKnownToExist is false and instance is absent (not stopped) —
// not yet created (bind-first path) or controller restarted and lost tracking.
// We just synced, so we know the instance is not on the launcher — create directly.
launcherBaseURL := fmt.Sprintf("http://%s:%d", providingPod.Status.PodIP, ctlrcommon.LauncherServicePort)
lClient, err := NewLauncherClient(launcherBaseURL)
if err != nil {
return err, true
}
result, err := lClient.CreateNamedInstance(ctx, serverDat.InstanceID, *cfg)
if err != nil {
return fmt.Errorf("failed to create vLLM instance %q: %w", serverDat.InstanceID, err), true
}
serverDat.InstanceKnownToExist = true
launcherDat := ctl.getLauncherData(nodeDat, providingPod.Name)
launcherDat.Instances[serverDat.InstanceID] = time.Now()
logger.V(5).Info("Created vLLM instance", "instance_id", result.InstanceID, "status", result.Status)
// If ISC tracking annotations are missing (pre-bound pod), propagate the ISC metadata.
if _, propagated := providingPod.Annotations[iscLabelKeysAnnotationKey]; !propagated {
return ctl.bind(ctx, serverDat, requestingPod, providingPod, &serverDat.InstanceID, int16(isc.Spec.ModelServerConfig.Port),
Collaborator:
Since ctl.bind does more than binding, perhaps it should be renamed to something like bindAndPropagate.

Collaborator:
To be fair, it does more than "bind" and "propagate", so a truly good name would cover all of it.

Collaborator:
FYI, it took me some time to verify that all the other things that bind does can be skipped if propagated is false. This is part of what I mean about this code being "crufty". There are a lot of important things that are not written down. To be addressed in a later PR.

isc.Spec.ModelServerConfig.Labels, isc.Spec.ModelServerConfig.Annotations, true)
}
serverDat.RequesterDeleteRequested = true
return nil, false
}
serverDat.InstanceKnownToExist = true
}
if serverDat.Sleeping == nil {
sleeping, err := ctl.querySleeping(ctx, providingPod, serverPort)
@@ -524,7 +563,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
logger.V(2).Info("Unexpected: multiple sleeping Pods match; using the first", "requesterName", requestingPod.Name)
}
providingPod = sleepingAnys[0].(*corev1.Pod)
return ctl.bind(ctx, serverDat, requestingPod, providingPod, nil, -1, nil, nil)
return ctl.bind(ctx, serverDat, requestingPod, providingPod, nil, -1, nil, nil, false)
}
// What remains is to make a new server-providing Pod --- if the sleeper budget allows.

@@ -574,10 +613,6 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
return err, false
}

cfg, iscHash, err := ctl.configInferenceServer(isc, serverDat.GPUIDs)
if err != nil {
return fmt.Errorf("failed to configure inference server config: %w", err), true
}
desiredPort := isc.Spec.ModelServerConfig.Port
logger.V(5).Info("Nominal hash of InferenceServerConfig", "hash", iscHash)

@@ -599,43 +634,11 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
logger.V(5).Info("No suitable launcher Pod found with sleeping instance or necessary capacity")
// Fall through to create new launcher Pod
} else {
logger.V(5).Info("Selected launcher Pod", "name", launcherPod.Name, "hasSleepingInstance", hasSleepingInstance)
launcherIP := launcherPod.Status.PodIP
if launcherIP == "" {
return fmt.Errorf("launcher Pod %q has no IP assigned yet", launcherPod.Name), true
}

launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherIP, ctlrcommon.LauncherServicePort)
lClient, err := NewLauncherClient(launcherBaseURL)
if err != nil {
return err, true
}

launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)

if hasSleepingInstance {
// Fast path: wake up existing sleeping instance
logger.V(5).Info("Waking up existing vLLM instance", "iscHash", iscHash)
err := ctl.wakeupInstance(ctx, lClient, iscHash, isc.Spec.ModelServerConfig.Port)
if err != nil {
return fmt.Errorf("wake up vLLM instance: %w", err), true
}
launcherDat.Instances[iscHash] = time.Now()
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, &iscHash, int16(isc.Spec.ModelServerConfig.Port), isc.Spec.ModelServerConfig.Labels, isc.Spec.ModelServerConfig.Annotations)
} else {
// Slower path: create new instance in launcher with capacity
logger.V(5).Info("Creating new vLLM instance", "iscHash", iscHash)
result, err := lClient.CreateNamedInstance(ctx, iscHash, *cfg)
if err != nil {
return fmt.Errorf("create vLLM instance: %w", err), true
}
logger.V(5).Info("Created new vLLM instance",
"instance_id", result.InstanceID,
"status", result.Status,
)
launcherDat.Instances[iscHash] = time.Now()
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, &iscHash, int16(isc.Spec.ModelServerConfig.Port), isc.Spec.ModelServerConfig.Labels, isc.Spec.ModelServerConfig.Annotations)
}
// Bind first, then rely on informer notification to trigger re-reconciliation.
// The "bound provider" path will handle instance creation/waking.
// This ensures the invariant: vllm awake implies provider Pod is bound.
logger.V(5).Info("Selected launcher Pod, binding first", "name", launcherPod.Name, "hasSleepingInstance", hasSleepingInstance)
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, &iscHash, int16(isc.Spec.ModelServerConfig.Port), isc.Spec.ModelServerConfig.Labels, isc.Spec.ModelServerConfig.Annotations, true)
}
}
// Remains: Zero matching launcher Pods, or the matching launcher Pod cannot host more instances to fulfill the request.
Expand All @@ -647,6 +650,14 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
}
// Sleeper budget is met. Make a new launcher Pod.

// Bind at creation time so the launcher-populator cannot delete this pod
// while the vLLM instance is being set up.
desiredLauncherPod.Annotations = utils.MapSet(desiredLauncherPod.Annotations, requesterAnnotationKey, string(requestingPod.UID)+" "+requestingPod.Name)
desiredLauncherPod.Labels = utils.MapSet(desiredLauncherPod.Labels, api.DualLabelName, requestingPod.Name)
Comment on lines +655 to +656 (Collaborator):
FYI, BuildLauncherPodFromTemplate ensures that desiredLauncherPod.Annotations and desiredLauncherPod.Labels are not nil, so these statements can use bare indexing instead of utils.MapSet.
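A sketch of that simplification, relying on the observation above that the maps are non-nil:

// Since BuildLauncherPodFromTemplate guarantees non-nil maps, plain map
// assignment would suffice here:
desiredLauncherPod.Annotations[requesterAnnotationKey] = string(requestingPod.UID) + " " + requestingPod.Name
desiredLauncherPod.Labels[api.DualLabelName] = requestingPod.Name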

if !slices.Contains(desiredLauncherPod.Finalizers, providerFinalizer) {
desiredLauncherPod.Finalizers = append(desiredLauncherPod.Finalizers, providerFinalizer)
}

MikeSpreitzer marked this conversation as resolved.
echo, err := podOps.Create(ctx, desiredLauncherPod, metav1.CreateOptions{})
if err != nil {
errMsg := err.Error()
@@ -825,17 +836,6 @@ func getVLLMInstancePort(inst InstanceState) (int32, error) {
return 0, fmt.Errorf("missing annotations[%s]", VllmConfigInferencePortAnnotationKey)
}

func (ctl *controller) wakeupInstance(ctx context.Context, lClient *LauncherClient, instanceID string, instancePort int32) error {
logger := klog.FromContext(ctx)
endpoint := lClient.baseURL.Hostname() + ":" + strconv.Itoa(int(instancePort))
err := doPost("http://" + endpoint + "/wake_up")
if err != nil {
return fmt.Errorf("failed to wake up vLLM instance %q (at %s): %w", instanceID, endpoint, err)
}
logger.V(2).Info("Woke up vLLM instance", "instanceID", instanceID, "endpoint", endpoint)
return nil
}

func (ctl *controller) ensureSleepingLabel(ctx context.Context, providingPod *corev1.Pod, desired bool) error {
logger := klog.FromContext(ctx)
desiredStr := strconv.FormatBool(desired)
@@ -930,7 +930,7 @@ func (ctl *controller) enforceSleeperBudget(ctx context.Context, serverDat *serv

// Note: instPort is used only for launcher-based server-providing Pods.
// instanceID is non-nil iff launcher-based
func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requestingPod, providingPod *corev1.Pod, instanceID *string, instPort int16, iscLabels, iscAnnotations map[string]string) (error, bool) {
func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requestingPod, providingPod *corev1.Pod, instanceID *string, instPort int16, iscLabels, iscAnnotations map[string]string, skipWake bool) (error, bool) {
logger := klog.FromContext(ctx)
providingPod = providingPod.DeepCopy()
providingPod.Annotations = utils.MapSet(providingPod.Annotations, requesterAnnotationKey, string(requestingPod.UID)+" "+requestingPod.Name)
@@ -1005,9 +1005,11 @@ func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requesti
return fmt.Errorf("unable to wake up server because port not known: %w", err), true
}
}
err = ctl.wakeSleeper(ctx, serverDat, requestingPod, providingPod, serverPort, "freshly-bound")
if err != nil {
return err, true
if !skipWake {
err = ctl.wakeSleeper(ctx, serverDat, requestingPod, providingPod, serverPort, "freshly-bound")
if err != nil {
return err, true
}
}
return ctl.ensureReqState(ctx, requestingPod, serverDat, !slices.Contains(requestingPod.Finalizers, requesterFinalizer), false)
}
@@ -1301,6 +1303,8 @@ func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData,
}
serverDat.ProvidingPodName = ""
serverDat.ServerPort = -1
serverDat.InstanceID = ""
serverDat.InstanceKnownToExist = false
return nil
}

@@ -1669,26 +1673,37 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
}

launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)

boundInstanceIDs := sets.New[string]()
for _, sd := range nodeDat.InferenceServers {
if sd.ProvidingPodName == launcherPod.Name && sd.InstanceID != "" {
boundInstanceIDs.Insert(sd.InstanceID)
}
}

newInstances := make(map[string]time.Time)
remainingInstances := make([]InstanceState, 0, len(insts.Instances))
deletedStoppedInstanceIDs := sets.New[string]()
failedStoppedInstanceErrs := map[string]error{}
stoppedInstanceIDs := sets.New[string]()
runningCount := 0
for _, inst := range insts.Instances {
if inst.Status == InstanceStatusStopped {
// Clean up stopped instances from the launcher.
_, delErr := lClient.DeleteInstance(ctx, inst.InstanceID)
if delErr != nil && !IsInstanceNotFoundError(delErr) {
logger.V(3).Info("Failed to delete stopped instance from launcher during sync",
"instanceID", inst.InstanceID, "err", delErr)
// Deletion failed — the instance still occupies a slot in the launcher.
failedStoppedInstanceErrs[inst.InstanceID] = delErr
} else {
logger.V(2).Info("Deleted stopped instance from launcher during sync",
if boundInstanceIDs.Has(inst.InstanceID) {
// Bound stopped instance — defer deletion so the caller can
// delete the requesting Pod first (resolves create/delete ambiguity).
stoppedInstanceIDs.Insert(inst.InstanceID)
logger.V(2).Info("Found stopped bound instance, deferring cleanup",
"instanceID", inst.InstanceID)
deletedStoppedInstanceIDs.Insert(inst.InstanceID)
continue
} else {
_, delErr := lClient.DeleteInstance(ctx, inst.InstanceID)
if delErr != nil && !IsInstanceNotFoundError(delErr) {
logger.V(3).Info("Failed to delete stopped instance from launcher during sync",
@MikeSpreitzer (Collaborator) commented on Apr 26, 2026:
Perhaps in this case there should be a retry, so that a garbage instance is not left behind.
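For illustration, one way that retry could be expressed inside syncLauncherInstances (the exact requeue plumbing is an assumption, not this PR's code):

_, delErr := lClient.DeleteInstance(ctx, inst.InstanceID)
if delErr != nil && !IsInstanceNotFoundError(delErr) {
    // Returning the error with retry=true requeues the node item, so the
    // stopped instance is not silently left behind on the launcher.
    return nil, fmt.Errorf("delete stopped instance %q from launcher: %w", inst.InstanceID, delErr), true
}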

"instanceID", inst.InstanceID, "err", delErr)
} else {
logger.V(2).Info("Deleted stopped instance from launcher during sync",
"instanceID", inst.InstanceID)
}
}
continue
}
remainingInstances = append(remainingInstances, inst)
if inst.Status == "running" {
Expand Down Expand Up @@ -1717,9 +1732,8 @@ func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeD
"instanceCount", len(newInstances))

return &launcherSyncResult{
instances: insts,
deletedStoppedInstanceIDs: deletedStoppedInstanceIDs,
failedStoppedInstanceErrs: failedStoppedInstanceErrs,
instances: insts,
stoppedInstanceIDs: stoppedInstanceIDs,
}, nil, false
}
