Skip to content

Commit 0ff27b3

Browse files
Merge pull request #88 from MikeSpreitzer/source-reorg
Controller source reorg
2 parents 7bc10ce + 6e3682e commit 0ff27b3

File tree

3 files changed

+253
-335
lines changed

3 files changed

+253
-335
lines changed

pkg/controller/dual-pods/controller.go

Lines changed: 81 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
"sync"
2424
"sync/atomic"
2525

26-
"github.com/spf13/pflag"
27-
2826
corev1 "k8s.io/api/core/v1"
2927
"k8s.io/apimachinery/pkg/api/errors"
3028
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -39,6 +37,22 @@ import (
3937
genctlr "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/generic"
4038
)
4139

40+
// This package implements the dual-pods controller.
41+
42+
// The controller works in the context of one Kubernetes API namespace.
43+
44+
// A Pod is a server-requesting Pod if it has the server patch annotation.
45+
// A Pod is a server-running Pod if it has a controlling OwnerReference to
46+
// a Pod (the server-requesting Pod).
47+
48+
// There are two types of item in the controller's work queue.
49+
// One is a reference to the gpu-map ConfigMap.
50+
51+
// The other type of queue item is a reference to an inference server.
52+
// This reference carries the inference server's UID and the name
53+
// of the server-requesting Pod.
54+
// An inference server's UID is the UID of the server-requesting Pod.
55+
4256
const ControllerName = "dual-pods-controller"
4357

4458
// GPUMapName is the name of the ConfigMap(s) parsed to discover the mapping from GPU UUID to location.
@@ -53,14 +67,6 @@ type Controller interface {
5367
Start(context.Context) error
5468
}
5569

56-
type CommonConfig struct {
57-
Verbosity int // `-v`
58-
}
59-
60-
func (cc *CommonConfig) AddToFlagSet(name string, flags *pflag.FlagSet) {
61-
flags.IntVar(&cc.Verbosity, name+"-verbosity", cc.Verbosity, "-v setting for "+name)
62-
}
63-
6470
// NewController makes a new dual pods controller.
6571
// The given namespace is the one to focus on.
6672
func NewController(
@@ -80,7 +86,6 @@ func NewController(
8086
cmLister: corev1PreInformers.ConfigMaps().Lister(),
8187
nodeInformer: corev1PreInformers.Nodes().Informer(),
8288
nodeLister: corev1PreInformers.Nodes().Lister(),
83-
requesters: make(map[string]*requesterData),
8489
inferenceServers: make(map[apitypes.UID]*serverData),
8590
}
8691
ctl.gpuMap.Store(&map[string]GpuLocation{})
@@ -106,16 +111,13 @@ type controller struct {
106111
cmLister corev1listers.ConfigMapLister
107112
nodeInformer cache.SharedIndexInformer
108113
nodeLister corev1listers.NodeLister
109-
genctlr.QueueAndWorkers[typedRef]
114+
genctlr.QueueAndWorkers[queueItem]
110115

111116
// gpuMap maps GPU UUID to GpuLocation
112117
gpuMap atomic.Pointer[map[string]GpuLocation]
113118

114119
mutex sync.Mutex
115120

116-
// requesters maps sever-requesting Pod name to data
117-
requesters map[string]*requesterData
118-
119121
// inferenceServers maps UID of server-requesting Pod to data
120122
inferenceServers map[apitypes.UID]*serverData
121123
}
@@ -127,116 +129,119 @@ type GpuLocation struct {
127129
Index uint
128130
}
129131

130-
type requesterData struct {
131-
PodUID apitypes.UID
132-
GPUIndices *string
133-
}
134-
135132
// Internal state about an inference server
136133
type serverData struct {
137-
requestingPodName string
134+
RequestingPodName string
135+
GPUIndices *string
138136
ReadinessRelayed *bool
137+
138+
// RequesterDeleteRequested carries this bit forward without waiting for notification
139+
// from apiserver. Remember there is no sync between the notification streams for
140+
// different objects.
141+
RequesterDeleteRequested bool
139142
}
140143

141-
type typedRef struct {
142-
Kind string
143-
cache.ObjectName
144+
type queueItem interface {
145+
// process returns (err error, retry bool).
146+
// There will be a retry iff `retry || err != nil`.
147+
process(ctx context.Context, ctl *controller) (error, bool)
144148
}
145149

146-
func (ref typedRef) String() string {
147-
return ref.Kind + ":" + ref.ObjectName.String()
150+
type cmItem struct {
151+
cache.ObjectName
148152
}
149153

150-
const podKind = "Pod"
151-
const cmKind = "ConfigMap"
154+
type infSvrItem struct {
155+
UID apitypes.UID
156+
// RequesterName is the name of the Pod that had this UID
157+
RequesterName string
158+
}
152159

153-
func (ctl *controller) careAbout(pod *corev1.Pod) bool {
160+
// careAbout returns infSvrItem, podIsRequester, have
161+
func (ctl *controller) careAbout(pod *corev1.Pod) (infSvrItem, bool, bool) {
154162
if len(pod.Annotations[api.ServerPatchAnnotationName]) > 0 {
155-
return true
163+
return infSvrItem{pod.UID, pod.Name}, true, true
156164
}
157-
_, owned := IsOwnedByRequest(pod)
158-
return owned
165+
owner, have := GetOwner(pod)
166+
return owner, false, have
159167
}
160168

161169
func (ctl *controller) OnAdd(obj any, isInInitialList bool) {
162-
var kind string
163-
var objM metav1.Object
164170
switch typed := obj.(type) {
165171
case *corev1.Pod:
166-
if !ctl.careAbout(typed) {
172+
if item, isReq, owned := ctl.careAbout(typed); !owned {
167173
ctl.enqueueLogger.V(5).Info("Ignoring irrelevant Pod", "name", typed.Name)
168174
return
175+
} else {
176+
ctl.enqueueLogger.V(5).Info("Enqueuing inference server reference due to notification of add", "item", item, "isReq", isReq, "isInInitialList", isInInitialList, "resourceVersion", typed.ResourceVersion)
177+
ctl.Queue.Add(item)
169178
}
170-
objM = typed
171-
kind = podKind
172179
case *corev1.ConfigMap:
173180
if typed.Name != GPUMapName {
174181
ctl.enqueueLogger.V(5).Info("Ignoring ConfigMap that is not the GPU map", "ref", cache.MetaObjectToName(typed))
175182
return
183+
} else {
184+
item := cmItem{cache.MetaObjectToName(typed)}
185+
ctl.enqueueLogger.V(5).Info("Enqueuing ConfigMap reference due to notification of add", "item", item, "isInInitialList", isInInitialList, "resourceVersion", typed.ResourceVersion)
186+
ctl.Queue.Add(item)
176187
}
177-
objM = typed
178-
kind = cmKind
179188
default:
180189
ctl.enqueueLogger.Error(nil, "Notified of add of unexpected type of object", "type", fmt.Sprintf("%T", obj))
181190
return
182191
}
183-
ref := typedRef{kind, cache.MetaObjectToName(objM)}
184-
ctl.enqueueLogger.V(5).Info("Enqueuing reference due to notification of add", "ref", ref, "isInInitialList", isInInitialList, "resourceVersion", objM.GetResourceVersion())
185-
ctl.Queue.Add(ref)
186-
187192
}
188193

189194
func (ctl *controller) OnUpdate(prev, obj any) {
190-
var kind string
191-
var objM metav1.Object
192195
switch typed := obj.(type) {
193196
case *corev1.Pod:
194-
if !ctl.careAbout(typed) {
197+
if item, isReq, owned := ctl.careAbout(typed); !owned {
198+
ctl.enqueueLogger.V(5).Info("Ignoring irrelevant Pod", "name", typed.Name)
195199
return
200+
} else {
201+
ctl.enqueueLogger.V(5).Info("Enqueuing inference server reference due to notification of update", "item", item, "isReq", isReq, "resourceVersion", typed.ResourceVersion)
202+
ctl.Queue.Add(item)
196203
}
197-
objM = typed
198-
kind = podKind
199204
case *corev1.ConfigMap:
200205
if typed.Name != GPUMapName {
206+
ctl.enqueueLogger.V(5).Info("Ignoring ConfigMap that is not the GPU map", "ref", cache.MetaObjectToName(typed))
201207
return
208+
} else {
209+
item := cmItem{cache.MetaObjectToName(typed)}
210+
ctl.enqueueLogger.V(5).Info("Enqueuing ConfigMap reference due to notification of update", "item", item, "resourceVersion", typed.ResourceVersion)
211+
ctl.Queue.Add(item)
202212
}
203-
objM = typed
204-
kind = cmKind
205213
default:
206214
ctl.enqueueLogger.Error(nil, "Notified of update of unexpected type of object", "type", fmt.Sprintf("%T", obj))
207215
return
208216
}
209-
ref := typedRef{kind, cache.MetaObjectToName(objM)}
210-
ctl.enqueueLogger.V(5).Info("Enqueuing reference due to notification of update", "ref", ref, "resourceVersion", objM.GetResourceVersion())
211-
ctl.Queue.Add(ref)
212217
}
213218

214219
func (ctl *controller) OnDelete(obj any) {
215220
if dfsu, ok := obj.(cache.DeletedFinalStateUnknown); ok {
216221
obj = dfsu.Obj
217222
}
218-
var kind string
219-
var objM metav1.Object
220223
switch typed := obj.(type) {
221224
case *corev1.Pod:
222-
if !ctl.careAbout(typed) {
225+
if item, isReq, owned := ctl.careAbout(typed); !owned {
226+
ctl.enqueueLogger.V(5).Info("Ignoring irrelevant Pod", "name", typed.Name)
223227
return
228+
} else {
229+
ctl.enqueueLogger.V(5).Info("Enqueuing inference server reference due to notification of delete", "item", item, "isReq", isReq, "resourceVersion", typed.ResourceVersion)
230+
ctl.Queue.Add(item)
224231
}
225-
objM = typed
226-
kind = podKind
227232
case *corev1.ConfigMap:
228233
if typed.Name != GPUMapName {
234+
ctl.enqueueLogger.V(5).Info("Ignoring ConfigMap that is not the GPU map", "ref", cache.MetaObjectToName(typed))
229235
return
236+
} else {
237+
item := cmItem{cache.MetaObjectToName(typed)}
238+
ctl.enqueueLogger.V(5).Info("Enqueuing ConfigMap reference due to notification of delete", "item", item, "resourceVersion", typed.ResourceVersion)
239+
ctl.Queue.Add(item)
230240
}
231-
objM = typed
232-
kind = cmKind
233241
default:
234242
ctl.enqueueLogger.Error(nil, "Notified of delete of unexpected type of object", "type", fmt.Sprintf("%T", obj))
235243
return
236244
}
237-
ref := typedRef{kind, cache.MetaObjectToName(objM)}
238-
ctl.enqueueLogger.V(5).Info("Enqueuing reference due to notification of delete", "ref", ref, "resourceVersion", objM.GetResourceVersion())
239-
ctl.Queue.Add(ref)
240245
}
241246

242247
func (ctl *controller) Start(ctx context.Context) error {
@@ -252,51 +257,13 @@ func (ctl *controller) Start(ctx context.Context) error {
252257

253258
// process returns (err error, retry bool).
254259
// There will be a retry iff `retry || err != nil`.
255-
func (ctl *controller) process(ctx context.Context, ref typedRef) (error, bool) {
256-
logger := klog.FromContext(ctx)
257-
switch ref.Kind {
258-
case podKind:
259-
return ctl.processPod(ctx, ref.ObjectName)
260-
case cmKind:
261-
return ctl.processConfigMap(ctx, ref.ObjectName)
262-
default:
263-
logger.Error(nil, "Asked to process unexpected Kind of object", "kind", ref.Kind)
264-
return nil, false
265-
}
266-
}
267-
268-
func (ctl *controller) processPod(ctx context.Context, podRef cache.ObjectName) (error, bool) {
269-
logger := klog.FromContext(ctx)
270-
logger.V(5).Info("Processing Pod", "name", podRef.Name)
271-
272-
got, err := ctl.podLister.Pods(podRef.Namespace).Get(podRef.Name)
273-
if err != nil {
274-
if errors.IsNotFound(err) {
275-
// Two cases are possible.
276-
// (1) This is a server-requesting Pod, in which case Kube GC will
277-
// delete the server-running Pod (once this controller removes the runner's finalizer).
278-
// (2) This is a server-running Pod and this controller has already
279-
// removed its finalizer and started deletion of the server-requesting Pod.
280-
logger.V(5).Info("Pod not found, nothing to do", "name", podRef.Name)
281-
ctl.clearRequesterData(podRef.Name)
282-
return nil, false
283-
}
284-
logger.Error(err, "Failed to get Pod", "name", podRef.Name)
285-
return err, true
286-
}
287-
288-
logger.V(5).Info("Pod exists", "labels", got.Labels, "IP", got.Status.PodIP, "resourceVersion", got.ResourceVersion)
289-
patch := got.Annotations[api.ServerPatchAnnotationName]
290-
if len(patch) > 0 {
291-
return ctl.processServerRequestingPod(ctx, got, patch)
292-
} else {
293-
return ctl.processServerRunningPod(ctx, got)
294-
}
260+
func (ctl *controller) process(ctx context.Context, item queueItem) (error, bool) {
261+
return item.process(ctx, ctl)
295262
}
296263

297-
func (ctl *controller) processConfigMap(ctx context.Context, cmRef cache.ObjectName) (error, bool) {
264+
func (item cmItem) process(ctx context.Context, ctl *controller) (error, bool) {
298265
logger := klog.FromContext(ctx)
299-
cm, err := ctl.coreclient.ConfigMaps(cmRef.Namespace).Get(ctx, cmRef.Name, metav1.GetOptions{})
266+
cm, err := ctl.coreclient.ConfigMaps(item.Namespace).Get(ctx, item.Name, metav1.GetOptions{})
300267
if err != nil {
301268
if errors.IsNotFound(err) {
302269
ctl.gpuMap.Store(nil)
@@ -336,47 +303,26 @@ func (ctl *controller) enqueueRequesters(ctx context.Context) {
336303
ctl.mutex.Lock()
337304
defer ctl.mutex.Unlock()
338305
logger := klog.FromContext(ctx)
339-
for reqPodName := range ctl.requesters {
340-
ref := typedRef{Kind: podKind, ObjectName: cache.ObjectName{Namespace: ctl.namespace, Name: reqPodName}}
341-
logger.V(5).Info("Enqueuing server-requesting Pod because of change to GPU map", "ref", ref)
342-
ctl.Queue.Add(ref)
306+
for infSvrUID, serverDat := range ctl.inferenceServers {
307+
item := infSvrItem{infSvrUID, serverDat.RequestingPodName}
308+
logger.V(5).Info("Enqueuing inference server because of change to GPU map", "item", item)
309+
ctl.Queue.Add(item)
343310
}
344311
}
345312

346-
func (ctl *controller) getRequesterData(name string, podUID apitypes.UID, insist bool) *requesterData {
347-
ctl.mutex.Lock()
348-
defer ctl.mutex.Unlock()
349-
ans := ctl.requesters[name]
350-
if ans == nil && insist || ans != nil && podUID != ans.PodUID {
351-
ans = &requesterData{PodUID: podUID}
352-
ctl.requesters[name] = ans
353-
}
354-
return ans
355-
}
356-
357-
func (ctl *controller) clearRequesterData(name string) {
358-
ctl.mutex.Lock()
359-
defer ctl.mutex.Unlock()
360-
delete(ctl.requesters, name)
361-
}
362-
363-
func (ctl *controller) getServerData(reqName string, reqUID apitypes.UID, insist bool) *serverData {
313+
func (ctl *controller) getServerData(reqName string, reqUID apitypes.UID) *serverData {
364314
ctl.mutex.Lock()
365315
defer ctl.mutex.Unlock()
366316
ans := ctl.inferenceServers[reqUID]
367-
if ans == nil && insist {
368-
ans = &serverData{requestingPodName: reqName}
317+
if ans == nil {
318+
ans = &serverData{RequestingPodName: reqName}
369319
ctl.inferenceServers[reqUID] = ans
370320
}
371321
return ans
372322
}
373323

374-
func (ctl *controller) clearServerData(reqName string) {
324+
func (ctl *controller) clearServerData(uid apitypes.UID) {
375325
ctl.mutex.Lock()
376326
defer ctl.mutex.Unlock()
377-
for uid, serveDat := range ctl.inferenceServers {
378-
if serveDat.requestingPodName == reqName {
379-
delete(ctl.inferenceServers, uid)
380-
}
381-
}
327+
delete(ctl.inferenceServers, uid)
382328
}

0 commit comments

Comments
 (0)