Skip to content

Commit 7b72c50

Browse files
committed
The self-annotate approach
Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent 5333917 commit 7b72c50

File tree

5 files changed

+233
-2
lines changed

5 files changed

+233
-2
lines changed

dockerfiles/Dockerfile.launcher.benchmark

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ FROM ${BASE_IMAGE}
33

44
WORKDIR /app
55

6-
COPY inference_server/launcher/launcher.py inference_server/launcher/gputranslator.py /app/
6+
COPY inference_server/launcher/launcher.py inference_server/launcher/gputranslator.py inference_server/launcher/launcher_pod_notifier.py /app/
77

88
# Install uvicorn for serving the launcher API, nvidia-ml-py for gputranslator and kubernetes
99
RUN pip install --root-user-action=ignore --no-cache-dir uvicorn nvidia-ml-py kubernetes

dockerfiles/Dockerfile.launcher.cpu

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ FROM base-${TARGETARCH} AS final
1414

1515
WORKDIR /app
1616

17-
COPY inference_server/launcher/launcher.py inference_server/launcher/gputranslator.py /app/
17+
COPY inference_server/launcher/launcher.py inference_server/launcher/gputranslator.py inference_server/launcher/launcher_pod_notifier.py /app/
1818
RUN chmod a+x /app/launcher.py
1919

2020
# Install uvicorn for serving the launcher API and nvidia-ml-py for gputranslator
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2026 The llm-d Authors.
3+
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""Publish vLLM instance-state changes onto the enclosing Pod."""
17+
18+
import hashlib
19+
import json
20+
import logging
21+
import os
22+
import sys
23+
import time
24+
import urllib.error
25+
import urllib.request
26+
from typing import Any
27+
28+
from kubernetes import client, config
29+
from kubernetes.client import ApiException
30+
31+
32+
SIGNATURE_ANNOTATION = "dual-pods.llm-d.ai/vllm-instance-signature"
33+
34+
DEFAULT_BASE_URL = "http://127.0.0.1:8001"
35+
DEFAULT_POLL_INTERVAL_SECONDS = 2.0
36+
DEFAULT_ERROR_BACKOFF_SECONDS = 5.0
37+
38+
39+
logger = logging.getLogger("launcher_pod_notifier")
40+
41+
42+
def configure_logging() -> None:
43+
logging.basicConfig(
44+
level=logging.INFO,
45+
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
46+
)
47+
48+
49+
def get_required_env(name: str) -> str:
50+
value = os.getenv(name)
51+
if not value:
52+
raise RuntimeError(f"missing required environment variable {name}")
53+
return value
54+
55+
56+
def fetch_launcher_state(base_url: str) -> dict[str, Any]:
57+
url = f"{base_url}/v2/vllm/instances"
58+
with urllib.request.urlopen(url, timeout=5) as response:
59+
payload = json.load(response)
60+
if not isinstance(payload, dict):
61+
raise ValueError(f"launcher response is not an object: {payload!r}")
62+
return payload
63+
64+
65+
def canonicalize_launcher_state(payload: dict[str, Any]) -> dict[str, Any]:
66+
instances = payload.get("instances", [])
67+
canonical_instances: list[dict[str, str]] = []
68+
for instance in instances:
69+
if not isinstance(instance, dict):
70+
raise ValueError(f"unexpected instance entry: {instance!r}")
71+
instance_id = str(instance.get("instance_id", ""))
72+
status = str(instance.get("status", ""))
73+
canonical_instances.append({"instance_id": instance_id, "status": status})
74+
canonical_instances.sort(key=lambda item: (item["instance_id"], item["status"]))
75+
return {
76+
"total_instances": int(payload.get("total_instances", len(canonical_instances))),
77+
"running_instances": int(payload.get("running_instances", 0)),
78+
"instances": canonical_instances,
79+
}
80+
81+
82+
def compute_signature(payload: dict[str, Any]) -> str:
83+
canonical = canonicalize_launcher_state(payload)
84+
blob = json.dumps(canonical, sort_keys=True, separators=(",", ":")).encode("utf-8")
85+
return hashlib.sha256(blob).hexdigest()
86+
87+
88+
def load_incluster_client() -> client.CoreV1Api:
89+
config.load_incluster_config()
90+
return client.CoreV1Api()
91+
92+
93+
def get_pod_annotations(api: client.CoreV1Api, namespace: str, pod_name: str) -> dict[str, str]:
94+
pod = api.read_namespaced_pod(name=pod_name, namespace=namespace)
95+
return pod.metadata.annotations or {}
96+
97+
98+
def patch_pod_annotations(
99+
api: client.CoreV1Api,
100+
namespace: str,
101+
pod_name: str,
102+
*,
103+
signature: str,
104+
) -> None:
105+
body = {
106+
"metadata": {
107+
"annotations": {
108+
SIGNATURE_ANNOTATION: signature,
109+
}
110+
}
111+
}
112+
api.patch_namespaced_pod(name=pod_name, namespace=namespace, body=body)
113+
114+
115+
def publish_if_changed(api: client.CoreV1Api, namespace: str, pod_name: str, signature: str) -> None:
116+
annotations = get_pod_annotations(api, namespace, pod_name)
117+
if annotations.get(SIGNATURE_ANNOTATION, "") == signature:
118+
return
119+
120+
patch_pod_annotations(api, namespace, pod_name, signature=signature)
121+
logger.info(
122+
"Published launcher state change",
123+
extra={"pod": pod_name, "signature": signature},
124+
)
125+
126+
127+
def main() -> int:
128+
configure_logging()
129+
130+
try:
131+
pod_name = get_required_env("POD_NAME")
132+
namespace = get_required_env("NAMESPACE")
133+
except RuntimeError as exc:
134+
logger.error("%s", exc)
135+
return 1
136+
137+
base_url = os.getenv("LAUNCHER_BASE_URL", DEFAULT_BASE_URL).rstrip("/")
138+
poll_interval = DEFAULT_POLL_INTERVAL_SECONDS
139+
error_backoff = DEFAULT_ERROR_BACKOFF_SECONDS
140+
141+
try:
142+
api = load_incluster_client()
143+
except Exception as exc:
144+
logger.error("Failed to initialize in-cluster Kubernetes client: %s", exc)
145+
return 1
146+
147+
logger.info(
148+
"Launcher Pod notifier started for pod %s in namespace %s against %s",
149+
pod_name,
150+
namespace,
151+
base_url,
152+
)
153+
154+
while True:
155+
try:
156+
signature = compute_signature(fetch_launcher_state(base_url))
157+
publish_if_changed(api, namespace, pod_name, signature)
158+
time.sleep(poll_interval)
159+
except (
160+
ApiException,
161+
OSError,
162+
TimeoutError,
163+
ValueError,
164+
urllib.error.HTTPError,
165+
urllib.error.URLError,
166+
) as exc:
167+
logger.warning("Notifier loop failed: %s", exc)
168+
time.sleep(error_backoff)
169+
except Exception as exc:
170+
logger.exception("Unexpected notifier failure: %s", exc)
171+
time.sleep(error_backoff)
172+
173+
174+
if __name__ == "__main__":
175+
sys.exit(main())

pkg/controller/utils/pod-helper.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ func BuildLauncherPodFromTemplate(template corev1.PodTemplateSpec, ns, nodeName,
228228
pod.Spec.NodeSelector = make(map[string]string)
229229
}
230230
pod.Spec.NodeSelector["kubernetes.io/hostname"] = nodeName
231+
ensureLauncherNotifierSidecar(pod, container.Image, container.ImagePullPolicy)
231232
return pod, nil
232233
}
233234

@@ -271,3 +272,51 @@ func removeGPUResourceLimits(container *corev1.Container) {
271272
container.Resources.Requests[corev1.ResourceName("nvidia.com/gpu")] = resource.MustParse("0")
272273
}
273274
}
275+
276+
func ensureLauncherNotifierSidecar(pod *corev1.Pod, launcherImage string, pullPolicy corev1.PullPolicy) {
277+
const sidecarName = "vllm-instance-notifier"
278+
idx := slices.IndexFunc(pod.Spec.Containers, func(c corev1.Container) bool {
279+
return c.Name == sidecarName
280+
})
281+
282+
notifier := corev1.Container{
283+
Name: sidecarName,
284+
Image: launcherImage,
285+
ImagePullPolicy: pullPolicy,
286+
Command: []string{"python3", "/app/launcher_pod_notifier.py"},
287+
Env: []corev1.EnvVar{
288+
{
289+
Name: "LAUNCHER_BASE_URL",
290+
Value: fmt.Sprintf("http://127.0.0.1:%d", common.LauncherServicePort),
291+
},
292+
{
293+
Name: "POD_NAME",
294+
ValueFrom: &corev1.EnvVarSource{
295+
FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"},
296+
},
297+
},
298+
{
299+
Name: "NAMESPACE",
300+
ValueFrom: &corev1.EnvVarSource{
301+
FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"},
302+
},
303+
},
304+
},
305+
Resources: corev1.ResourceRequirements{
306+
Requests: corev1.ResourceList{
307+
corev1.ResourceCPU: resource.MustParse("10m"),
308+
corev1.ResourceMemory: resource.MustParse("64Mi"),
309+
},
310+
Limits: corev1.ResourceList{
311+
corev1.ResourceCPU: resource.MustParse("100m"),
312+
corev1.ResourceMemory: resource.MustParse("128Mi"),
313+
},
314+
},
315+
}
316+
317+
if idx >= 0 {
318+
pod.Spec.Containers[idx] = notifier
319+
return
320+
}
321+
pod.Spec.Containers = append(pod.Spec.Containers, notifier)
322+
}

test/e2e/run-launcher-based.sh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ rules:
141141
- get
142142
- list
143143
- watch
144+
- apiGroups:
145+
- ""
146+
resources:
147+
- pods
148+
verbs:
149+
- get
150+
- patch
144151
EOF
145152

146153
kubectl create rolebinding testlauncher --role=testlauncher --serviceaccount=$(kubectl get sa default -o jsonpath={.metadata.namespace}):testlauncher

0 commit comments

Comments
 (0)