Skip to content

Commit bf7ca2f

Browse files
committed
WIP
Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent c171dc6 commit bf7ca2f

File tree

1 file changed

+75
-10
lines changed

1 file changed

+75
-10
lines changed

pkg/controller/dual-pods/inference-server.go

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"k8s.io/utils/ptr"
4848
"sigs.k8s.io/yaml"
4949

50+
fmav1alpha1 "github.com/llm-d-incubation/llm-d-fast-model-actuation/api/fma/v1alpha1"
5051
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/api"
5152
stubapi "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/spi"
5253
)
@@ -349,16 +350,41 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
349350
}
350351
// What remains to be done is to wake or create a server-providing Pod
351352

352-
serverPatch := requestingPod.Annotations[api.ServerPatchAnnotationName]
353-
if serverPatch == "" { // this is bad, somebody has hacked important data
354-
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "the "+api.ServerPatchAnnotationName+" annotation is missing")
355-
}
356-
// use the server patch to build the server-providing pod, if not already done.
357-
desiredProvidingPod, nominalHash, err := serverDat.getNominalServerProvidingPod(ctx, requestingPod, serverPatch, api.ProviderData{
358-
NodeName: requestingPod.Spec.NodeName,
359-
})
360-
if err != nil {
361-
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to construct the nominal server-providing Pod: %s", err.Error()))
353+
var desiredProvidingPod *corev1.Pod
354+
var nominalHash string
355+
if requestingPod.Annotations[api.InferenceServerConfigAnnotationName] != "" {
356+
iscname := requestingPod.Annotations[api.InferenceServerConfigAnnotationName]
357+
isc, err := ctl.iscLister.InferenceServerConfigs(ctl.namespace).Get(iscname)
358+
if err != nil {
359+
if apierrors.IsNotFound(err) {
360+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("the InferenceServerConfig %q does not exist", iscname))
361+
}
362+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to get the InferenceServerConfig %q: %s", iscname, err.Error()))
363+
}
364+
lcname := isc.Spec.LauncherConfigName
365+
lc, err := ctl.lcLister.LauncherConfigs(ctl.namespace).Get(lcname)
366+
if err != nil {
367+
if apierrors.IsNotFound(err) {
368+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("the LauncherConfig %q does not exist", lcname))
369+
}
370+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to get the LauncherConfig %q: %s", lcname, err.Error()))
371+
}
372+
desiredProvidingPod, nominalHash, err = serverDat.getNominalServerProvidingPodFromLC(ctx, requestingPod, *lc)
373+
if err != nil {
374+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to construct the nominal server-providing Pod from LauncherConfig %q: %s", lcname, err.Error()))
375+
}
376+
} else {
377+
serverPatch := requestingPod.Annotations[api.ServerPatchAnnotationName]
378+
if serverPatch == "" { // this is bad, somebody has hacked important data
379+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "the "+api.ServerPatchAnnotationName+" annotation is missing")
380+
}
381+
// use the server patch to build the server-providing pod, if not already done.
382+
desiredProvidingPod, nominalHash, err = serverDat.getNominalServerProvidingPod(ctx, requestingPod, serverPatch, api.ProviderData{
383+
NodeName: requestingPod.Spec.NodeName,
384+
})
385+
if err != nil {
386+
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to construct the nominal server-providing Pod: %s", err.Error()))
387+
}
362388
}
363389

364390
sleepingAnys, err := ctl.podInformer.GetIndexer().ByIndex(nominalHashIndexName, nominalHash)
@@ -810,6 +836,45 @@ func (serverDat *serverData) getNominalServerProvidingPod(ctx context.Context, r
810836
return serverDat.NominalProvidingPod, serverDat.NominalProvidingPodHash, nil
811837
}
812838

839+
func (serverDat *serverData) getNominalServerProvidingPodFromLC(ctx context.Context, reqPod *corev1.Pod, lc fmav1alpha1.LauncherConfig) (*corev1.Pod, string, error) {
840+
logger := klog.FromContext(ctx)
841+
if serverDat.NominalProvidingPod == nil {
842+
podSpec := lc.Spec.PodTemplate.Spec
843+
podSpec = *deIndividualize(podSpec.DeepCopy())
844+
pod := &corev1.Pod{
845+
ObjectMeta: metav1.ObjectMeta{
846+
Labels: lc.Spec.PodTemplate.Labels,
847+
},
848+
Spec: podSpec,
849+
}
850+
hasher := sha256.New()
851+
podJSON, err := json.Marshal(pod)
852+
if err != nil {
853+
return nil, "", fmt.Errorf("failed to marshal launcher config pod spec: %w", err)
854+
}
855+
hasher.Write(podJSON)
856+
var modifiedHash [sha256.Size]byte
857+
modifiedHashSl := hasher.Sum(modifiedHash[:0])
858+
nominalHash := base64.RawStdEncoding.EncodeToString(modifiedHashSl)
859+
860+
logger.V(5).Info("Computed nominalHash from LauncherConfig", "nominalHash", nominalHash, "podJSON", podJSON)
861+
862+
serverDat.NominalProvidingPod = pod
863+
serverDat.NominalProvidingPodHash = nominalHash
864+
865+
pod.GenerateName = reqPod.Name + "-dual-"
866+
// pod.Finalizers = append(pod.Finalizers, providerFinalizer)
867+
// pod.Annotations = MapSet(pod.Annotations, nominalHashAnnotationKey, nominalHash)
868+
// pod.Annotations[requesterAnnotationKey] = string(reqPod.UID) + " " + reqPod.Name
869+
// pod.Annotations[api.AcceleratorsAnnotationName] = *serverDat.GPUIDsStr
870+
// pod.Labels = MapSet(pod.Labels, api.DualLabelName, reqPod.Name)
871+
// pod.Labels[api.SleepingLabelName] = "false"
872+
serverDat.NominalProvidingPod = pod
873+
serverDat.NominalProvidingPodHash = nominalHash
874+
}
875+
return serverDat.NominalProvidingPod, serverDat.NominalProvidingPodHash, nil
876+
}
877+
813878
// deIndividualize removes the parts of a PodSpec that are specific to an individual.
814879
// This func modifies the given `*PodSpec` in place and returns it.
815880
func deIndividualize(podSpec *corev1.PodSpec) *corev1.PodSpec {

0 commit comments

Comments
 (0)