@@ -22,10 +22,9 @@ import (
2222 "encoding/base64"
2323 "encoding/json"
2424 "fmt"
25+ "strings"
2526
26- "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/api"
27- dualpods "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/dual-pods"
28- "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/utils"
27+ "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/common"
2928 corev1 "k8s.io/api/core/v1"
3029 apierrors "k8s.io/apimachinery/pkg/api/errors"
3130 "k8s.io/apimachinery/pkg/api/resource"
@@ -41,7 +40,10 @@ import (
4140 "k8s.io/klog/v2"
4241
4342 fmav1alpha1 "github.com/llm-d-incubation/llm-d-fast-model-actuation/api/fma/v1alpha1"
43+ "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/api"
44+ dualpods "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/dual-pods"
4445 genctlr "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/generic"
46+ "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/utils"
4547 fmainformers "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/generated/informers/externalversions"
4648 fmalisters "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/generated/listers/fma/v1alpha1"
4749)
@@ -322,7 +324,7 @@ func (ctl *controller) reconcileLaunchersOnNode(ctx context.Context, key NodeLau
322324}
323325
324326// getCurrentLaunchersOnNode returns launcher pods for a specific config on a specific node
325- func (ctl * controller ) getCurrentLaunchersOnNode (ctx context.Context , key NodeLauncherKey ) ([]corev1.Pod , error ) {
327+ func (ctl * controller ) getCurrentLaunchersOnNode (ctx context.Context , key NodeLauncherKey ) ([]* corev1.Pod , error ) {
326328 launcherLabels := map [string ]string {
327329 ComponentLabelKey : LauncherComponentLabelValue ,
328330 LauncherConfigNameLabelKey : key .LauncherConfigName ,
@@ -334,13 +336,7 @@ func (ctl *controller) getCurrentLaunchersOnNode(ctx context.Context, key NodeLa
334336 return nil , fmt .Errorf ("failed to list pods with launcher labels: %w" , err )
335337 }
336338
337- // Filter pods that are on the specified node
338- var filteredPods []corev1.Pod
339- for _ , pod := range pods {
340- filteredPods = append (filteredPods , * pod )
341- }
342-
343- return filteredPods , nil
339+ return pods , nil
344340}
345341
346342// createLaunchers creates the specified number of launcher pods on a node
@@ -377,19 +373,73 @@ func (ctl *controller) createLaunchers(ctx context.Context, node corev1.Node, ke
377373}
378374
379375// deleteExcessLaunchers deletes the specified number of launcher pods
380- func (ctl * controller ) deleteExcessLaunchers (ctx context.Context , launchers []corev1.Pod , count int ) error {
376+ func (ctl * controller ) deleteExcessLaunchers (ctx context.Context , launchers []* corev1.Pod , count int ) error {
381377 logger := klog .FromContext (ctx )
382- // Delete the specified number of launcher pods (starting from the end)
378+
379+ deletedCount := 0
383380 for i := 0 ; i < count && i < len (launchers ); i ++ {
384381 pod := launchers [len (launchers )- 1 - i ]
385- if err := ctl .coreclient .Pods (pod .Namespace ).Delete (ctx , pod .Name , metav1.DeleteOptions {}); err != nil {
382+ isBound , requesterPodName := ctl .isLauncherBoundToServerRequestingPod (pod )
383+ if isBound {
384+ logger .V (5 ).Info ("Skipping deletion of launcher pod as it is bound to a server-requesting pod" ,
385+ "pod" , pod .Name , "server-requesting pod" , requesterPodName )
386+ continue
387+ }
388+
389+ if err := ctl .coreclient .Pods (pod .Namespace ).Delete (ctx , pod .Name , metav1.DeleteOptions {
390+ Preconditions : & metav1.Preconditions {
391+ ResourceVersion : & pod .ResourceVersion ,
392+ },
393+ }); err != nil {
394+ if apierrors .IsNotFound (err ) {
395+ logger .Info ("Launcher pod already deleted" , "pod" , pod .Name )
396+ deletedCount ++ // Count as deletion target achieved
397+ continue
398+ }
399+ if apierrors .IsConflict (err ) {
400+ logger .Info ("Launcher pod version conflict, skipping deletion" ,
401+ "pod" , pod .Name , "error" , err )
402+ continue
403+ }
386404 return fmt .Errorf ("failed to delete launcher pod %s: %w" , pod .Name , err )
387405 }
388406 logger .Info ("Deleted launcher pod" , "pod" , pod .Name )
407+ deletedCount ++
408+ }
409+
410+ if deletedCount < count {
411+ logger .Info ("Fewer launcher pods were deleted than requested due to bound pods or concurrent changes" ,
412+ "requested" , count ,
413+ "deleted" , deletedCount ,
414+ "skipped" , count - deletedCount )
415+ } else {
416+ logger .Info ("Deleted unbound launcher pods" ,
417+ "deleted" , deletedCount )
389418 }
419+
390420 return nil
391421}
392422
423+ // isLauncherBoundToServerRequestingPod checks if the launcher pod is bound to any server-requesting pod
424+ func (ctl * controller ) isLauncherBoundToServerRequestingPod (launcherPod * corev1.Pod ) (bool , string ) {
425+ // Check if the launcher pod has annotations indicating assignment to a server-requesting pod
426+ requesterAnnotationValue , exists := launcherPod .Annotations [common .RequesterAnnotationKey ]
427+ if ! exists {
428+ return false , ""
429+ }
430+
431+ // Verify the format of the annotation value: should be "UID name"
432+ parts := strings .Split (requesterAnnotationValue , " " )
433+ if len (parts ) != 2 {
434+ return false , "" // Invalid format
435+ }
436+
437+ // Optionally verify that the referenced pod actually exists
438+ // @TODO if need, we can append the check logic in further PR
439+
440+ return true , parts [1 ]
441+ }
442+
393443// buildPodFromTemplate creates a pod from a template and assigns it to a node
394444func (ctl * controller ) buildPodFromTemplate (template corev1.PodTemplateSpec , key NodeLauncherKey ) (* corev1.Pod , error ) {
395445 pod := & corev1.Pod {
@@ -421,7 +471,7 @@ func (ctl *controller) buildPodFromTemplate(template corev1.PodTemplateSpec, key
421471 if pod .Annotations == nil {
422472 pod .Annotations = make (map [string ]string )
423473 }
424- pod .Annotations = dualpods .MapSet (pod .Annotations , genctlr . NominalHashAnnotationKey , nominalHash )
474+ pod .Annotations = dualpods .MapSet (pod .Annotations , NominalHashAnnotationKey , nominalHash )
425475
426476 cIdx , serverPort , err := utils .GetInferenceServerPort (pod )
427477 if err != nil {
0 commit comments