Skip to content

Commit 5570d18

Browse files
authored
Sync unbound launcher-based server-providing pods (#362)
* Sync unbound launcher-based server-providing pods Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Improve launcher pod synchronization Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Remove unnecessary synchronization for nodeData.Launchers Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Use NodeSelector instead of NodeName when constructing launcher pods Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Make careAbout cleaner Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Various improvements to the syncing of unbound launchers Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Improve selectBestLauncherPod Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Improve the launcher-based e2e test Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Revise const values and improve logging Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> --------- Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent 02705aa commit 5570d18

File tree

4 files changed

+289
-37
lines changed

4 files changed

+289
-37
lines changed

pkg/controller/dual-pods/controller.go

Lines changed: 112 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ func (config ControllerConfig) NewController(
178178
inferenceServerConfigIndexName: inferenceServerConfigIndexFunc,
179179
launcherConfigHashIndexName: launcherConfigHashIndexFunc,
180180
requesterIndexName: requesterIndexFunc,
181+
nodeNameIndexName: nodeNameIndexFunc,
181182
nominalHashIndexName: nominalHashIndexFunc,
182183
GPUIndexName: GPUIndexFunc})
183184
if err != nil { //impossible
@@ -258,7 +259,7 @@ type nodeData struct {
258259
InferenceServers map[apitypes.UID]*serverData
259260

260261
// Launchers maps name of launcher-based server-providing Pod to launcherData.
261-
// Access only while holding controller mutex.
262+
// Access only from within the call hierarchy rooted at `nodeItem.process()`.
262263
Launchers map[string]*launcherData
263264

264265
// ItemsMutex may be acquired while holding controller mutex, not vice-versa.
@@ -330,34 +331,47 @@ type infSvrItem struct {
330331
RequesterName string
331332
}
332333

334+
// launcherPodItem is a queued reference to a launcher-based server-providing
// Pod. It carries only names, not the Pod object itself, so the current state
// of the Pod is looked up when the item is processed.
type launcherPodItem struct {
	// LauncherPodName is the name of the launcher Pod.
	LauncherPodName string
	// NodeName is the name of the node that the launcher Pod is associated with.
	NodeName string
}
338+
333339
type infSvrItemType string
334340

335341
const (
336342
// infSvrItemRequester is for a server-requesting Pod.
337343
infSvrItemRequester infSvrItemType = "requester"
338-
// infSvrItemBoundDirectProvider is for a server-providing Pod that
339-
// is 'direct' (i.e. not launcher-based), and bound to a server-requesting Pod.
340-
infSvrItemBoundDirectProvider infSvrItemType = "bound_direct_provider"
341-
// infSvrItemLauncherBasedProvider is for a server-providing Pod that is launcher-based.
342-
infSvrItemLauncherBasedProvider infSvrItemType = "launcher_based_provider"
344+
// infSvrItemBoundProvider is for a server-providing Pod that
345+
// is bound to a server-requesting Pod.
346+
infSvrItemBoundProvider infSvrItemType = "bound_provider"
347+
// infSvrItemUnboundLauncherBasedProvider is for a server-providing Pod that
348+
// is launcher-based and not bound to any server-requesting Pods.
349+
infSvrItemUnboundLauncherBasedProvider infSvrItemType = "unbound_launcher_based_provider"
343350
// infSvrItemDontCare is not a real infSvrItemType but only a placeholder
344351
// saying the corresponding infSvrItem is not relevant to the controller.
345352
infSvrItemDontCare infSvrItemType = "dont_care"
346353
)
347354

348355
// careAbout returns an infSvrItem and an infSvrItemType.
349-
// The controller cares about server-requesting Pods, bound direct server-providing Pods, and launcher-based server-providing Pods.
356+
// The controller cares about
357+
// - server-requesting Pods,
358+
// - bound server-providing Pods,
359+
// - unbound launcher-based server-providing Pods.
350360
// The controller doesn't care about unbound direct providers and other Pods.
351361
func careAbout(pod *corev1.Pod) (item infSvrItem, it infSvrItemType) {
352362
if len(pod.Annotations[api.ServerPatchAnnotationName]) > 0 || len(pod.Annotations[api.InferenceServerConfigAnnotationName]) > 0 {
353363
return infSvrItem{pod.UID, pod.Name}, infSvrItemRequester
354364
}
355365
requesterStr := pod.Annotations[requesterAnnotationKey]
356366
requesterParts := strings.Split(requesterStr, " ")
357-
if len(requesterParts) != 2 {
358-
return infSvrItem{}, infSvrItemDontCare
367+
if len(requesterParts) == 2 {
368+
return infSvrItem{apitypes.UID(requesterParts[0]), requesterParts[1]}, infSvrItemBoundProvider
359369
}
360-
return infSvrItem{apitypes.UID(requesterParts[0]), requesterParts[1]}, infSvrItemBoundDirectProvider
370+
if _, hasLauncherLabel := pod.Labels[ctlrcommon.LauncherConfigNameLabelKey]; hasLauncherLabel {
371+
// For an unbound launcher-based server-providing Pod, use the Pod's own UID and name
372+
return infSvrItem{pod.UID, pod.Name}, infSvrItemUnboundLauncherBasedProvider
373+
}
374+
return infSvrItem{}, infSvrItemDontCare
361375
}
362376

363377
const inferenceServerConfigIndexName = "inferenceserverconfig"
@@ -387,21 +401,43 @@ const requesterIndexName = "requester"
387401
func requesterIndexFunc(obj any) ([]string, error) {
388402
pod := obj.(*corev1.Pod)
389403
item, it := careAbout(pod)
390-
if it == infSvrItemBoundDirectProvider {
404+
if it == infSvrItemBoundProvider {
391405
return []string{string(item.UID)}, nil
392406
}
393407
return []string{}, nil
394408
}
395409

410+
const nodeNameIndexName = "nodeName"
411+
412+
func nodeNameIndexFunc(obj any) ([]string, error) {
413+
pod := obj.(*corev1.Pod)
414+
if pod.Spec.NodeName == "" {
415+
return []string{}, nil
416+
}
417+
return []string{pod.Spec.NodeName}, nil
418+
}
419+
396420
func (ctl *controller) OnAdd(obj any, isInInitialList bool) {
397421
switch typed := obj.(type) {
398422
case *corev1.Pod:
399423
if item, it := careAbout(typed); it == infSvrItemDontCare {
400424
ctl.enqueueLogger.V(5).Info("Ignoring add of irrelevant Pod", "name", typed.Name)
401425
return
426+
} else if it == infSvrItemUnboundLauncherBasedProvider {
427+
nodeName, err := getProviderNodeName(typed)
428+
if err != nil {
429+
ctl.enqueueLogger.Error(err, "Failed to determine node of launcher")
430+
return
431+
}
432+
nd := ctl.getNodeData(nodeName)
433+
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
434+
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of add",
435+
"nodeName", nodeName, "launcherPod", typed.Name, "isInInitialList", isInInitialList, "resourceVersion", typed.ResourceVersion)
436+
nd.add(launcherPodItem)
437+
ctl.Queue.Add(nodeItem{nodeName})
402438
} else {
403439
nodeName := typed.Spec.NodeName
404-
if it == infSvrItemBoundDirectProvider || it == infSvrItemLauncherBasedProvider {
440+
if it == infSvrItemBoundProvider {
405441
var err error
406442
nodeName, err = getProviderNodeName(typed)
407443
if err != nil {
@@ -440,9 +476,21 @@ func (ctl *controller) OnUpdate(prev, obj any) {
440476
if item, it := careAbout(typed); it == infSvrItemDontCare {
441477
ctl.enqueueLogger.V(5).Info("Ignoring update of irrelevant Pod", "name", typed.Name)
442478
return
479+
} else if it == infSvrItemUnboundLauncherBasedProvider {
480+
nodeName, err := getProviderNodeName(typed)
481+
if err != nil {
482+
ctl.enqueueLogger.Error(err, "Failed to determine node of launcher")
483+
return
484+
}
485+
nd := ctl.getNodeData(nodeName)
486+
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
487+
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of update",
488+
"nodeName", nodeName, "launcherPod", typed.Name, "resourceVersion", typed.ResourceVersion)
489+
nd.add(launcherPodItem)
490+
ctl.Queue.Add(nodeItem{nodeName})
443491
} else {
444492
nodeName := typed.Spec.NodeName
445-
if it == infSvrItemBoundDirectProvider || it == infSvrItemLauncherBasedProvider {
493+
if it == infSvrItemBoundProvider {
446494
var err error
447495
nodeName, err = getProviderNodeName(typed)
448496
if err != nil {
@@ -484,9 +532,21 @@ func (ctl *controller) OnDelete(obj any) {
484532
if item, it := careAbout(typed); it == infSvrItemDontCare {
485533
ctl.enqueueLogger.V(5).Info("Ignoring delete of irrelevant Pod", "name", typed.Name)
486534
return
535+
} else if it == infSvrItemUnboundLauncherBasedProvider {
536+
nodeName, err := getProviderNodeName(typed)
537+
if err != nil {
538+
ctl.enqueueLogger.Error(err, "Failed to determine node of launcher")
539+
return
540+
}
541+
nd := ctl.getNodeData(nodeName)
542+
launcherPodItem := launcherPodItem{LauncherPodName: typed.Name, NodeName: nodeName}
543+
ctl.enqueueLogger.V(5).Info("Enqueuing launcher reference due to notification of delete",
544+
"nodeName", nodeName, "launcherPod", typed.Name, "resourceVersion", typed.ResourceVersion)
545+
nd.add(launcherPodItem)
546+
ctl.Queue.Add(nodeItem{nodeName})
487547
} else {
488548
nodeName := typed.Spec.NodeName
489-
if it == infSvrItemBoundDirectProvider || it == infSvrItemLauncherBasedProvider {
549+
if it == infSvrItemBoundProvider {
490550
var err error
491551
nodeName, err = getProviderNodeName(typed)
492552
if err != nil {
@@ -630,6 +690,40 @@ func (ctl *controller) enqueueRequestersByInferenceServerConfig(isc *fmav1alpha1
630690
}
631691
}
632692

693+
func (ctl *controller) enqueueUnboundInfSvrItemsOnNode(ctx context.Context, nodeName string, whyEnqueue string) {
694+
logger := klog.FromContext(ctx)
695+
nd := ctl.getNodeData(nodeName)
696+
itemCount := 0
697+
podObjs, err := ctl.podInformer.GetIndexer().ByIndex(nodeNameIndexName, nodeName)
698+
if err != nil {
699+
logger.Error(err, "Failed to list Pods by nodeName index", "nodeName", nodeName, "whyEnqueue", whyEnqueue)
700+
return
701+
}
702+
for _, podObj := range podObjs {
703+
pod := podObj.(*corev1.Pod)
704+
item, it := careAbout(pod)
705+
if it != infSvrItemRequester {
706+
continue
707+
}
708+
// skip bound Inference Servers
709+
// a podObj could be either a server-requesting Pod or a server-providing Pod
710+
// but after the `it != infSvrItemRequester` check above, it must be a server-requesting Pod here, and we want to skip it if it's bound to a server-providing Pod
711+
// we can use the controller's data to check whether it's bound or not
712+
serverDat := ctl.getServerData(nd, pod.Name, pod.UID)
713+
if serverDat.ProvidingPodName != "" {
714+
continue
715+
}
716+
nd.add(item)
717+
itemCount++
718+
}
719+
if itemCount == 0 {
720+
logger.V(5).Info("No unbound infSvrItems to enqueue on node", "node", nodeName, "whyEnqueue", whyEnqueue)
721+
return
722+
}
723+
logger.V(5).Info("Enqueuing unbound infSvrItems on node", "node", nodeName, "whyEnqueue", whyEnqueue, "itemCount", itemCount)
724+
ctl.Queue.Add(nodeItem{nodeName})
725+
}
726+
633727
func (ctl *controller) getNodeData(nodeName string) *nodeData {
634728
ctl.mutex.Lock()
635729
defer ctl.mutex.Unlock()
@@ -673,8 +767,6 @@ func (ctl *controller) getServerData(nodeDat *nodeData, reqName string, reqUID a
673767
}
674768

675769
func (ctl *controller) getLauncherData(nodeDat *nodeData, launcherPodName string) *launcherData {
676-
ctl.mutex.Lock()
677-
defer ctl.mutex.Unlock()
678770
ans := nodeDat.Launchers[launcherPodName]
679771
if ans == nil {
680772
ans = &launcherData{
@@ -685,6 +777,10 @@ func (ctl *controller) getLauncherData(nodeDat *nodeData, launcherPodName string
685777
return ans
686778
}
687779

780+
func (ctl *controller) clearLauncherData(nodeDat *nodeData, launcherPodName string) {
781+
delete(nodeDat.Launchers, launcherPodName)
782+
}
783+
688784
func (ctl *controller) clearServerData(nodeDat *nodeData, uid apitypes.UID) {
689785
ctl.mutex.Lock()
690786
defer ctl.mutex.Unlock()

pkg/controller/dual-pods/inference-server.go

Lines changed: 82 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,25 @@ func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
8585
return nil, retries > 0
8686
}
8787

88+
func (item launcherPodItem) process(ctx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
89+
logger := klog.FromContext(ctx).WithValues("launcherPod", item.LauncherPodName, "node", item.NodeName)
90+
ctx = klog.NewContext(ctx, logger)
91+
92+
_, err := ctl.podLister.Pods(ctl.namespace).Get(item.LauncherPodName)
93+
if err != nil {
94+
if apierrors.IsNotFound(err) {
95+
logger.V(2).Info("Launcher pod deleted, cleaning up launcher data")
96+
ctl.clearLauncherData(nodeDat, item.LauncherPodName)
97+
ctl.enqueueUnboundInfSvrItemsOnNode(ctx, item.NodeName, fmt.Sprintf("launcher pod %s deleted", item.LauncherPodName))
98+
return nil, false
99+
}
100+
return err, true
101+
}
102+
103+
ctl.enqueueUnboundInfSvrItemsOnNode(ctx, item.NodeName, fmt.Sprintf("launcher pod %s changed", item.LauncherPodName))
104+
return nil, false
105+
}
106+
88107
func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
89108
logger := klog.FromContext(urCtx).WithValues("serverUID", item.UID, "requesterName", item.RequesterName)
90109
ctx := klog.NewContext(urCtx, logger)
@@ -350,8 +369,13 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
350369
if err != nil {
351370
return err, true
352371
}
353-
// Relay readiness if not already done
372+
// Relay readiness if not already done.
373+
// For launcher-based providers, readiness follows the bound instance's
374+
// sleeping state rather than the launcher's Pod readiness.
354375
ready := utils.IsPodReady(providingPod)
376+
if launcherBased {
377+
ready = !*serverDat.Sleeping
378+
}
355379
if serverDat.ReadinessRelayed == nil || ready != *serverDat.ReadinessRelayed {
356380
url, readiness := fmt.Sprintf("http://%s:%s", requestingPod.Status.PodIP, adminPort), ""
357381
if ready {
@@ -588,24 +612,21 @@ func (ctl *controller) selectBestLauncherPod(
588612
continue
589613
}
590614

591-
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherPod.Status.PodIP, ctlrcommon.LauncherServicePort)
592-
lClient, err := NewLauncherClient(launcherBaseURL)
593-
if err != nil {
594-
logger.V(5).Info("Failed to create launcher client, skipping Pod", "name", launcherPod.Name, "err", err)
595-
continue
596-
}
597-
598-
// Query instances from this launcher
599-
insts, err := lClient.ListInstances(ctx)
600-
if err != nil {
601-
logger.V(5).Info("Failed to list instances from launcher, skipping Pod", "name", launcherPod.Name, "err", err)
615+
insts, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
616+
if err != nil || retry {
617+
somePodsNotReady = true
602618
continue
603619
}
604620

605-
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
606-
607621
// Check if this launcher has a sleeping instance matching the iscHash
608-
if _, instExists := launcherDat.Instances[iscHash]; instExists {
622+
hasSleepingInstance := false
623+
for _, inst := range insts.Instances {
624+
if inst.InstanceID == iscHash {
625+
hasSleepingInstance = true
626+
break
627+
}
628+
}
629+
if hasSleepingInstance {
609630
// Priority 1: Found a sleeping instance
610631
logger.V(5).Info("Found launcher with sleeping instance (fastest path)",
611632
"name", launcherPod.Name,
@@ -1237,6 +1258,52 @@ var coreScheme *k8sruntime.Scheme
12371258
var codecFactory k8sserializer.CodecFactory
12381259
var podDecoder k8sruntime.Decoder
12391260

1261+
// syncLauncherInstances queries the launcher pod for its current instances,
1262+
// updates the controller's internal launcherData state, and returns the fresh
1263+
// launcher response used for the update.
1264+
func (ctl *controller) syncLauncherInstances(ctx context.Context, nodeDat *nodeData, launcherPod *corev1.Pod) (*AllInstancesStatus, error, bool) {
1265+
logger := klog.FromContext(ctx)
1266+
1267+
if launcherPod.Status.PodIP == "" || !utils.IsPodReady(launcherPod) {
1268+
logger.V(5).Info("Launcher pod not ready yet, waiting for another Pod event", "name", launcherPod.Name)
1269+
return nil, nil, true
1270+
}
1271+
1272+
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherPod.Status.PodIP, ctlrcommon.LauncherServicePort)
1273+
lClient, err := NewLauncherClient(launcherBaseURL)
1274+
if err != nil {
1275+
logger.Error(err, "Failed to create launcher client")
1276+
return nil, err, true
1277+
}
1278+
1279+
insts, err := lClient.ListInstances(ctx)
1280+
if err != nil {
1281+
logger.Error(err, "Failed to list instances from launcher")
1282+
return nil, err, true
1283+
}
1284+
1285+
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
1286+
newInstances := make(map[string]time.Time)
1287+
for _, inst := range insts.Instances {
1288+
if lastUsed, exists := launcherDat.Instances[inst.InstanceID]; exists {
1289+
newInstances[inst.InstanceID] = lastUsed
1290+
} else {
1291+
newInstances[inst.InstanceID] = time.Now()
1292+
}
1293+
}
1294+
1295+
launcherDat.Instances = newInstances
1296+
launcherDat.Accurate = true
1297+
1298+
logger.V(2).Info("Synced launcher instances",
1299+
"launcherPod", launcherPod.Name,
1300+
"totalInstances", insts.TotalInstances,
1301+
"runningInstances", insts.RunningInstances,
1302+
"instanceCount", len(newInstances))
1303+
1304+
return insts, nil, false
1305+
}
1306+
12401307
func init() {
12411308
coreScheme = k8sruntime.NewScheme()
12421309
err := corev1.AddToScheme(coreScheme)

pkg/controller/utils/pod-helper.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,10 @@ func BuildLauncherPodFromTemplate(template corev1.PodTemplateSpec, ns, nodeName,
224224
delete(pod.Spec.Overhead, corev1.ResourceName("nvidia.com/gpu"))
225225
}
226226

227-
// Assign to specific node
228-
pod.Spec.NodeName = nodeName
227+
if pod.Spec.NodeSelector == nil {
228+
pod.Spec.NodeSelector = make(map[string]string)
229+
}
230+
pod.Spec.NodeSelector["kubernetes.io/hostname"] = nodeName
229231
return pod, nil
230232
}
231233

0 commit comments

Comments
 (0)