Skip to content

Commit c21c96a

Browse files
committed
Polish the launcher pod template during proactive creation
1 parent bbdc661 commit c21c96a

File tree

3 files changed

+148
-1
lines changed

3 files changed

+148
-1
lines changed

pkg/api/interface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ const DualLabelName string = "dual-pods.llm-d.ai/dual"
114114
// (it does not rely on this label for anything).
115115
const SleepingLabelName string = "dual-pods.llm-d.ai/sleeping"
116116

117+
const NominalHashAnnotationKey = "dual-pods.llm-d.ai/nominal"
118+
117119
// SleepState is what HTTP GET /is_sleeping on an inference server
118120
// returns (as JSON).
119121
type SleepState struct {

pkg/controller/launcher-populator/interface.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ const (
2626
LauncherConfigNameLabelKey = "dual-pods.llm-d.ai/launcher-config-name"
2727

2828
NodeNameLabelKey = "dual-pods.llm-d.ai/node-name"
29+
30+
PortDiscoveryAnnotationKey = "inference.networking.x-k8s.io/port-discovery"
31+
32+
PortDiscoveryAnnotationEmptyValue = "[]"
2933
)

pkg/controller/launcher-populator/populator.go

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,21 @@ package launcherpopulator
1818

1919
import (
2020
"context"
21+
"crypto/sha256"
22+
"encoding/base64"
23+
"encoding/json"
2124
"fmt"
25+
"os"
2226

27+
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/api"
28+
dualpods "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/dual-pods"
29+
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/utils"
2330
corev1 "k8s.io/api/core/v1"
2431
apierrors "k8s.io/apimachinery/pkg/api/errors"
32+
"k8s.io/apimachinery/pkg/api/resource"
2533
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2634
"k8s.io/apimachinery/pkg/labels"
35+
"k8s.io/apimachinery/pkg/util/intstr"
2736
"k8s.io/utils/ptr"
2837

2938
corev1preinformers "k8s.io/client-go/informers/core/v1"
@@ -383,7 +392,7 @@ func (ctl *controller) deleteExcessLaunchers(ctx context.Context, launchers []co
383392
func (ctl *controller) buildPodFromTemplate(template corev1.PodTemplateSpec, key NodeLauncherKey) *corev1.Pod {
384393
pod := &corev1.Pod{
385394
ObjectMeta: template.ObjectMeta,
386-
Spec: template.Spec,
395+
Spec: *utils.DeIndividualize(template.Spec.DeepCopy()),
387396
}
388397
pod.Namespace = ctl.namespace
389398
// Ensure labels are set
@@ -394,7 +403,139 @@ func (ctl *controller) buildPodFromTemplate(template corev1.PodTemplateSpec, key
394403
pod.Labels[LauncherGeneratedByLabelKey] = LauncherGeneratedByLabelValue
395404
pod.Labels[LauncherConfigNameLabelKey] = key.LauncherConfigName
396405
pod.Labels[NodeNameLabelKey] = key.NodeName
406+
pod.Labels[api.SleepingLabelName] = "false"
407+
408+
hasher := sha256.New()
409+
modifiedJSON, _ := json.Marshal(pod)
410+
hasher.Write(modifiedJSON)
411+
hasher.Write([]byte(";gpus="))
412+
hasher.Write([]byte("all")) //@TODO will be refined
413+
hasher.Write([]byte(";node="))
414+
hasher.Write([]byte(key.NodeName))
415+
var modifiedHash [sha256.Size]byte
416+
modifiedHashSl := hasher.Sum(modifiedHash[:0])
417+
nominalHash := base64.RawStdEncoding.EncodeToString(modifiedHashSl)
418+
419+
if pod.Annotations == nil {
420+
pod.Annotations = make(map[string]string)
421+
}
422+
pod.Annotations = dualpods.MapSet(pod.Annotations, api.NominalHashAnnotationKey, nominalHash)
423+
424+
pod.Annotations[PortDiscoveryAnnotationKey] = PortDiscoveryAnnotationEmptyValue
425+
426+
pod.Spec.RestartPolicy = corev1.RestartPolicyNever
427+
428+
// Process container list, keep only one container and apply fixed configuration
429+
if len(pod.Spec.Containers) == 0 {
430+
// If there are no containers in the template, create a default container
431+
pod.Spec.Containers = []corev1.Container{
432+
{
433+
Name: api.InferenceServerContainerName,
434+
},
435+
}
436+
} else {
437+
// If there are multiple containers in the template, keep only the first one and rename it to engine-pilot
438+
container := &pod.Spec.Containers[0]
439+
container.Name = api.InferenceServerContainerName
440+
}
441+
442+
container := &pod.Spec.Containers[0]
443+
// @TODO Should set to specified Launcher image, replacing the default image
444+
launcherImage := os.Getenv("LAUNCHER_IMAGE")
445+
if launcherImage != "" {
446+
container.Image = launcherImage
447+
}
448+
449+
// Ensure port configuration includes port 8001
450+
ensurePortExists(container, 8001, "health", corev1.ProtocolTCP)
451+
452+
// Configure required environment variables
453+
configureRequiredEnvVars(container)
454+
455+
// Set fixed liveness probe
456+
container.LivenessProbe = &corev1.Probe{
457+
ProbeHandler: corev1.ProbeHandler{
458+
HTTPGet: &corev1.HTTPGetAction{
459+
Path: "/health",
460+
Port: intstr.FromInt(8001),
461+
Scheme: corev1.URISchemeHTTP,
462+
},
463+
},
464+
InitialDelaySeconds: 10,
465+
PeriodSeconds: 20,
466+
TimeoutSeconds: 1,
467+
SuccessThreshold: 1,
468+
FailureThreshold: 3,
469+
}
470+
471+
// Remove nvidia.com/gpu from resource limits
472+
removeGPUResourceLimits(container)
473+
474+
// Remove nvidia.com/gpu from Pod-level resource overhead
475+
if pod.Spec.Overhead != nil {
476+
delete(pod.Spec.Overhead, corev1.ResourceName("nvidia.com/gpu"))
477+
}
478+
397479
// Assign to specific node
398480
pod.Spec.NodeName = key.NodeName
399481
return pod
400482
}
483+
484+
// ensurePortExists adds the specified port if it doesn't already exist
485+
func ensurePortExists(container *corev1.Container, port int32, name string, protocol corev1.Protocol) {
486+
portExists := false
487+
for _, existingPort := range container.Ports {
488+
if existingPort.ContainerPort == port {
489+
portExists = true
490+
break
491+
}
492+
}
493+
if !portExists {
494+
container.Ports = append(container.Ports, corev1.ContainerPort{
495+
Name: name,
496+
ContainerPort: port,
497+
Protocol: protocol,
498+
})
499+
}
500+
}
501+
502+
// configureRequiredEnvVars adds or updates required environment variables
503+
func configureRequiredEnvVars(container *corev1.Container) {
504+
envVars := map[string]string{
505+
"PYTHONPATH": "/app",
506+
"NVIDIA_VISIBLE_DEVICES": "all",
507+
"NVIDIA_DRIVER_CAPABILITIES": "compute,utility",
508+
"VLLM_SERVER_DEV_MODE": "1",
509+
}
510+
511+
// Create a mapping of existing environment variables for easy lookup
512+
existingEnv := make(map[string]*corev1.EnvVar)
513+
for i := range container.Env {
514+
envVar := &container.Env[i]
515+
existingEnv[envVar.Name] = envVar
516+
}
517+
518+
// Add or update required environment variables
519+
for envName, envValue := range envVars {
520+
if envVar, exists := existingEnv[envName]; exists {
521+
// If it already exists, update its value
522+
envVar.Value = envValue
523+
} else {
524+
// If it doesn't exist, add a new environment variable
525+
container.Env = append(container.Env, corev1.EnvVar{
526+
Name: envName,
527+
Value: envValue,
528+
})
529+
}
530+
}
531+
}
532+
533+
// removeGPUResourceLimits removes nvidia.com/gpu from container resource limits and requests
534+
func removeGPUResourceLimits(container *corev1.Container) {
535+
if container.Resources.Limits != nil {
536+
container.Resources.Limits[corev1.ResourceName("nvidia.com/gpu")] = resource.MustParse("0")
537+
}
538+
if container.Resources.Requests != nil {
539+
container.Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resource.MustParse("0")
540+
}
541+
}

0 commit comments

Comments
 (0)