Skip to content

Commit 6457982

Browse files
Merge pull request llm-d-incubation#216 from osswangxining/externalize-utils-logic-for-controllers
Externalize utils logic for controllers. Thanks for the reviews. Since llm-d-incubation#217 and llm-d-incubation#218 depend on this PR, I am closing it for now. If there are any other comments, please raise them in llm-d-incubation#217 or llm-d-incubation#218.
2 parents e998d3f + bbdc661 commit 6457982

3 files changed

Lines changed: 116 additions & 101 deletions

File tree

pkg/controller/dual-pods/controller.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"sync/atomic"
3030
"time"
3131

32+
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/utils"
3233
corev1 "k8s.io/api/core/v1"
3334
apierrors "k8s.io/apimachinery/pkg/api/errors"
3435
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -102,7 +103,7 @@ func GPUIndexFunc(obj any) ([]string, error) {
102103
if len(pod.Annotations[nominalHashAnnotationKey]) == 0 || pod.Spec.NodeName == "" {
103104
return []string{}, nil
104105
}
105-
isIdx, _, err := getInferenceServerPort(pod)
106+
isIdx, _, err := utils.GetInferenceServerPort(pod)
106107
if err != nil {
107108
return []string{}, nil
108109
}

pkg/controller/dual-pods/inference-server.go

Lines changed: 9 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ import (
3434
"text/template"
3535
"time"
3636

37+
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/utils"
3738
corev1 "k8s.io/api/core/v1"
3839
apierrors "k8s.io/apimachinery/pkg/api/errors"
3940
"k8s.io/apimachinery/pkg/api/resource"
4041
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4142
k8sruntime "k8s.io/apimachinery/pkg/runtime"
4243
k8sserializer "k8s.io/apimachinery/pkg/runtime/serializer"
43-
"k8s.io/apimachinery/pkg/util/intstr"
4444
"k8s.io/apimachinery/pkg/util/sets"
4545
"k8s.io/apimachinery/pkg/util/strategicpatch"
4646
"k8s.io/klog/v2"
@@ -196,7 +196,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
196196
if (requestingPod == nil || requestingPod.DeletionTimestamp != nil) && providingPod != nil {
197197
// Time to unbind.
198198
// As a special favor, delete providingPod if it is in trouble.
199-
if podIsInTrouble(providingPod) {
199+
if utils.PodIsInTrouble(providingPod) {
200200
err := podOps.Delete(ctx, providingPod.Name, metav1.DeleteOptions{
201201
Preconditions: &metav1.Preconditions{UID: &providingPod.UID, ResourceVersion: &providingPod.ResourceVersion},
202202
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
@@ -288,7 +288,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
288288
// If there is already a server-providing Pod then ensure that it is awake,
289289
// ensure status reported, and relay readiness if needed.
290290
if providingPod != nil {
291-
_, serverPort, err := getInferenceServerPort(providingPod)
291+
_, serverPort, err := utils.GetInferenceServerPort(providingPod)
292292
if err != nil { // Impossible, because such a providingPod would never be created by this controller
293293
return fmt.Errorf("unable to wake up server because port not known: %w", err), true
294294
}
@@ -315,7 +315,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
315315
return err, true
316316
}
317317
// Relay readiness if not already done
318-
ready := isPodReady(providingPod)
318+
ready := utils.IsPodReady(providingPod)
319319
if serverDat.ReadinessRelayed == nil || ready != *serverDat.ReadinessRelayed {
320320
url, readiness := fmt.Sprintf("http://%s:%s", requestingPod.Status.PodIP, adminPort), ""
321321
if ready {
@@ -417,28 +417,7 @@ func (ctl *controller) ensureSleepingLabel(ctx context.Context, providingPod *co
417417
return nil
418418
}
419419

420-
// Trouble is both (a) some container restarts and (b) Pod not ready
421-
func podIsInTrouble(pod *corev1.Pod) bool {
422-
var sumRestarts int32
423-
for _, ctrStat := range pod.Status.ContainerStatuses {
424-
sumRestarts += ctrStat.RestartCount
425-
}
426-
if sumRestarts == 0 {
427-
return false
428-
}
429-
condIdx := slices.IndexFunc(pod.Status.Conditions, func(cond corev1.PodCondition) bool {
430-
return cond.Type == "Ready"
431-
})
432-
if condIdx >= 0 {
433-
if pod.Status.Conditions[condIdx].Status == corev1.ConditionTrue {
434-
return false
435-
}
436-
}
437-
return true
438-
}
439-
440420
var invalidPodRE = regexp.MustCompile(`^Pod "[a-z0-9.-]*" is invalid`)
441-
var apiAccessRE = regexp.MustCompile(`^kube-api-access-[a-z0-9]+$`)
442421

443422
func (ctl *controller) enforceSleeperBudget(ctx context.Context, serverDat *serverData, requestingPod *corev1.Pod) (error, bool) {
444423
logger := klog.FromContext(ctx)
@@ -529,7 +508,7 @@ func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requesti
529508
}
530509
serverDat.ProvidingPodName = providingPod.Name
531510
logger.V(2).Info("Bound server-providing Pod", "name", providingPod.Name, "node", requestingPod.Spec.NodeName, "gpus", serverDat.GPUIndicesStr, "newResourceVersion", echo.ResourceVersion)
532-
_, serverPort, err := getInferenceServerPort(providingPod)
511+
_, serverPort, err := utils.GetInferenceServerPort(providingPod)
533512
if err != nil { // Impossible, because such a providingPod would never be created by this controller
534513
return fmt.Errorf("unable to wake up server because port not known: %w", err), true
535514
}
@@ -567,7 +546,7 @@ func (ctl *controller) maybeRemoveRequesterFinalizer(ctx context.Context, reques
567546
// First, determine whether finalizer should be present
568547
var wantFinalizer bool
569548
if providingPod != nil {
570-
isIdx, _, err := getInferenceServerPort(providingPod)
549+
isIdx, _, err := utils.GetInferenceServerPort(providingPod)
571550
if err == nil {
572551
isCtr := &providingPod.Spec.Containers[isIdx]
573552
statIdx := slices.IndexFunc(providingPod.Status.ContainerStatuses,
@@ -648,7 +627,7 @@ func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData,
648627
serverPort := serverDat.ServerPort
649628
if serverDat.NominalProvidingPod == nil {
650629
var err error
651-
_, serverPort, err = getInferenceServerPort(providingPod)
630+
_, serverPort, err = utils.GetInferenceServerPort(providingPod)
652631
if err != nil { // Impossible, because such a providingPod would never be created by this controller
653632
return fmt.Errorf("unable to put server to sleep because port not known: %w", err)
654633
}
@@ -726,7 +705,7 @@ func (serverDat *serverData) getNominalServerProvidingPod(ctx context.Context, r
726705
Labels: reqPod.Labels,
727706
Namespace: reqPod.Namespace,
728707
},
729-
Spec: *deIndividualize(reqPod.Spec.DeepCopy()),
708+
Spec: *utils.DeIndividualize(reqPod.Spec.DeepCopy()),
730709
}
731710
// marshal into json
732711
baseJSON, err := json.Marshal(basePod)
@@ -767,7 +746,7 @@ func (serverDat *serverData) getNominalServerProvidingPod(ctx context.Context, r
767746
}
768747
nodeSelector["kubernetes.io/hostname"] = reqPod.Spec.NodeName
769748

770-
cIdx, serverPort, err := getInferenceServerPort(pod)
749+
cIdx, serverPort, err := utils.GetInferenceServerPort(pod)
771750
if err != nil {
772751
return nil, "", err
773752
}
@@ -810,36 +789,6 @@ func (serverDat *serverData) getNominalServerProvidingPod(ctx context.Context, r
810789
return serverDat.NominalProvidingPod, serverDat.NominalProvidingPodHash, nil
811790
}
812791

813-
// deIndividualize removes the parts of a PodSpec that are specific to an individual.
814-
// This func side-effects the given `*PodSpec` and returns it.
815-
func deIndividualize(podSpec *corev1.PodSpec) *corev1.PodSpec {
816-
podSpec.EphemeralContainers = nil // these may not be given in Create
817-
// The api-access Volume is individualized
818-
volIdx := slices.IndexFunc(podSpec.Volumes, func(vol corev1.Volume) bool {
819-
return apiAccessRE.MatchString(vol.Name)
820-
})
821-
if volIdx >= 0 {
822-
volName := podSpec.Volumes[volIdx].Name
823-
podSpec.Volumes = slices.Delete(podSpec.Volumes, volIdx, volIdx+1)
824-
for ctrIdx := range podSpec.Containers {
825-
removeVolumeMount(&podSpec.Containers[ctrIdx], volName)
826-
}
827-
for ctrIdx := range podSpec.InitContainers {
828-
removeVolumeMount(&podSpec.InitContainers[ctrIdx], volName)
829-
}
830-
}
831-
return podSpec
832-
}
833-
834-
func removeVolumeMount(ctr *corev1.Container, volumeName string) {
835-
mntIdx := slices.IndexFunc(ctr.VolumeMounts, func(mnt corev1.VolumeMount) bool {
836-
return mnt.Name == volumeName
837-
})
838-
if mntIdx >= 0 {
839-
ctr.VolumeMounts = slices.Delete(ctr.VolumeMounts, mntIdx, mntIdx+1)
840-
}
841-
}
842-
843792
// reducedContainerState is the subset of `corev1.ContainerState` that we want to log
844793
type reducedContainerState struct {
845794
State corev1.ContainerState
@@ -872,37 +821,6 @@ func getReducedInferenceContainerState(from *corev1.Pod) *reducedContainerState
872821
return &ans
873822
}
874823

875-
// getInferenceServerPort, given a server-providing Pod,
876-
// returns (containerIndex int, port int16, err error)
877-
func getInferenceServerPort(pod *corev1.Pod) (int, int16, error) {
878-
// identify the inference server container
879-
cIdx := slices.IndexFunc(pod.Spec.Containers, func(c corev1.Container) bool {
880-
return c.Name == api.InferenceServerContainerName
881-
})
882-
if cIdx == -1 {
883-
return 0, 0, fmt.Errorf("container %q not found", api.InferenceServerContainerName)
884-
}
885-
isCtr := &pod.Spec.Containers[cIdx]
886-
if isCtr.ReadinessProbe == nil {
887-
return 0, 0, errors.New("the inference server container has no readinessProbe")
888-
} else if isCtr.ReadinessProbe.HTTPGet == nil {
889-
return 0, 0, fmt.Errorf("the readinessProbe is not an HTTPGet")
890-
}
891-
portIOS := isCtr.ReadinessProbe.HTTPGet.Port
892-
switch portIOS.Type {
893-
case intstr.Int:
894-
return cIdx, int16(portIOS.IntVal), nil
895-
case intstr.String:
896-
if portIOS.StrVal == "http" || portIOS.StrVal == "HTTP" {
897-
return cIdx, 80, nil
898-
} else {
899-
return 0, 0, fmt.Errorf("unsupported readinessProbe port %q", portIOS.StrVal)
900-
}
901-
default:
902-
return 0, 0, fmt.Errorf("the readinessProbe port has unexpected type %q", portIOS.Type)
903-
}
904-
}
905-
906824
func (ctl *controller) querySleeping(ctx context.Context, providingPod *corev1.Pod, serverPort int16) (bool, error) {
907825
queryURL := fmt.Sprintf("http://%s:%d/is_sleeping", providingPod.Status.PodIP, serverPort)
908826
body, err := doGet(queryURL)
@@ -1018,15 +936,6 @@ func doPost(url string) error {
1018936
return nil
1019937
}
1020938

1021-
func isPodReady(pod *corev1.Pod) bool {
1022-
for _, cond := range pod.Status.Conditions {
1023-
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
1024-
return true
1025-
}
1026-
}
1027-
return false
1028-
}
1029-
1030939
var coreScheme *k8sruntime.Scheme
1031940
var codecFactory k8sserializer.CodecFactory
1032941
var podDecoder k8sruntime.Decoder

pkg/controller/utils/pod-helper.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package utils
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"regexp"
7+
"slices"
8+
9+
corev1 "k8s.io/api/core/v1"
10+
"k8s.io/apimachinery/pkg/util/intstr"
11+
12+
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/api"
13+
)
14+
15+
// apiAccessRE matches Volume names of the form "kube-api-access-<suffix>",
// where the suffix is one or more lowercase letters or digits.
var apiAccessRE = regexp.MustCompile(`^kube-api-access-[a-z0-9]+$`)
16+
17+
// PodIsInTrouble is both (a) some container restarts and (b) Pod not ready
18+
func PodIsInTrouble(pod *corev1.Pod) bool {
19+
var sumRestarts int32
20+
for _, ctrStat := range pod.Status.ContainerStatuses {
21+
sumRestarts += ctrStat.RestartCount
22+
}
23+
if sumRestarts == 0 {
24+
return false
25+
}
26+
condIdx := slices.IndexFunc(pod.Status.Conditions, func(cond corev1.PodCondition) bool {
27+
return cond.Type == "Ready"
28+
})
29+
if condIdx >= 0 {
30+
if pod.Status.Conditions[condIdx].Status == corev1.ConditionTrue {
31+
return false
32+
}
33+
}
34+
return true
35+
}
36+
37+
// DeIndividualize removes the parts of a PodSpec that are specific to an individual.
38+
// This func side-effects the given `*PodSpec` and returns it.
39+
func DeIndividualize(podSpec *corev1.PodSpec) *corev1.PodSpec {
40+
podSpec.EphemeralContainers = nil // these may not be given in Create
41+
// The api-access Volume is individualized
42+
volIdx := slices.IndexFunc(podSpec.Volumes, func(vol corev1.Volume) bool {
43+
return apiAccessRE.MatchString(vol.Name)
44+
})
45+
if volIdx >= 0 {
46+
volName := podSpec.Volumes[volIdx].Name
47+
podSpec.Volumes = slices.Delete(podSpec.Volumes, volIdx, volIdx+1)
48+
for ctrIdx := range podSpec.Containers {
49+
removeVolumeMount(&podSpec.Containers[ctrIdx], volName)
50+
}
51+
for ctrIdx := range podSpec.InitContainers {
52+
removeVolumeMount(&podSpec.InitContainers[ctrIdx], volName)
53+
}
54+
}
55+
return podSpec
56+
}
57+
58+
func removeVolumeMount(ctr *corev1.Container, volumeName string) {
59+
mntIdx := slices.IndexFunc(ctr.VolumeMounts, func(mnt corev1.VolumeMount) bool {
60+
return mnt.Name == volumeName
61+
})
62+
if mntIdx >= 0 {
63+
ctr.VolumeMounts = slices.Delete(ctr.VolumeMounts, mntIdx, mntIdx+1)
64+
}
65+
}
66+
67+
// GetInferenceServerPort, given a server-providing Pod,
68+
// returns (containerIndex int, port int16, err error)
69+
func GetInferenceServerPort(pod *corev1.Pod) (int, int16, error) {
70+
// identify the inference server container
71+
cIdx := slices.IndexFunc(pod.Spec.Containers, func(c corev1.Container) bool {
72+
return c.Name == api.InferenceServerContainerName
73+
})
74+
if cIdx == -1 {
75+
return 0, 0, fmt.Errorf("container %q not found", api.InferenceServerContainerName)
76+
}
77+
isCtr := &pod.Spec.Containers[cIdx]
78+
if isCtr.ReadinessProbe == nil {
79+
return 0, 0, errors.New("the inference server container has no readinessProbe")
80+
} else if isCtr.ReadinessProbe.HTTPGet == nil {
81+
return 0, 0, fmt.Errorf("the readinessProbe is not an HTTPGet")
82+
}
83+
portIOS := isCtr.ReadinessProbe.HTTPGet.Port
84+
switch portIOS.Type {
85+
case intstr.Int:
86+
return cIdx, int16(portIOS.IntVal), nil
87+
case intstr.String:
88+
if portIOS.StrVal == "http" || portIOS.StrVal == "HTTP" {
89+
return cIdx, 80, nil
90+
} else {
91+
return 0, 0, fmt.Errorf("unsupported readinessProbe port %q", portIOS.StrVal)
92+
}
93+
default:
94+
return 0, 0, fmt.Errorf("the readinessProbe port has unexpected type %q", portIOS.Type)
95+
}
96+
}
97+
98+
func IsPodReady(pod *corev1.Pod) bool {
99+
for _, cond := range pod.Status.Conditions {
100+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
101+
return true
102+
}
103+
}
104+
return false
105+
}

0 commit comments

Comments
 (0)