@@ -47,6 +47,7 @@ import (
4747 "k8s.io/utils/ptr"
4848 "sigs.k8s.io/yaml"
4949
50+ fmav1alpha1 "github.com/llm-d-incubation/llm-d-fast-model-actuation/api/fma/v1alpha1"
5051 "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/api"
5152 stubapi "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/spi"
5253)
@@ -64,7 +65,14 @@ func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
6465 logger .V (4 ).Info ("Processing items for node" , "count" , len (items ))
6566 for localItem := range items {
6667 logger .V (4 ).Info ("Processing node-local item" , "item" , localItem )
67- err , retry := localItem .process (ctx , ctl , nodeDat )
68+ launcherbased := true // TODO(waltforme): externalize this switch
69+ var err error
70+ var retry bool
71+ if launcherbased {
72+ err , retry = localItem .processLauncherBased (ctx , ctl , nodeDat )
73+ } else {
74+ err , retry = localItem .process (ctx , ctl , nodeDat )
75+ }
6876 if err != nil {
6977 if retry {
7078 logger .Info ("Processing node local item suffered transient error, will retry" , "item" , localItem , "err" , err )
@@ -401,6 +409,167 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
401409 return ctl .ensureReqStatus (ctx , requestingPod , serverDat )
402410}
403411
412+ func (item infSvrItem ) processLauncherBased (urCtx context.Context , ctl * controller , nodeDat * nodeData ) (error , bool ) {
413+ logger := klog .FromContext (urCtx ).WithValues ("serverUID" , item .UID , "requesterName" , item .RequesterName )
414+ ctx := klog .NewContext (urCtx , logger )
415+
416+ requestingPod , err := ctl .podLister .Pods (ctl .namespace ).Get (item .RequesterName )
417+ if err != nil {
418+ if apierrors .IsNotFound (err ) {
419+ requestingPod = nil
420+ } else {
421+ logger .Error (err , "Failed to get Pod" )
422+ return err , true
423+ }
424+ } else {
425+ logger = logger .WithValues ("requesterRV" , requestingPod .ResourceVersion )
426+ }
427+
428+ // from the requestingPod's annotations, get the InferenceServerConfig object
429+ iscName , have := requestingPod .Annotations [api .InferenceServerConfigAnnotationName ]
430+ if ! have {
431+ // TODO(waltforme): report error in the status annotation
432+ // It is safe not to retry here because once the user update the annotation of requestingPod, another processing is triggered
433+ return fmt .Errorf ("requesting Pod %q is missing annotation %q" , requestingPod .Name , api .InferenceServerConfigAnnotationName ), false
434+ }
435+ isc , err := ctl .iscLister .InferenceServerConfigs (ctl .namespace ).Get (iscName )
436+ if err != nil {
437+ // TODO(waltforme): report error in the status annotation
438+ // It is safe not to retry here because once an event from InferenceServerConfig occurs, another processing is triggered
439+ return fmt .Errorf ("failed to get InferenceServerConfig %q: %w" , iscName , err ), false
440+ }
441+
442+ // from the InferenceServerConfig object, get the launcherConfig object
443+ lcName := isc .Spec .LauncherConfigName
444+ lc , err := ctl .lcLister .LauncherConfigs (ctl .namespace ).Get (lcName )
445+ if err != nil {
446+ // TODO(waltforme): report error in the status annotation
447+ // TODO(waltforme): introduce the 'enqueue requesters by launcherconfigs' logic to the controller
448+ // It is safe not to retry here because once an event from LauncherConfig occurs, another processing is triggered
449+ return fmt .Errorf ("failed to get LauncherConfig %q: %w" , lcName , err ), false
450+ }
451+
452+ // find which launcher Pod is using this launcherConfig, then find its IP
453+ lcTemplateHash , err := ctl .parseLauncherConfig (lc )
454+ if err != nil {
455+ return fmt .Errorf ("parse LauncherConfig %q: %w" , lcName , err ), true
456+ }
457+ logger .V (5 ).Info ("LauncherConfig's PodTemplate hash" , "hash" , lcTemplateHash )
458+ launcherPodAnys , err := ctl .podInformer .GetIndexer ().ByIndex (launcherConfigHashIndexName , lcTemplateHash )
459+ if err != nil {
460+ return err , false
461+ }
462+ if len (launcherPodAnys ) == 0 {
463+ // TODO(waltforme): report error in the status annotation
464+ // TODO(waltforme): introduce the 'enqueue requesters by launcher Pod' logic to the controller
465+ // It will be safe not to retry here because once the launcher Pod exists, another processing is triggered
466+ return fmt .Errorf ("no launcher Pod found for LauncherConfig %q with PodTemplate hash %q" , lcName , lcTemplateHash ), false
467+ }
468+ // Should multiple launcher Pods exist for the same LauncherConfig on one node? The answer is no.
469+ // TODO(waltforme): Should we report error if multiple launcher Pods are found? Should we delete the extra ones?
470+ launcherPod := launcherPodAnys [0 ].(* corev1.Pod )
471+ logger .V (5 ).Info ("Found launcher Pod" , "name" , launcherPod .Name )
472+ launcherIP := launcherPod .Status .PodIP
473+ if launcherIP == "" {
474+ return fmt .Errorf ("launcher Pod %q has no IP assigned yet" , launcherPod .Name ), true
475+ }
476+
477+ // Create launcher client
478+ launcherBaseURL := fmt .Sprintf ("http://%s:%d" , launcherIP , api .LauncherServicePort )
479+ lClient , err := NewLauncherClient (launcherBaseURL )
480+ if err != nil {
481+ return err , true
482+ }
483+
484+ // List vLLM instances
485+ statuses , err := lClient .ListInstances (ctx )
486+ if err != nil {
487+ return err , true
488+ }
489+ logger .V (5 ).Info ("vLLM instance counts" ,
490+ "total_instances" , statuses .TotalInstances ,
491+ "running_instances" , statuses .RunningInstances ,
492+ )
493+
494+ // TODO(waltforme): implement the following logic:
495+ // - if no instance is present for the request, create one
496+ // - if an existing instance is fulfilling the request, noop
497+ // - if some instances are fulfilling an obsolete request, delete them
498+
499+ // First, ensure a vLLM instance exists for the inferenceserverconfig object.
500+ cfg , iscHash , err := ctl .parseInferenceServerConfig (isc )
501+ if err != nil {
502+ return fmt .Errorf ("parse inference server config: %w" , err ), true
503+ }
504+ logger .V (5 ).Info ("Nominal hash of InferenceServerConfig" , "hash" , iscHash )
505+ InstExists := false
506+ if nodeDat .Launchers == nil {
507+ nodeDat .Launchers = make (map [string ]* launcherData )
508+ }
509+ if _ , have := nodeDat .Launchers [lcName ]; ! have {
510+ nodeDat .Launchers [lcName ] = & launcherData {
511+ Instances : make (map [string ]* InstanceData ),
512+ Accurate : true ,
513+ }
514+ }
515+ launcherDat := nodeDat .Launchers [lcName ]
516+ for hash , inst := range launcherDat .Instances {
517+ if hash == iscHash {
518+ InstExists = true
519+ inst .LastUsed = time .Now ()
520+ break
521+ }
522+ }
523+ if ! InstExists {
524+ result , err := lClient .CreateInstance (ctx , * cfg )
525+ if err != nil {
526+ return fmt .Errorf ("create vLLM instance: %w" , err ), true
527+ }
528+ logger .V (5 ).Info ("Created new vLLM instance" ,
529+ "instance_id" , result .InstanceID ,
530+ "status" , result .Status ,
531+ )
532+ launcherDat .Instances [iscHash ] = & InstanceData {ID : result .InstanceID , LastUsed : time .Now ()}
533+ nodeDat .Launchers [lcName ] = launcherDat
534+ }
535+
536+ return nil , false
537+ }
538+
539+ func (ctl * controller ) parseInferenceServerConfig (isc * fmav1alpha1.InferenceServerConfig ) (* VllmConfig , string , error ) {
540+ vllmCfg := VllmConfig {
541+ Options : isc .Spec .ModelServerConfig .Options ,
542+ EnvVars : make (map [string ]interface {}, len (isc .Spec .ModelServerConfig .EnvVars )),
543+ }
544+ for k , v := range isc .Spec .ModelServerConfig .EnvVars {
545+ vllmCfg .EnvVars [k ] = v
546+ }
547+
548+ iscBytes , err := yaml .Marshal (isc .Spec .ModelServerConfig )
549+ if err != nil {
550+ return nil , "" , fmt .Errorf ("failed to marshal InferenceServerConfig %q: %w" , isc .Name , err )
551+ }
552+ hasher := sha256 .New ()
553+ hasher .Write (iscBytes )
554+ var hash [sha256 .Size ]byte
555+ hashSl := hasher .Sum (hash [:0 ])
556+ nominalHash := base64 .RawStdEncoding .EncodeToString (hashSl )
557+
558+ return & vllmCfg , nominalHash , nil
559+ }
560+
561+ func (ctl * controller ) parseLauncherConfig (lc * fmav1alpha1.LauncherConfig ) (string , error ) {
562+ podTemplateBytes , err := yaml .Marshal (lc .Spec .PodTemplate )
563+ if err != nil {
564+ return "" , fmt .Errorf ("failed to marshal LauncherConfig %q: %w" , lc .Name , err )
565+ }
566+ hasher := sha256 .New ()
567+ hasher .Write (podTemplateBytes )
568+ var hash [sha256 .Size ]byte
569+ hashSl := hasher .Sum (hash [:0 ])
570+ return base64 .RawStdEncoding .EncodeToString (hashSl ), nil
571+ }
572+
404573func (ctl * controller ) ensureSleepingLabel (ctx context.Context , providingPod * corev1.Pod , desired bool ) error {
405574 logger := klog .FromContext (ctx )
406575 desiredStr := strconv .FormatBool (desired )
0 commit comments