Skip to content
Merged
128 changes: 112 additions & 16 deletions pkg/controller/dual-pods/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (config ControllerConfig) NewController(
inferenceServerConfigIndexName: inferenceServerConfigIndexFunc,
launcherConfigHashIndexName: launcherConfigHashIndexFunc,
requesterIndexName: requesterIndexFunc,
nodeNameIndexName: nodeNameIndexFunc,
nominalHashIndexName: nominalHashIndexFunc,
GPUIndexName: GPUIndexFunc})
if err != nil { //impossible
Expand Down Expand Up @@ -258,7 +259,7 @@ type nodeData struct {
InferenceServers map[apitypes.UID]*serverData

// Launchers maps name of launcher-based server-providing Pod to launcherData.
// Access only while holding controller mutex.
Copy link
Copy Markdown
Collaborator

@MikeSpreitzer MikeSpreitzer Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Include a statement about what does synchronize access. Perhaps something like the following.

// Access only inside `nodeItem.process()`

Similarly, every func (below nodeItem.process()) that (directly or in a called func) accesses this field should have a comment stating the restriction. Perhaps something like the following.

// Call this func only from within `nodeItem.process()`

// Access only inside the calling hierarchy that `nodeItem.process()` is the root caller.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"root" is not right, since it is not at the coldest end of the stack. Maybe something like the following?

Access only while nodeItem.process is on the call stack.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A subtree has a root as well.

I think the current expression and the suggested expression are equivalent.

Launchers map[string]*launcherData

// ItemsMutex may be acquired while holding controller mutex, not vice-versa.
Expand Down Expand Up @@ -330,34 +331,47 @@ type infSvrItem struct {
RequesterName string
}

// launcherPodItem is a work item referring to a launcher-based
// server-providing Pod on a particular node. Instances are created in the
// Pod event handlers (OnAdd/OnUpdate/OnDelete) and consumed by
// launcherPodItem.process.
type launcherPodItem struct {
	// LauncherPodName is the name of the launcher Pod.
	LauncherPodName string
	// NodeName is the name of the node the launcher Pod is assigned to.
	NodeName string
}

// infSvrItemType classifies a Pod from the controller's point of view.
type infSvrItemType string

const (
	// infSvrItemRequester is for a server-requesting Pod.
	infSvrItemRequester infSvrItemType = "requester"
	// infSvrItemBoundProvider is for a server-providing Pod that
	// is bound to a server-requesting Pod.
	infSvrItemBoundProvider infSvrItemType = "bound_provider"
	// infSvrItemUnboundLauncherBasedProvider is for a server-providing Pod that
	// is launcher-based and not bound to any server-requesting Pods.
	infSvrItemUnboundLauncherBasedProvider infSvrItemType = "unbound_launcher_based_provider"
	// infSvrItemDontCare is not a real infSvrItemType but only a placeholder
	// saying the corresponding infSvrItem is not relevant to the controller.
	infSvrItemDontCare infSvrItemType = "dont_care"
)

// careAbout returns an infSvrItem and an infSvrItemType.
// The controller cares about server-requesting Pods, bound direct server-providing Pods, and launcher-based server-providing Pods.
// The controller cares about
// - server-requesting Pods,
// - bound server-providing Pods,
// - unbound launcher-based server-providing Pods.
// The controller doesn't care about unbound direct providers and other Pods.
func careAbout(pod *corev1.Pod) (item infSvrItem, it infSvrItemType) {
if len(pod.Annotations[api.ServerPatchAnnotationName]) > 0 || len(pod.Annotations[api.InferenceServerConfigAnnotationName]) > 0 {
return infSvrItem{pod.UID, pod.Name}, infSvrItemRequester
}
requesterStr := pod.Annotations[requesterAnnotationKey]
requesterParts := strings.Split(requesterStr, " ")
if len(requesterParts) != 2 {
return infSvrItem{}, infSvrItemDontCare
if len(requesterParts) == 2 {
return infSvrItem{apitypes.UID(requesterParts[0]), requesterParts[1]}, infSvrItemBoundProvider
}
return infSvrItem{apitypes.UID(requesterParts[0]), requesterParts[1]}, infSvrItemBoundDirectProvider
if _, hasLauncherLabel := pod.Labels[ctlrcommon.LauncherConfigNameLabelKey]; hasLauncherLabel {
// For an unbound launcher-based server-providing Pod, use the Pod's own UID and name
return infSvrItem{pod.UID, pod.Name}, infSvrItemUnboundLauncherBasedProvider
}
return infSvrItem{}, infSvrItemDontCare
}

const inferenceServerConfigIndexName = "inferenceserverconfig"
Expand Down Expand Up @@ -387,21 +401,43 @@ const requesterIndexName = "requester"
func requesterIndexFunc(obj any) ([]string, error) {
pod := obj.(*corev1.Pod)
item, it := careAbout(pod)
if it == infSvrItemBoundDirectProvider {
if it == infSvrItemBoundProvider {
return []string{string(item.UID)}, nil
}
return []string{}, nil
}

const nodeNameIndexName = "nodeName"

// nodeNameIndexFunc indexes Pods by the node they are assigned to.
// A Pod that is not yet scheduled (empty Spec.NodeName) yields no entries.
func nodeNameIndexFunc(obj any) ([]string, error) {
	pod := obj.(*corev1.Pod)
	if name := pod.Spec.NodeName; name != "" {
		return []string{name}, nil
	}
	return []string{}, nil
}

func (ctl *controller) OnAdd(obj any, isInInitialList bool) {
switch typed := obj.(type) {
case *corev1.Pod:
if item, it := careAbout(typed); it == infSvrItemDontCare {
ctl.enqueueLogger.V(5).Info("Ignoring add of irrelevant Pod", "name", typed.Name)
return
} else if it == infSvrItemUnboundLauncherBasedProvider {
nodeName, err := getProviderNodeName(typed)
if err != nil {
ctl.enqueueLogger.Error(err, "Failed to determine node of launcher")
return
}
nd := ctl.getNodeData(nodeName)
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of add",
"nodeName", nodeName, "launcherPod", typed.Name, "isInInitialList", isInInitialList, "resourceVersion", typed.ResourceVersion)
nd.add(launcherPodItem)
ctl.Queue.Add(nodeItem{nodeName})
} else {
nodeName := typed.Spec.NodeName
if it == infSvrItemBoundDirectProvider || it == infSvrItemLauncherBasedProvider {
if it == infSvrItemBoundProvider {
var err error
nodeName, err = getProviderNodeName(typed)
if err != nil {
Expand Down Expand Up @@ -440,9 +476,21 @@ func (ctl *controller) OnUpdate(prev, obj any) {
if item, it := careAbout(typed); it == infSvrItemDontCare {
ctl.enqueueLogger.V(5).Info("Ignoring update of irrelevant Pod", "name", typed.Name)
return
} else if it == infSvrItemUnboundLauncherBasedProvider {
nodeName, err := getProviderNodeName(typed)
if err != nil {
ctl.enqueueLogger.Error(err, "Failed to determine node of launcher")
return
}
nd := ctl.getNodeData(nodeName)
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of update",
"nodeName", nodeName, "launcherPod", typed.Name, "resourceVersion", typed.ResourceVersion)
nd.add(launcherPodItem)
ctl.Queue.Add(nodeItem{nodeName})
} else {
nodeName := typed.Spec.NodeName
if it == infSvrItemBoundDirectProvider || it == infSvrItemLauncherBasedProvider {
if it == infSvrItemBoundProvider {
var err error
nodeName, err = getProviderNodeName(typed)
if err != nil {
Expand Down Expand Up @@ -484,9 +532,21 @@ func (ctl *controller) OnDelete(obj any) {
if item, it := careAbout(typed); it == infSvrItemDontCare {
ctl.enqueueLogger.V(5).Info("Ignoring delete of irrelevant Pod", "name", typed.Name)
return
} else if it == infSvrItemUnboundLauncherBasedProvider {
nodeName, err := getProviderNodeName(typed)
if err != nil {
ctl.enqueueLogger.Error(err, "Failed to determine node of launcher")
return
}
nd := ctl.getNodeData(nodeName)
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of delete",
"nodeName", nodeName, "launcherPod", typed.Name, "resourceVersion", typed.ResourceVersion)
nd.add(launcherPodItem)
ctl.Queue.Add(nodeItem{nodeName})
} else {
nodeName := typed.Spec.NodeName
if it == infSvrItemBoundDirectProvider || it == infSvrItemLauncherBasedProvider {
if it == infSvrItemBoundProvider {
var err error
nodeName, err = getProviderNodeName(typed)
if err != nil {
Expand Down Expand Up @@ -630,6 +690,40 @@ func (ctl *controller) enqueueRequestersByInferenceServerConfig(isc *fmav1alpha1
}
}

// enqueueUnboundInfSvrItemsOnNode finds the server-requesting Pods on the
// given node that are not yet bound to a providing Pod, records their
// infSvrItems in the node's data, and (if any were found) enqueues the node
// for processing. whyEnqueue is only used for logging.
func (ctl *controller) enqueueUnboundInfSvrItemsOnNode(ctx context.Context, nodeName string, whyEnqueue string) {
	logger := klog.FromContext(ctx)
	nd := ctl.getNodeData(nodeName)
	podObjs, err := ctl.podInformer.GetIndexer().ByIndex(nodeNameIndexName, nodeName)
	if err != nil {
		logger.Error(err, "Failed to list Pods by nodeName index", "nodeName", nodeName, "whyEnqueue", whyEnqueue)
		return
	}
	enqueued := 0
	for _, podObj := range podObjs {
		pod := podObj.(*corev1.Pod)
		item, it := careAbout(pod)
		if it != infSvrItemRequester {
			continue
		}
		// This is a server-requesting Pod; consult the controller's data to
		// see whether it is already bound to a providing Pod, and skip it
		// if so — only unbound requesters need re-processing.
		if serverDat := ctl.getServerData(nd, pod.Name, pod.UID); serverDat.ProvidingPodName != "" {
			continue
		}
		nd.add(item)
		enqueued++
	}
	if enqueued == 0 {
		logger.V(5).Info("No unbound infSvrItems to enqueue on node", "node", nodeName, "whyEnqueue", whyEnqueue)
		return
	}
	logger.V(5).Info("Enqueuing unbound infSvrItems on node", "node", nodeName, "whyEnqueue", whyEnqueue, "itemCount", enqueued)
	ctl.Queue.Add(nodeItem{nodeName})
}

func (ctl *controller) getNodeData(nodeName string) *nodeData {
ctl.mutex.Lock()
defer ctl.mutex.Unlock()
Expand Down Expand Up @@ -673,8 +767,6 @@ func (ctl *controller) getServerData(nodeDat *nodeData, reqName string, reqUID a
}

func (ctl *controller) getLauncherData(nodeDat *nodeData, launcherPodName string) *launcherData {
ctl.mutex.Lock()
defer ctl.mutex.Unlock()
ans := nodeDat.Launchers[launcherPodName]
if ans == nil {
ans = &launcherData{
Expand All @@ -685,6 +777,10 @@ func (ctl *controller) getLauncherData(nodeDat *nodeData, launcherPodName string
return ans
}

// clearLauncherData removes the launcherData entry for the named launcher Pod
// from the given node's bookkeeping.
// NOTE(review): this accesses nodeDat.Launchers without taking any mutex —
// presumably safe only while nodeItem.process is on the call stack; confirm
// against the access rules documented on the Launchers field.
func (ctl *controller) clearLauncherData(nodeDat *nodeData, launcherPodName string) {
	delete(nodeDat.Launchers, launcherPodName)
}

func (ctl *controller) clearServerData(nodeDat *nodeData, uid apitypes.UID) {
ctl.mutex.Lock()
defer ctl.mutex.Unlock()
Expand Down
97 changes: 82 additions & 15 deletions pkg/controller/dual-pods/inference-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@ func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
return nil, retries > 0
}

// process handles one launcherPodItem. If the launcher Pod no longer exists,
// the controller's bookkeeping for that launcher is dropped; in every
// successfully handled case the unbound infSvrItems on the launcher's node are
// re-enqueued. The second return value asks for a retry on transient errors.
func (item launcherPodItem) process(ctx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
	logger := klog.FromContext(ctx).WithValues("launcherPod", item.LauncherPodName, "node", item.NodeName)
	ctx = klog.NewContext(ctx, logger)

	if _, err := ctl.podLister.Pods(ctl.namespace).Get(item.LauncherPodName); err != nil {
		if !apierrors.IsNotFound(err) {
			return err, true
		}
		logger.V(2).Info("Launcher pod deleted, cleaning up launcher data")
		ctl.clearLauncherData(nodeDat, item.LauncherPodName)
		ctl.enqueueUnboundInfSvrItemsOnNode(ctx, item.NodeName, fmt.Sprintf("launcher pod %s deleted", item.LauncherPodName))
		return nil, false
	}

	ctl.enqueueUnboundInfSvrItemsOnNode(ctx, item.NodeName, fmt.Sprintf("launcher pod %s changed", item.LauncherPodName))
	return nil, false
}

func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
logger := klog.FromContext(urCtx).WithValues("serverUID", item.UID, "requesterName", item.RequesterName)
ctx := klog.NewContext(urCtx, logger)
Expand Down Expand Up @@ -350,8 +369,13 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
if err != nil {
return err, true
}
// Relay readiness if not already done
// Relay readiness if not already done.
// For launcher-based providers, readiness follows the bound instance's
// sleeping state rather than the launcher's Pod readiness.
ready := utils.IsPodReady(providingPod)
if launcherBased {
ready = !*serverDat.Sleeping
}
if serverDat.ReadinessRelayed == nil || ready != *serverDat.ReadinessRelayed {
url, readiness := fmt.Sprintf("http://%s:%s", requestingPod.Status.PodIP, adminPort), ""
if ready {
Expand Down Expand Up @@ -588,24 +612,21 @@ func (ctl *controller) selectBestLauncherPod(
continue
}

launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherPod.Status.PodIP, ctlrcommon.LauncherServicePort)
lClient, err := NewLauncherClient(launcherBaseURL)
if err != nil {
logger.V(5).Info("Failed to create launcher client, skipping Pod", "name", launcherPod.Name, "err", err)
continue
}

// Query instances from this launcher
insts, err := lClient.ListInstances(ctx)
if err != nil {
logger.V(5).Info("Failed to list instances from launcher, skipping Pod", "name", launcherPod.Name, "err", err)
insts, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
if err != nil || retry {
somePodsNotReady = true
continue
}

launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)

// Check if this launcher has a sleeping instance matching the iscHash
if _, instExists := launcherDat.Instances[iscHash]; instExists {
hasSleepingInstance := false
for _, inst := range insts.Instances {
if inst.InstanceID == iscHash {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

launcherPodAnys contains all launchers made from the right LauncherConfig object, right? Including ones with an awake child whose InstanceID == iscHash, right?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not including an awake child because of the same logic as

// They have to be sleeping, the Kube scheduler and kubelet would not have assigned the same
// node/gpus to the requester if there was another one awake.

Copy link
Copy Markdown
Collaborator

@MikeSpreitzer MikeSpreitzer Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not exactly analogous. You cited logic for the direct (milestone 2) case, in which the index value is a hash that takes the node name and the GPU list into account. In that case it is reasonable to expect the mentioned Kube Pod scheduler behavior.

But here in the launcher-based case, the index is on the hash of the LauncherConfig augmented by the node name and "gpus=all". So this index is not so discriminating, and could include other existing launcher Pods with the right LauncherConfig and node but being used for an awake vllm instance using different GPU(s).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Info for GPUs are hashed into iscHash, right?

Copy link
Copy Markdown
Collaborator

@MikeSpreitzer MikeSpreitzer Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The reason that I mentioned iscHash as well is that the two hashes work here together.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right. If the iscHash matches then the GPU sets are the same and so the instance being considered must be sleeping.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is tricky enough to warrant a comment too.

hasSleepingInstance = true
break
}
}
if hasSleepingInstance {
// Priority 1: Found a sleeping instance
logger.V(5).Info("Found launcher with sleeping instance (fastest path)",
"name", launcherPod.Name,
Expand Down Expand Up @@ -1237,6 +1258,52 @@ var coreScheme *k8sruntime.Scheme
var codecFactory k8sserializer.CodecFactory
var podDecoder k8sruntime.Decoder

// syncLauncherInstances queries the launcher pod for its current instances,
// updates the controller's internal launcherData state, and returns the fresh
// launcher response used for the update.
func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeData, launcherPod *corev1.Pod) (*AllInstancesStatus, error, bool) {
logger := klog.FromContext(ctx)

if launcherPod.Status.PodIP == "" || !utils.IsPodReady(launcherPod) {
logger.V(5).Info("Launcher pod not ready yet, waiting for another Pod event", "name", launcherPod.Name)
return nil, nil, true
}

launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherPod.Status.PodIP, ctlrcommon.LauncherServicePort)
lClient, err := NewLauncherClient(launcherBaseURL)
if err != nil {
logger.Error(err, "Failed to create launcher client")
return nil, err, true
}

insts, err := lClient.ListInstances(ctx)
if err != nil {
logger.Error(err, "Failed to list instances from launcher")
return nil, err, true
}

launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
newInstances := make(map[string]time.Time)
for _, inst := range insts.Instances {
if lastUsed, exists := launcherDat.Instances[inst.InstanceID]; exists {
newInstances[inst.InstanceID] = lastUsed
} else {
newInstances[inst.InstanceID] = time.Now()
}
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an awake instance evaporated then this should cause the relevant infSvrItem to be enqueued.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tracked by #375.

launcherDat.Instances = newInstances
launcherDat.Accurate = true

logger.V(2).Info("Synced launcher instances",
"launcherPod", launcherPod.Name,
"totalInstances", insts.TotalInstances,
"runningInstances", insts.RunningInstances,
"instanceCount", len(newInstances))

return insts, nil, false
}

func init() {
coreScheme = k8sruntime.NewScheme()
err := corev1.AddToScheme(coreScheme)
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/utils/pod-helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,10 @@ func BuildLauncherPodFromTemplate(template corev1.PodTemplateSpec, ns, nodeName,
delete(pod.Spec.Overhead, corev1.ResourceName("nvidia.com/gpu"))
}

// Assign to specific node
pod.Spec.NodeName = nodeName
if pod.Spec.NodeSelector == nil {
pod.Spec.NodeSelector = make(map[string]string)
}
pod.Spec.NodeSelector["kubernetes.io/hostname"] = nodeName
return pod, nil
}

Expand Down
Loading
Loading