Skip to content

Commit 48a5bc8

Browse files
committed
testing
Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent 21f5b1b commit 48a5bc8

2 files changed

Lines changed: 69 additions & 11 deletions

File tree

pkg/controller/dual-pods/controller.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ func (config ControllerConfig) NewController(
163163
nodeInformer: corev1PreInformers.Nodes().Informer(),
164164
nodeLister: corev1PreInformers.Nodes().Lister(),
165165
lcInformer: fmaInformerFactory.Fma().V1alpha1().LauncherConfigs().Informer(),
166+
lcLister: fmaInformerFactory.Fma().V1alpha1().LauncherConfigs().Lister(),
166167
sleeperLimit: config.SleeperLimit,
167168
debugAccelMemory: config.AcceleratorSleepingMemoryLimitMiB < math.MaxInt32,
168169
accelMemoryLimitMiB: config.AcceleratorSleepingMemoryLimitMiB,
@@ -339,7 +340,7 @@ const (
339340
// The controller cares about server-requesting Pods, bound direct server-providing Pods, and launcher-based server-providing Pods.
340341
// The controller doesn't care about unbound direct providers and other Pods.
341342
func careAbout(pod *corev1.Pod) (item infSvrItem, it infSvrItemType) {
342-
if len(pod.Annotations[api.ServerPatchAnnotationName]) > 0 {
343+
if len(pod.Annotations[api.ServerPatchAnnotationName]) > 0 || len(pod.Annotations[api.LauncherConfigAnnotationName]) > 0 {
343344
return infSvrItem{pod.UID, pod.Name}, infSvrItemRequester
344345
}
345346
requesterStr := pod.Annotations[requesterAnnotationKey]

pkg/controller/dual-pods/inference-server.go

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"k8s.io/utils/ptr"
4848
"sigs.k8s.io/yaml"
4949

50+
fmav1alpha1 "github.com/llm-d-incubation/llm-d-fast-model-actuation/api/fma/v1alpha1"
5051
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/api"
5152
stubapi "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/spi"
5253
)
@@ -349,16 +350,33 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
349350
}
350351
// What remains to be done is to wake or create a server-providing Pod
351352

352-
serverPatch := requestingPod.Annotations[api.ServerPatchAnnotationName]
353-
if serverPatch == "" { // this is bad, somebody has hacked important data
354-
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "the "+api.ServerPatchAnnotationName+" annotation is missing")
355-
}
356-
// use the server patch to build the server-providing pod, if not already done.
357-
desiredProvidingPod, nominalHash, err := serverDat.getNominalServerProvidingPod(ctx, requestingPod, serverPatch, api.ProviderData{
358-
NodeName: requestingPod.Spec.NodeName,
359-
})
360-
if err != nil {
361-
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to construct the nominal server-providing Pod: %s", err.Error()))
353+
var desiredProvidingPod *corev1.Pod
354+
var nominalHash string
355+
if requestingPod.Annotations[api.LauncherConfigAnnotationName] != "" {
356+
lcname := requestingPod.Annotations[api.LauncherConfigAnnotationName]
357+
lc, err := ctl.lcLister.LauncherConfigs(ctl.namespace).Get(lcname)
358+
if err != nil {
359+
if apierrors.IsNotFound(err) {
360+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("the LauncherConfig %q does not exist", lcname))
361+
}
362+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to get the LauncherConfig %q: %s", lcname, err.Error()))
363+
}
364+
desiredProvidingPod, nominalHash, err = serverDat.getNominalServerProvidingPodFromLC(ctx, requestingPod, *lc)
365+
if err != nil {
366+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to construct the nominal server-providing Pod from LauncherConfig %q: %s", lcname, err.Error()))
367+
}
368+
} else {
369+
serverPatch := requestingPod.Annotations[api.ServerPatchAnnotationName]
370+
if serverPatch == "" { // this is bad, somebody has hacked important data
371+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "the "+api.ServerPatchAnnotationName+" annotation is missing")
372+
}
373+
// use the server patch to build the server-providing pod, if not already done.
374+
desiredProvidingPod, nominalHash, err = serverDat.getNominalServerProvidingPod(ctx, requestingPod, serverPatch, api.ProviderData{
375+
NodeName: requestingPod.Spec.NodeName,
376+
})
377+
if err != nil {
378+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to construct the nominal server-providing Pod: %s", err.Error()))
379+
}
362380
}
363381

364382
sleepingAnys, err := ctl.podInformer.GetIndexer().ByIndex(nominalHashIndexName, nominalHash)
@@ -810,6 +828,45 @@ func (serverDat *serverData) getNominalServerProvidingPod(ctx context.Context, r
810828
return serverDat.NominalProvidingPod, serverDat.NominalProvidingPodHash, nil
811829
}
812830

831+
func (serverDat *serverData) getNominalServerProvidingPodFromLC(ctx context.Context, reqPod *corev1.Pod, lc fmav1alpha1.LauncherConfig) (*corev1.Pod, string, error) {
832+
logger := klog.FromContext(ctx)
833+
if serverDat.NominalProvidingPod == nil {
834+
podSpec := lc.Spec.PodTemplate.Spec
835+
podSpec = *deIndividualize(podSpec.DeepCopy())
836+
pod := &corev1.Pod{
837+
ObjectMeta: metav1.ObjectMeta{
838+
Labels: lc.Spec.PodTemplate.Labels,
839+
},
840+
Spec: podSpec,
841+
}
842+
hasher := sha256.New()
843+
podJSON, err := json.Marshal(pod)
844+
if err != nil {
845+
return nil, "", fmt.Errorf("failed to marshal launcher config pod spec: %w", err)
846+
}
847+
hasher.Write(podJSON)
848+
var modifiedHash [sha256.Size]byte
849+
modifiedHashSl := hasher.Sum(modifiedHash[:0])
850+
nominalHash := base64.RawStdEncoding.EncodeToString(modifiedHashSl)
851+
852+
logger.V(5).Info("Computed nominalHash from LauncherConfig", "nominalHash", nominalHash, "podJSON", podJSON)
853+
854+
serverDat.NominalProvidingPod = pod
855+
serverDat.NominalProvidingPodHash = nominalHash
856+
857+
pod.GenerateName = reqPod.Name + "-dual-"
858+
// pod.Finalizers = append(pod.Finalizers, providerFinalizer)
859+
// pod.Annotations = MapSet(pod.Annotations, nominalHashAnnotationKey, nominalHash)
860+
// pod.Annotations[requesterAnnotationKey] = string(reqPod.UID) + " " + reqPod.Name
861+
// pod.Annotations[api.AcceleratorsAnnotationName] = *serverDat.GPUIDsStr
862+
// pod.Labels = MapSet(pod.Labels, api.DualLabelName, reqPod.Name)
863+
// pod.Labels[api.SleepingLabelName] = "false"
864+
serverDat.NominalProvidingPod = pod
865+
serverDat.NominalProvidingPodHash = nominalHash
866+
}
867+
return serverDat.NominalProvidingPod, serverDat.NominalProvidingPodHash, nil
868+
}
869+
813870
// deIndividualize removes the parts of a PodSpec that are specific to an individual.
814871
// This func side-effects the given `*PodSpec` and returns it.
815872
func deIndividualize(podSpec *corev1.PodSpec) *corev1.PodSpec {

0 commit comments

Comments
 (0)