Skip to content

Commit f323a8f

Browse files
authored
Garbage collect obsolete vLLM instances (llm-d-incubation#428)
* Delete obsolete sleeping instances Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Add possible cleanup for obsolete and previously awake instance when unbinding Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Make use of the controller's workqueue for inferenceserverconfigs Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Improve comments about the queue item types Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> * Delegate GC of instances to each node as a local task Signed-off-by: Jun Duan <jun.duan.phd@outlook.com> --------- Signed-off-by: Jun Duan <jun.duan.phd@outlook.com>
1 parent 8582db6 commit f323a8f

File tree

3 files changed

+263
-39
lines changed

3 files changed

+263
-39
lines changed

pkg/controller/dual-pods/controller.go

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"fmt"
2424
"math"
25+
"reflect"
2526
"slices"
2627
"strconv"
2728
"strings"
@@ -75,17 +76,24 @@ import (
7576
// deletion of any server-providing Pod. Nor does the controller ever try to bind
7677
// one that is unbound; they are only created in the bound state.
7778

78-
// There are two types of item in the controller's work queue.
79+
// There are three types of item in the controller's work queue.
7980
// One is a reference to the gpu-map ConfigMap.
8081

81-
// The other type of queue item is a reference to an inference server.
82+
// The second type of queue item is a reference to a Node.
83+
// There are two types of item for a Node:
84+
// 1) A reference to an inference server.
8285
// This reference carries the inference server's UID and the name
8386
// of the server-requesting Pod.
8487
// An inference server's UID is the UID of the server-requesting Pod.
88+
// 2) A reference to an unbound launcher-based server-providing Pod.
8589

86-
const requesterAnnotationKey = "dual-pods.llm-d.ai/requester"
87-
const nominalHashAnnotationKey = "dual-pods.llm-d.ai/nominal"
88-
const iscLabelKeysAnnotationKey = "dual-pods.llm-d.ai/isc-label-keys"
90+
// The third type of queue item is a reference to an InferenceServerConfig.
91+
// It is enqueued when an ISC's spec changes, to trigger cleanup of any
92+
// sleeping launcher instances whose configuration is now obsolete.
93+
94+
const requesterAnnotationKey = "dual-pods.llm-d.ai/requester"
95+
const nominalHashAnnotationKey = "dual-pods.llm-d.ai/nominal"
96+
const iscLabelKeysAnnotationKey = "dual-pods.llm-d.ai/isc-label-keys"
8997
const iscAnnotationKeysAnnotationKey = "dual-pods.llm-d.ai/isc-annotation-keys"
9098

9199
const providerFinalizer = "dual-pods.llm-d.ai/provider"
@@ -341,6 +349,10 @@ type unboundLauncherPodItem struct {
341349
NodeName string
342350
}
343351

352+
type instanceGCItem struct {
353+
ISCName string
354+
}
355+
344356
type infSvrItemType string
345357

346358
const (
@@ -460,7 +472,7 @@ func (ctl *controller) OnAdd(obj any, isInInitialList bool) {
460472
ctl.Queue.Add(nodeItem{nodeName})
461473
}
462474
case *fmav1alpha1.InferenceServerConfig:
463-
ctl.enqueueRequestersByInferenceServerConfig(typed, isInInitialList)
475+
ctl.enqueueInfSvrItemsByISC(typed, isInInitialList)
464476
case *corev1.ConfigMap:
465477
if typed.Name != GPUMapName {
466478
ctl.enqueueLogger.V(5).Info("Ignoring ConfigMap that is not the GPU map", "ref", cache.MetaObjectToName(typed))
@@ -513,7 +525,11 @@ func (ctl *controller) OnUpdate(prev, obj any) {
513525
ctl.Queue.Add(nodeItem{nodeName})
514526
}
515527
case *fmav1alpha1.InferenceServerConfig:
516-
ctl.enqueueRequestersByInferenceServerConfig(typed, false)
528+
prevTyped, ok := prev.(*fmav1alpha1.InferenceServerConfig)
529+
if ok && !reflect.DeepEqual(prevTyped.Spec, typed.Spec) {
530+
ctl.enqueueInstanceGCItemsForISC(typed.Name)
531+
}
532+
ctl.enqueueInfSvrItemsByISC(typed, false)
517533
case *corev1.ConfigMap:
518534
if typed.Name != GPUMapName {
519535
ctl.enqueueLogger.V(5).Info("Ignoring ConfigMap that is not the GPU map", "ref", cache.MetaObjectToName(typed))
@@ -569,7 +585,7 @@ func (ctl *controller) OnDelete(obj any) {
569585
ctl.Queue.Add(nodeItem{nodeName})
570586
}
571587
case *fmav1alpha1.InferenceServerConfig:
572-
ctl.enqueueRequestersByInferenceServerConfig(typed, false)
588+
ctl.enqueueInfSvrItemsByISC(typed, false)
573589
case *corev1.ConfigMap:
574590
if typed.Name != GPUMapName {
575591
ctl.enqueueLogger.V(5).Info("Ignoring ConfigMap that is not the GPU map", "ref", cache.MetaObjectToName(typed))
@@ -648,6 +664,15 @@ func (item cmItem) process(ctx context.Context, ctl *controller) (error, bool) {
648664
return nil, false
649665
}
650666

667+
func (ctl *controller) enqueueInstanceGCItemsForISC(iscName string) {
668+
ctl.mutex.Lock()
669+
defer ctl.mutex.Unlock()
670+
for nodeName, nodeDat := range ctl.nodeNameToData {
671+
nodeDat.add(instanceGCItem{ISCName: iscName})
672+
ctl.Queue.Add(nodeItem{nodeName})
673+
}
674+
}
675+
651676
func (ctl *controller) enqueueRequesters(ctx context.Context) {
652677
ctl.mutex.Lock()
653678
defer ctl.mutex.Unlock()
@@ -666,7 +691,7 @@ func (ctl *controller) enqueueRequesters(ctx context.Context) {
666691
}
667692
}
668693

669-
func (ctl *controller) enqueueRequestersByInferenceServerConfig(isc *fmav1alpha1.InferenceServerConfig, isInInitialList bool) {
694+
func (ctl *controller) enqueueInfSvrItemsByISC(isc *fmav1alpha1.InferenceServerConfig, isInInitialList bool) {
670695
inferenceServerConfigName := isc.Name
671696
requesters, err := ctl.podInformer.GetIndexer().ByIndex(inferenceServerConfigIndexName, inferenceServerConfigName)
672697
if err != nil {
@@ -792,3 +817,4 @@ func (ctl *controller) clearServerData(nodeDat *nodeData, uid apitypes.UID) {
792817
defer ctl.mutex.Unlock()
793818
delete(nodeDat.InferenceServers, uid)
794819
}
820+

pkg/controller/dual-pods/inference-server.go

Lines changed: 171 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
266266
if providingPod.Labels != nil {
267267
_, providingPodLauncherBased = providingPod.Labels[ctlrcommon.LauncherConfigNameLabelKey]
268268
}
269-
err := ctl.ensureUnbound(ctx, serverDat, providingPod, providingPodLauncherBased)
269+
err := ctl.ensureUnbound(ctx, serverDat, nodeDat, providingPod, providingPodLauncherBased)
270270
if err != nil {
271271
return err, true
272272
}
@@ -334,7 +334,7 @@ func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *
334334
if serverDat.GPUIDsStr == nil {
335335
logger.V(5).Info("Querying accelerators", "ip", requesterIP, "port", adminPort)
336336
url := fmt.Sprintf("http://%s:%s%s", requesterIP, adminPort, stubapi.AcceleratorQueryPath)
337-
gpuUUIDs, err := getGPUUUIDs(url)
337+
gpuUUIDs, err := getGPUUUIDs(ctx, url)
338338
if err != nil {
339339
queryErr := fmt.Errorf("GET %q fails: %s", url, err.Error())
340340
updateErr, _ := ctl.ensureReqStatus(ctx, requestingPod, serverDat, queryErr.Error())
@@ -1118,36 +1118,115 @@ func (ctl *controller) removeProviderFinalizer(ctx context.Context, providingPod
11181118
return false, nil // no change
11191119
}
11201120

1121+
func (item instanceGCItem) process(ctx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
1122+
logger := klog.FromContext(ctx).WithValues("iscName", item.ISCName)
1123+
1124+
isc, err := ctl.iscLister.InferenceServerConfigs(ctl.namespace).Get(item.ISCName)
1125+
if err != nil {
1126+
if apierrors.IsNotFound(err) {
1127+
return nil, false
1128+
}
1129+
return err, true
1130+
}
1131+
1132+
for launcherPodName, launcherDat := range nodeDat.Launchers {
1133+
launcherPod, err := ctl.podLister.Pods(ctl.namespace).Get(launcherPodName)
1134+
if err != nil {
1135+
if apierrors.IsNotFound(err) {
1136+
continue
1137+
}
1138+
logger.Error(err, "Failed to get launcher pod during instance GC", "launcherPod", launcherPodName)
1139+
continue
1140+
}
1141+
if launcherPod.DeletionTimestamp != nil || launcherPod.Status.PodIP == "" {
1142+
continue
1143+
}
1144+
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherPod.Status.PodIP, ctlrcommon.LauncherServicePort)
1145+
lClient, err := NewLauncherClient(launcherBaseURL)
1146+
if err != nil {
1147+
logger.Error(err, "Failed to create launcher client during instance GC", "launcherPod", launcherPodName)
1148+
continue
1149+
}
1150+
allInsts, err := lClient.ListInstances(ctx)
1151+
if err != nil {
1152+
logger.Error(err, "Failed to list instances during instance GC", "launcherPod", launcherPodName)
1153+
continue
1154+
}
1155+
for _, inst := range allInsts.Instances {
1156+
if inst.Annotations[VllmConfigISCNameAnnotationKey] != isc.Name {
1157+
continue
1158+
}
1159+
if len(inst.GpuUUIDs) == 0 {
1160+
logger.V(4).Info("Skipping instance GC: no GPU UUIDs", "launcherPod", launcherPodName, "instanceID", inst.InstanceID)
1161+
continue
1162+
}
1163+
_, currentHash, err := ctl.configInferenceServer(isc, inst.GpuUUIDs)
1164+
if err != nil {
1165+
logger.Error(err, "Failed to compute current hash during instance GC", "launcherPod", launcherPodName, "instanceID", inst.InstanceID)
1166+
continue
1167+
}
1168+
if inst.InstanceID == currentHash {
1169+
continue // not obsolete
1170+
}
1171+
sleeping, err := ctl.querySleeping(ctx, launcherPod, int16(isc.Spec.ModelServerConfig.Port))
1172+
if err != nil {
1173+
logger.Error(err, "Failed to query sleeping state during instance GC", "launcherPod", launcherPodName, "instanceID", inst.InstanceID)
1174+
continue
1175+
}
1176+
if !sleeping {
1177+
logger.V(4).Info("Skipping instance GC: instance not explicitly sleeping", "launcherPod", launcherPodName, "instanceID", inst.InstanceID)
1178+
continue
1179+
}
1180+
if _, err := lClient.DeleteInstance(ctx, inst.InstanceID); err != nil {
1181+
if !IsInstanceNotFoundError(err) {
1182+
logger.Error(err, "Failed to delete obsolete sleeping instance during GC", "launcherPod", launcherPodName, "instanceID", inst.InstanceID)
1183+
}
1184+
continue
1185+
}
1186+
delete(launcherDat.Instances, inst.InstanceID)
1187+
logger.V(2).Info("Deleted obsolete sleeping instance", "launcherPod", launcherPodName, "instanceID", inst.InstanceID, "currentHash", currentHash)
1188+
}
1189+
}
1190+
return nil, false
1191+
}
1192+
11211193
// Unbinds the given server-providing Pod.
1122-
func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData, providingPod *corev1.Pod, launcherBased bool) error {
1194+
func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData, nodeDat *nodeData, providingPod *corev1.Pod, launcherBased bool) error {
11231195
logger := klog.FromContext(ctx)
11241196
// A providingPod with no IP is not scheduled, so we know that it is not awake.
11251197
// If providingPod is stale then the update will fail.
11261198
if (serverDat.Sleeping == nil || !*(serverDat.Sleeping)) && providingPod.Status.PodIP != "" { // need to put to sleep
1127-
serverPort := serverDat.ServerPort
1128-
// TODO(waltforme): Is serverPort always set correctly for launcher-based server-providing Pods upon unbinding?
1129-
// E.g. What if requestingPod is deleted during a crash and restart of the dual-pods controller?
1130-
// In order to find the port in this case, I think the best effort is to recompute hash for all InferenceServerConfig objects and try to match.
1131-
if !launcherBased {
1132-
if serverDat.NominalProvidingPod == nil {
1133-
var err error
1134-
_, serverPort, err = utils.GetInferenceServerContainerIndexAndPort(providingPod)
1135-
if err != nil { // Impossible, because such a providingPod would never be created by this controller
1136-
return fmt.Errorf("unable to put server to sleep because port not known: %w", err)
1199+
// For launcher-based instances, check if the instance is already obsolete
1200+
// (i.e. its InferenceServerConfig was updated since the instance was created).
1201+
// If so, delete it from the launcher rather than putting it to sleep.
1202+
if launcherBased && ctl.maybeDeleteObsoleteInstance(ctx, serverDat, nodeDat, providingPod) {
1203+
serverDat.Sleeping = ptr.To(true)
1204+
} else {
1205+
serverPort := serverDat.ServerPort
1206+
// TODO(waltforme): Is serverPort always set correctly for launcher-based server-providing Pods upon unbinding?
1207+
// E.g. What if requestingPod is deleted during a crash and restart of the dual-pods controller?
1208+
// In order to find the port in this case, I think the best effort is to recompute hash for all InferenceServerConfig objects and try to match.
1209+
if !launcherBased {
1210+
if serverDat.NominalProvidingPod == nil {
1211+
var err error
1212+
_, serverPort, err = utils.GetInferenceServerContainerIndexAndPort(providingPod)
1213+
if err != nil { // Impossible, because such a providingPod would never be created by this controller
1214+
return fmt.Errorf("unable to put server to sleep because port not known: %w", err)
1215+
}
11371216
}
11381217
}
1218+
endpoint := fmt.Sprintf("%s:%d", providingPod.Status.PodIP, serverPort)
1219+
sleepURL := "http://" + endpoint + "/sleep"
1220+
resp, err := http.Post(sleepURL, "", nil)
1221+
if err != nil {
1222+
return fmt.Errorf("failed to put provider %q to sleep, POST %s got error: %w", serverDat.ProvidingPodName, sleepURL, err)
1223+
}
1224+
if sc := resp.StatusCode; sc != http.StatusOK {
1225+
return fmt.Errorf("failed to put provider %q to sleep, POST %s returned status %d", serverDat.ProvidingPodName, sleepURL, sc)
1226+
}
1227+
serverDat.Sleeping = ptr.To(true)
1228+
logger.V(2).Info("Put inference server to sleep", "endpoint", endpoint)
11391229
}
1140-
endpoint := fmt.Sprintf("%s:%d", providingPod.Status.PodIP, serverPort)
1141-
sleepURL := "http://" + endpoint + "/sleep"
1142-
resp, err := http.Post(sleepURL, "", nil)
1143-
if err != nil {
1144-
return fmt.Errorf("failed to put provider %q to sleep, POST %s got error: %w", serverDat.ProvidingPodName, sleepURL, err)
1145-
}
1146-
if sc := resp.StatusCode; sc != http.StatusOK {
1147-
return fmt.Errorf("failed to put provider %q to sleep, POST %s returned status %d", serverDat.ProvidingPodName, sleepURL, sc)
1148-
}
1149-
serverDat.Sleeping = ptr.To(true)
1150-
logger.V(2).Info("Put inference server to sleep", "endpoint", endpoint)
11511230
}
11521231
providingPod = providingPod.DeepCopy()
11531232
var aChange, fChange bool
@@ -1227,6 +1306,64 @@ func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData,
12271306
return nil
12281307
}
12291308

1309+
// maybeDeleteObsoleteInstance checks whether the launcher-based instance is obsolete
1310+
// (its InferenceServerConfig was updated since the instance was created) and if so,
1311+
// deletes it from the launcher. Returns true if the instance was deleted.
1312+
// On any error, returns false so the caller falls through to the normal sleep path.
1313+
func (ctl *controller) maybeDeleteObsoleteInstance(ctx context.Context, serverDat *serverData, nodeDat *nodeData, providingPod *corev1.Pod) bool {
1314+
logger := klog.FromContext(ctx)
1315+
if serverDat.InstanceID == "" {
1316+
return false
1317+
}
1318+
launcherBaseURL := fmt.Sprintf("http://%s:%d", providingPod.Status.PodIP, ctlrcommon.LauncherServicePort)
1319+
lClient, err := NewLauncherClient(launcherBaseURL)
1320+
if err != nil {
1321+
logger.V(4).Info("Cannot check instance obsolescence: failed to create launcher client", "err", err)
1322+
return false
1323+
}
1324+
instState, err := lClient.GetInstanceState(ctx, serverDat.InstanceID)
1325+
if err != nil {
1326+
logger.V(4).Info("Cannot check instance obsolescence: failed to get instance state", "instanceID", serverDat.InstanceID, "err", err)
1327+
return false
1328+
}
1329+
iscName := instState.Annotations[VllmConfigISCNameAnnotationKey]
1330+
if iscName == "" {
1331+
logger.V(4).Info("Cannot check instance obsolescence: no ISC name annotation on instance", "instanceID", serverDat.InstanceID)
1332+
return false
1333+
}
1334+
currentISC, err := ctl.iscLister.InferenceServerConfigs(ctl.namespace).Get(iscName)
1335+
if err != nil {
1336+
logger.V(4).Info("Cannot check instance obsolescence: ISC not found", "iscName", iscName, "err", err)
1337+
return false
1338+
}
1339+
if len(instState.GpuUUIDs) == 0 {
1340+
logger.V(4).Info("Cannot check instance obsolescence: no GPU UUIDs on instance", "instanceID", serverDat.InstanceID)
1341+
return false
1342+
}
1343+
_, currentHash, err := ctl.configInferenceServer(currentISC, instState.GpuUUIDs)
1344+
if err != nil {
1345+
logger.V(4).Info("Cannot check instance obsolescence: failed to compute current hash", "iscName", iscName, "err", err)
1346+
return false
1347+
}
1348+
if currentHash == serverDat.InstanceID {
1349+
return false // not obsolete
1350+
}
1351+
// Instance is obsolete — delete from launcher instead of sleeping.
1352+
if _, err := lClient.DeleteInstance(ctx, serverDat.InstanceID); err != nil {
1353+
if !IsInstanceNotFoundError(err) {
1354+
logger.Error(err, "Failed to delete obsolete instance during unbinding",
1355+
"instanceID", serverDat.InstanceID)
1356+
return false
1357+
}
1358+
}
1359+
if launcherDat := nodeDat.Launchers[providingPod.Name]; launcherDat != nil {
1360+
delete(launcherDat.Instances, serverDat.InstanceID)
1361+
}
1362+
logger.V(2).Info("Deleted obsolete instance during unbinding",
1363+
"instanceID", serverDat.InstanceID, "currentHash", currentHash, "iscName", iscName)
1364+
return true
1365+
}
1366+
12301367
// getNominalServerProvidingPod returns the nominal server-providing Pod,
12311368
// which is cached in the serverData, computing the Pod if necessary.
12321369
// This also ensures that the serverData fields NominalProvidingPod and NominalProvidingPodHash
@@ -1375,7 +1512,7 @@ func getReducedInferenceContainerState(from *corev1.Pod) *reducedContainerState
13751512

13761513
func (ctl *controller) querySleeping(ctx context.Context, providingPod *corev1.Pod, serverPort int16) (bool, error) {
13771514
queryURL := fmt.Sprintf("http://%s:%d/is_sleeping", providingPod.Status.PodIP, serverPort)
1378-
body, err := doGet(queryURL)
1515+
body, err := doGet(ctx, queryURL)
13791516
if err != nil {
13801517
return false, err
13811518
}
@@ -1393,7 +1530,7 @@ func (ctl *controller) accelMemoryIsLowEnough(ctx context.Context, requestingPod
13931530
adminPort = api.AdminPortDefaultValue
13941531
}
13951532
url := fmt.Sprintf("http://%s:%s%s", requestingPod.Status.PodIP, adminPort, stubapi.AcceleratorMemoryQueryPath)
1396-
body, err := doGet(url)
1533+
body, err := doGet(ctx, url)
13971534
if err != nil {
13981535
return err
13991536
}
@@ -1598,12 +1735,16 @@ func init() {
15981735
podDecoder = codecFactory.UniversalDecoder(corev1.SchemeGroupVersion)
15991736
}
16001737

1601-
func doGet(url string) ([]byte, error) {
1738+
func doGet(ctx context.Context, url string) ([]byte, error) {
16021739
client := &http.Client{
16031740
Timeout: 5 * time.Second,
16041741
}
16051742

1606-
resp, err := client.Get(url)
1743+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
1744+
if err != nil {
1745+
return nil, fmt.Errorf("http get %q: %w", url, err)
1746+
}
1747+
resp, err := client.Do(req)
16071748
if err != nil {
16081749
return nil, fmt.Errorf("http get %q: %w", url, err)
16091750
}
@@ -1621,8 +1762,8 @@ func doGet(url string) ([]byte, error) {
16211762
}
16221763

16231764
// getGPUUUIDs does the HTTP GET on the given URL to fetch the assigned GPU UUIDs.
1624-
func getGPUUUIDs(url string) ([]string, error) {
1625-
body, err := doGet(url)
1765+
func getGPUUUIDs(ctx context.Context, url string) ([]string, error) {
1766+
body, err := doGet(ctx, url)
16261767
if err != nil {
16271768
return nil, err
16281769
}

0 commit comments

Comments
 (0)