2 changes: 1 addition & 1 deletion dockerfiles/Dockerfile.launcher.benchmark
@@ -3,7 +3,7 @@ FROM ${BASE_IMAGE}

WORKDIR /app

COPY inference_server/launcher/launcher.py inference_server/launcher/gputranslator.py /app/
COPY inference_server/launcher/launcher.py inference_server/launcher/gputranslator.py inference_server/launcher/launcher_pod_notifier.py /app/

# Install uvicorn for serving the launcher API, nvidia-ml-py for gputranslator, and the kubernetes client for the pod notifier
RUN pip install --root-user-action=ignore --no-cache-dir uvicorn nvidia-ml-py kubernetes
2 changes: 1 addition & 1 deletion dockerfiles/Dockerfile.launcher.cpu
@@ -14,7 +14,7 @@ FROM base-${TARGETARCH} AS final

WORKDIR /app

COPY inference_server/launcher/launcher.py inference_server/launcher/gputranslator.py /app/
COPY inference_server/launcher/launcher.py inference_server/launcher/gputranslator.py inference_server/launcher/launcher_pod_notifier.py /app/
RUN chmod a+x /app/launcher.py

# Install uvicorn for serving the launcher API and nvidia-ml-py for gputranslator
198 changes: 198 additions & 0 deletions inference_server/launcher/launcher_pod_notifier.py
@@ -0,0 +1,198 @@
#!/usr/bin/env python3
# Copyright 2026 The llm-d Authors.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Publish vLLM instance-state changes onto the enclosing Pod."""

import hashlib
import json
import logging
import os
import sys
import time
import urllib.error
import urllib.request
from typing import Any

from kubernetes import client, config
from kubernetes.client import ApiException

SIGNATURE_ANNOTATION = "dual-pods.llm-d.ai/vllm-instance-signature"

DEFAULT_BASE_URL = "http://127.0.0.1:8001"
DEFAULT_POLL_INTERVAL_SECONDS = 2.0
DEFAULT_ERROR_BACKOFF_SECONDS = 5.0
INFERENCE_SERVER_CONTAINER_NAME = "inference-server"


logger = logging.getLogger("launcher_pod_notifier")


def configure_logging() -> None:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )


def get_required_env(name: str) -> str:
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"missing required environment variable {name}")
    return value


def is_inference_server_ready(
    api: client.CoreV1Api, namespace: str, pod_name: str
) -> bool:
    """Check if the inference-server container is ready in the pod."""
    try:
        pod = api.read_namespaced_pod(name=pod_name, namespace=namespace)
        if not pod.status or not pod.status.container_statuses:
            return False

        for container_status in pod.status.container_statuses:
            if container_status.name == INFERENCE_SERVER_CONTAINER_NAME:
                return container_status.ready or False

        # inference-server container not found
        return False
    except Exception as exc:
        logger.warning("Failed to check inference-server readiness: %s", exc)
        return False


def fetch_launcher_state(base_url: str) -> dict[str, Any]:
    url = f"{base_url}/v2/vllm/instances"
    with urllib.request.urlopen(url, timeout=5) as response:
        payload = json.load(response)
    if not isinstance(payload, dict):
        raise ValueError(f"launcher response is not an object: {payload!r}")
    return payload


def canonicalize_launcher_state(payload: dict[str, Any]) -> list[tuple[str, str]]:
    instances = payload.get("instances", [])
    if not isinstance(instances, list):
        raise ValueError(f"instances must be a list, got {type(instances).__name__}")
    canonical_instances: list[tuple[str, str]] = []
    for instance in instances:
        if not isinstance(instance, dict):
            raise ValueError(f"unexpected instance entry: {instance!r}")
        instance_id = str(instance.get("instance_id", ""))
        status = str(instance.get("status", ""))
        canonical_instances.append((instance_id, status))
    canonical_instances.sort()
    return canonical_instances


def compute_signature(payload: dict[str, Any]) -> str:
    canonical = canonicalize_launcher_state(payload)
    blob = json.dumps(canonical, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()


def load_incluster_client() -> client.CoreV1Api:
    config.load_incluster_config()
    return client.CoreV1Api()


def get_pod_annotations(
    api: client.CoreV1Api, namespace: str, pod_name: str
) -> dict[str, str]:
    pod = api.read_namespaced_pod(name=pod_name, namespace=namespace)
    return pod.metadata.annotations or {}


def patch_pod_signature(
    api: client.CoreV1Api, namespace: str, pod_name: str, signature: str
) -> None:
    body = {
        "metadata": {
            "annotations": {
                SIGNATURE_ANNOTATION: signature,
            }
        }
    }
    api.patch_namespaced_pod(name=pod_name, namespace=namespace, body=body)
    # Interpolate into the message itself: the formatter configured in
    # configure_logging() does not render fields passed via `extra`, so they
    # would otherwise be silently dropped from the output.
    logger.info(
        "Published launcher state change: pod=%s signature=%s", pod_name, signature
    )


def main() -> int:
    configure_logging()

    try:
        pod_name = get_required_env("POD_NAME")
        namespace = get_required_env("NAMESPACE")
    except RuntimeError as exc:
        logger.error("%s", exc)
        return 1

    base_url = os.getenv("LAUNCHER_BASE_URL", DEFAULT_BASE_URL).rstrip("/")
    poll_interval = DEFAULT_POLL_INTERVAL_SECONDS
    error_backoff = DEFAULT_ERROR_BACKOFF_SECONDS

    try:
        api = load_incluster_client()
    except Exception as exc:
        logger.error("Failed to initialize in-cluster Kubernetes client: %s", exc)
        return 1

    logger.info(
        "Launcher Pod notifier started for pod %s in namespace %s against %s",
        pod_name,
        namespace,
        base_url,
    )

    last_published_signature: str | None = None
    inference_server_ready = False

    while True:
        try:
            if not inference_server_ready:
                if not is_inference_server_ready(api, namespace, pod_name):
                    logger.info("Inference-server container is not ready yet; retrying")
                    time.sleep(error_backoff)
                    continue
                inference_server_ready = True
                logger.info(
                    "Inference-server container is ready; starting notifier polling"
                )

            signature = compute_signature(fetch_launcher_state(base_url))
            if signature != last_published_signature:
                patch_pod_signature(api, namespace, pod_name, signature)
                last_published_signature = signature
            time.sleep(poll_interval)
        except (
            ApiException,
            OSError,
            TimeoutError,
            ValueError,
            urllib.error.HTTPError,
            urllib.error.URLError,
        ) as exc:
            logger.warning("Notifier failure: %s", exc)
            time.sleep(error_backoff)
        except Exception as exc:
            logger.exception("Unexpected notifier failure: %s", exc)
            time.sleep(error_backoff)


if __name__ == "__main__":
    sys.exit(main())
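
To make the canonicalization and hashing above concrete, here is a small self-contained sketch of what the sidecar would publish for a hypothetical launcher response (the instance IDs, statuses, and the extra "port" field are invented for illustration):

import hashlib
import json

# Hypothetical /v2/vllm/instances response; field values are invented.
payload = {
    "instances": [
        {"instance_id": "inst-b", "status": "running", "port": 8200},
        {"instance_id": "inst-a", "status": "loading", "port": 8201},
    ]
}

# Mirrors canonicalize_launcher_state: keep only (instance_id, status), sorted,
# so extra fields such as "port" never influence the signature.
canonical = sorted(
    (str(i.get("instance_id", "")), str(i.get("status", "")))
    for i in payload["instances"]
)

# Mirrors compute_signature: compact JSON, then SHA-256.
blob = json.dumps(canonical, separators=(",", ":")).encode("utf-8")
print(hashlib.sha256(blob).hexdigest())  # the value the sidecar would publish

Because only (instance_id, status) pairs feed the hash, changes to any other fields in the launcher response do not produce a new signature, so the Pod is patched only when the instance set or a status actually changes.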
9 changes: 8 additions & 1 deletion pkg/controller/generic/queue-work.go
@@ -26,6 +26,10 @@ import (
"k8s.io/klog/v2"
)

const (
controllerQueuePerItemRetryMaxDelay = 20 * time.Second
)

Review thread on controllerQueuePerItemRetryMaxDelay:

Collaborator: Why is this needed? Would a larger value be adequate?

Author: Now that more events from the launchers are reported, the per-item retry backoff grows faster. The cap is needed to reduce the slack between "vLLM instance ready" and the controller's next retry.

Collaborator: I think we need an even more aggressive solution to that problem; this latency is on the critical path that we want to minimize. OK if we address this in a later PR.

// QueueAndWorkers is generic code for a typical controller's workqueue and worker goroutines
// that pull from that queue.
type QueueAndWorkers[Item comparable] struct {
@@ -62,7 +66,10 @@ func newQueueAndWorkers[Item comparable](
    ans := QueueAndWorkers[Item]{
        ControllerName: controllerName,
        Queue: workqueue.NewTypedRateLimitingQueueWithConfig(
            workqueue.DefaultTypedControllerRateLimiter[Item](),
            workqueue.NewTypedWithMaxWaitRateLimiter(
                workqueue.DefaultTypedControllerRateLimiter[Item](),
                controllerQueuePerItemRetryMaxDelay,
            ),
            workqueue.TypedRateLimitingQueueConfig[Item]{
                Name: controllerName,
            }),
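
For intuition about the new cap: client-go's default per-item rate limiter backs off exponentially, so after a run of failed reconciles the next retry can be minutes away; the max-wait wrapper bounds that delay at 20 s. A minimal Python sketch of the schedule, assuming client-go's documented defaults of a 5 ms base delay doubling per failure with a 1000 s ceiling (an assumption worth verifying against the vendored client-go):

# Per-item retry delays, with and without the 20 s cap added by
# NewTypedWithMaxWaitRateLimiter. Assumes client-go's default exponential
# per-item limiter: 5 ms base, doubling per failure, 1000 s ceiling.
BASE_S, OLD_CEILING_S, NEW_CAP_S = 0.005, 1000.0, 20.0

for failures in range(0, 22, 3):
    delay = BASE_S * (2 ** failures)
    print(
        f"failure #{failures:2d}: "
        f"default {min(delay, OLD_CEILING_S):8.3f} s, "
        f"capped {min(delay, NEW_CAP_S):6.3f} s"
    )

With the cap, an item that keeps failing is retried at least every 20 seconds, which bounds the slack between "vLLM instance ready" and the controller's next retry that the review thread discusses.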
49 changes: 49 additions & 0 deletions pkg/controller/utils/pod-helper.go
@@ -228,6 +228,7 @@ func BuildLauncherPodFromTemplate(template corev1.PodTemplateSpec, ns, nodeName,
        pod.Spec.NodeSelector = make(map[string]string)
    }
    pod.Spec.NodeSelector["kubernetes.io/hostname"] = nodeName
    addLauncherNotifierSidecar(pod, container.Image, container.ImagePullPolicy)
    return pod, nil
}

@@ -271,3 +272,51 @@ func removeGPUResourceLimits(container *corev1.Container) {
        container.Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resource.MustParse("0")
    }
}

func addLauncherNotifierSidecar(pod *corev1.Pod, launcherImage string, pullPolicy corev1.PullPolicy) {
    const sidecarName = "state-change-reflector"
    idx := slices.IndexFunc(pod.Spec.Containers, func(c corev1.Container) bool {
        return c.Name == sidecarName
    })

    notifier := corev1.Container{
        Name:            sidecarName,
        Image:           launcherImage,
        ImagePullPolicy: pullPolicy,
        Command:         []string{"python3", "/app/launcher_pod_notifier.py"},
        Env: []corev1.EnvVar{
            {
                Name:  "LAUNCHER_BASE_URL",
                Value: fmt.Sprintf("http://127.0.0.1:%d", common.LauncherServicePort),
            },
            {
                Name: "POD_NAME",
                ValueFrom: &corev1.EnvVarSource{
                    FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"},
                },
            },
            {
                Name: "NAMESPACE",
                ValueFrom: &corev1.EnvVarSource{
                    FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"},
                },
            },
        },
        Resources: corev1.ResourceRequirements{
            Requests: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("10m"),
                corev1.ResourceMemory: resource.MustParse("64Mi"),
            },
            Limits: corev1.ResourceList{
                corev1.ResourceCPU:    resource.MustParse("100m"),
                corev1.ResourceMemory: resource.MustParse("128Mi"),
            },
        },
    }

    if idx >= 0 {
        pod.Spec.Containers[idx] = notifier
        return
    }
    pod.Spec.Containers = append(pod.Spec.Containers, notifier)
}

Review thread on the idx >= 0 branch:

Collaborator: I would rather define this case as a user error, to be reflected in .status.errors of the LauncherConfig.

Author: Can we make that change later, in another PR?

Collaborator: OK with me.
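
The point of patching an annotation rather than, say, logging is that every launcher state change becomes a Pod UPDATE event that watchers see immediately. A hypothetical consumer-side sketch in Python (the namespace and the name-keyed cache are illustrative, not the controller's actual code):

# Hypothetical watcher reacting to signature changes published by the sidecar.
from kubernetes import client, config, watch

SIGNATURE_ANNOTATION = "dual-pods.llm-d.ai/vllm-instance-signature"

config.load_kube_config()  # use config.load_incluster_config() inside a cluster
api = client.CoreV1Api()

last_seen: dict[str, str] = {}
for event in watch.Watch().stream(api.list_namespaced_pod, namespace="default"):
    pod = event["object"]
    sig = (pod.metadata.annotations or {}).get(SIGNATURE_ANNOTATION)
    if sig and last_seen.get(pod.metadata.name) != sig:
        last_seen[pod.metadata.name] = sig
        print(f"{pod.metadata.name}: launcher state changed, signature {sig[:12]}")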
37 changes: 37 additions & 0 deletions test/e2e/mkobjs-openshift.sh
@@ -49,6 +49,42 @@ if [ -n "${RUNTIME_CLASS_NAME:-}" ]; then
fi

if out=$(kubectl apply "${ns_flag[@]}" -f - 2>&1 <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: launcher-$inst
  labels:
    fma-e2e-instance: "$inst"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: launcher-pod-state-writer-$inst
  labels:
    fma-e2e-instance: "$inst"
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - get
  - patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: launcher-pod-state-writer-$inst
  labels:
    fma-e2e-instance: "$inst"
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: launcher-pod-state-writer-$inst
subjects:
- kind: ServiceAccount
  name: launcher-$inst
---
apiVersion: fma.llm-d.ai/v1alpha1
kind: InferenceServerConfig
metadata:
@@ -120,6 +156,7 @@ spec:
  podTemplate:
    spec:
      ${runtime_class}
      serviceAccountName: launcher-$inst
      containers:
      - name: inference-server
        image: ${LAUNCHER_IMAGE}
7 changes: 7 additions & 0 deletions test/e2e/run-launcher-based.sh
@@ -117,6 +117,13 @@ rules:
  - get
  - list
  - watch
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - get
  - patch
EOF

kubectl create rolebinding testlauncher --role=testlauncher --serviceaccount=$(kubectl get sa default -o jsonpath={.metadata.namespace}):testlauncher