Skip to content

Commit 7bc10ce

Browse files
Merge pull request #85 from MikeSpreitzer/remember-readiness-relay
Milestone 1 stragglers
2 parents ceb7519 + 50340ff commit 7bc10ce

File tree

3 files changed

+101
-39
lines changed

3 files changed

+101
-39
lines changed

pkg/controller/dual-pods/controller.go

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,17 @@ func NewController(
7171
numWorkers int,
7272
) (*controller, error) {
7373
ctl := &controller{
74-
enqueueLogger: logger.WithName(ControllerName),
75-
coreclient: coreClient,
76-
namespace: namespace,
77-
podInformer: corev1PreInformers.Pods().Informer(),
78-
podLister: corev1PreInformers.Pods().Lister(),
79-
cmInformer: corev1PreInformers.ConfigMaps().Informer(),
80-
cmLister: corev1PreInformers.ConfigMaps().Lister(),
81-
nodeInformer: corev1PreInformers.Nodes().Informer(),
82-
nodeLister: corev1PreInformers.Nodes().Lister(),
83-
requesters: make(map[string]*requesterData),
74+
enqueueLogger: logger.WithName(ControllerName),
75+
coreclient: coreClient,
76+
namespace: namespace,
77+
podInformer: corev1PreInformers.Pods().Informer(),
78+
podLister: corev1PreInformers.Pods().Lister(),
79+
cmInformer: corev1PreInformers.ConfigMaps().Informer(),
80+
cmLister: corev1PreInformers.ConfigMaps().Lister(),
81+
nodeInformer: corev1PreInformers.Nodes().Informer(),
82+
nodeLister: corev1PreInformers.Nodes().Lister(),
83+
requesters: make(map[string]*requesterData),
84+
inferenceServers: make(map[apitypes.UID]*serverData),
8485
}
8586
ctl.gpuMap.Store(&map[string]GpuLocation{})
8687
ctl.QueueAndWorkers = genctlr.NewQueueAndWorkers(string(ControllerName), numWorkers, ctl.process)
@@ -114,8 +115,13 @@ type controller struct {
114115

115116
// requesters maps sever-requesting Pod name to data
116117
requesters map[string]*requesterData
118+
119+
// inferenceServers maps UID of serve-requesting Pod to data
120+
inferenceServers map[apitypes.UID]*serverData
117121
}
118122

123+
var _ Controller = &controller{}
124+
119125
type GpuLocation struct {
120126
Node string
121127
Index uint
@@ -126,7 +132,11 @@ type requesterData struct {
126132
GPUIndices *string
127133
}
128134

129-
var _ Controller = &controller{}
135+
// Internal state about an inference server
136+
type serverData struct {
137+
requestingPodName string
138+
ReadinessRelayed *bool
139+
}
130140

131141
type typedRef struct {
132142
Kind string
@@ -294,25 +304,45 @@ func (ctl *controller) processConfigMap(ctx context.Context, cmRef cache.ObjectN
294304
}
295305
return err, true
296306
}
307+
oldMap := ctl.gpuMap.Load()
297308
newMap := map[string]GpuLocation{}
298309
nodeCount := 0
310+
additions := 0
299311
for nodeName, mapStr := range cm.Data {
300-
var nodesMap map[string]uint
301-
err = json.Unmarshal([]byte(mapStr), &nodesMap)
312+
var newNodesMap map[string]uint
313+
err = json.Unmarshal([]byte(mapStr), &newNodesMap)
302314
if err != nil {
303315
logger.Error(err, "A GPU map entry failed to parse as JSON", "nodeName", nodeName)
304316
continue
305317
}
306-
for uuid, index := range nodesMap {
307-
newMap[uuid] = GpuLocation{Node: nodeName, Index: index}
318+
for uuid, index := range newNodesMap {
319+
newLoc := GpuLocation{Node: nodeName, Index: index}
320+
if oldMap == nil || (*oldMap)[uuid] != newLoc {
321+
additions++
322+
}
323+
newMap[uuid] = newLoc
308324
}
309325
nodeCount += 1
310326
}
311-
logger.V(1).Info("Parsed GPU map", "numNodes", nodeCount, "numGPUs", len(newMap))
327+
logger.V(1).Info("Parsed GPU map", "numNodes", nodeCount, "numGPUs", len(newMap), "additions", additions)
312328
ctl.gpuMap.Store(&newMap)
329+
if additions > 0 {
330+
ctl.enqueueRequesters(ctx)
331+
}
313332
return nil, false
314333
}
315334

335+
func (ctl *controller) enqueueRequesters(ctx context.Context) {
336+
ctl.mutex.Lock()
337+
defer ctl.mutex.Unlock()
338+
logger := klog.FromContext(ctx)
339+
for reqPodName := range ctl.requesters {
340+
ref := typedRef{Kind: podKind, ObjectName: cache.ObjectName{Namespace: ctl.namespace, Name: reqPodName}}
341+
logger.V(5).Info("Enqueuing server-requesting Pod because of change to GPU map", "ref", ref)
342+
ctl.Queue.Add(ref)
343+
}
344+
}
345+
316346
func (ctl *controller) getRequesterData(name string, podUID apitypes.UID, insist bool) *requesterData {
317347
ctl.mutex.Lock()
318348
defer ctl.mutex.Unlock()
@@ -329,3 +359,24 @@ func (ctl *controller) clearRequesterData(name string) {
329359
defer ctl.mutex.Unlock()
330360
delete(ctl.requesters, name)
331361
}
362+
363+
func (ctl *controller) getServerData(reqName string, reqUID apitypes.UID, insist bool) *serverData {
364+
ctl.mutex.Lock()
365+
defer ctl.mutex.Unlock()
366+
ans := ctl.inferenceServers[reqUID]
367+
if ans == nil && insist {
368+
ans = &serverData{requestingPodName: reqName}
369+
ctl.inferenceServers[reqUID] = ans
370+
}
371+
return ans
372+
}
373+
374+
func (ctl *controller) clearServerData(reqName string) {
375+
ctl.mutex.Lock()
376+
defer ctl.mutex.Unlock()
377+
for uid, serveDat := range ctl.inferenceServers {
378+
if serveDat.requestingPodName == reqName {
379+
delete(ctl.inferenceServers, uid)
380+
}
381+
}
382+
}

pkg/controller/dual-pods/server-requesting.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,20 +105,20 @@ func (ctl *controller) processServerRequestingPod(ctx context.Context, requestin
105105

106106
// use the server patch to build the server-running pod
107107
logger.V(5).Info("Building server-running pod from patch", "name", requestingPod.Name, "patch", serverPatch)
108-
serverRunningPod, err := composeServerRunningPod(ctx, requestingPod, serverPatch, *reqDat.GPUIndices, api.RunnerData{
108+
desiredRunningPod, err := composeServerRunningPod(ctx, requestingPod, serverPatch, *reqDat.GPUIndices, api.RunnerData{
109109
NodeName: requestingPod.Spec.NodeName,
110110
})
111111
if err != nil {
112112
return ctl.ensureReqStatus(ctx, requestingPod, fmt.Sprintf("failed to construct the nominal server-running Pod: %s", err.Error()))
113113
}
114114

115-
got, err := ctl.podLister.Pods(serverRunningPod.Namespace).Get(serverRunningPod.Name)
115+
actualRunningPod, err := ctl.podLister.Pods(desiredRunningPod.Namespace).Get(desiredRunningPod.Name)
116116
if err != nil && !apierrors.IsNotFound(err) {
117-
logger.Error(err, "Failed to get existing server-running pod", "name", serverRunningPod.Name)
117+
logger.Error(err, "Failed to get existing server-running pod", "name", desiredRunningPod.Name)
118118
return err, true
119119
}
120-
if got != nil {
121-
logger.V(5).Info("Server-running pod exists", "name", serverRunningPod.Name)
120+
if actualRunningPod != nil {
121+
logger.V(5).Info("Server-running pod exists", "name", desiredRunningPod.Name)
122122

123123
// TODO: we should reconcile the existing server-running pod with the one we just built
124124
return nil, false
@@ -131,8 +131,8 @@ func (ctl *controller) processServerRequestingPod(ctx context.Context, requestin
131131
return err, false
132132
}
133133

134-
logger.V(2).Info("Creating server-running pod", "name", serverRunningPod.Name, "namespace", serverRunningPod.Namespace, "annotations", serverRunningPod.Annotations, "labels", serverRunningPod.Labels)
135-
echo, err := ctl.coreclient.Pods(serverRunningPod.Namespace).Create(ctx, serverRunningPod, metav1.CreateOptions{})
134+
logger.V(2).Info("Creating server-running pod", "name", desiredRunningPod.Name, "namespace", desiredRunningPod.Namespace, "annotations", desiredRunningPod.Annotations, "labels", desiredRunningPod.Labels)
135+
echo, err := ctl.coreclient.Pods(desiredRunningPod.Namespace).Create(ctx, desiredRunningPod, metav1.CreateOptions{})
136136
if err != nil {
137137
errMsg := err.Error()
138138
if invalidPodRE.MatchString(errMsg) {
@@ -144,7 +144,7 @@ func (ctl *controller) processServerRequestingPod(ctx context.Context, requestin
144144
}
145145
return err, true
146146
}
147-
logger.V(5).Info("Created server-running pod", "name", serverRunningPod.Name, "annotations", echo.Annotations, "labels", echo.Labels, "resourceVersion", echo.ResourceVersion)
147+
logger.V(5).Info("Created server-running pod", "name", desiredRunningPod.Name, "annotations", echo.Annotations, "labels", echo.Labels, "resourceVersion", echo.ResourceVersion)
148148

149149
return ctl.ensureReqStatus(ctx, requestingPod)
150150
}

pkg/controller/dual-pods/server-running.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ func (ctl *controller) processServerRunningPod(ctx context.Context, runningPod *
6868
}
6969
}
7070

71+
if requestingPod == nil {
72+
ctl.clearServerData(requestingPodName)
73+
}
74+
7175
if requestingPod == nil || requestingPod.DeletionTimestamp != nil || runningPod.DeletionTimestamp != nil {
7276
podOps := ctl.coreclient.Pods(runningPod.Namespace)
7377
// Deletion requested, so remove finalizer and delete server-requesting Pod
@@ -90,27 +94,34 @@ func (ctl *controller) processServerRunningPod(ctx context.Context, runningPod *
9094
return err, false
9195
}
9296

97+
serverDat := ctl.getServerData(requestingPodName, requestingPod.UID, true)
98+
9399
// relay the readiness
94100
port := requestingPod.Annotations[api.AdminPortAnnotationName]
95101
if port == "" {
96102
port = api.AdminPortDefaultValue
97103
}
98-
url, readiness := fmt.Sprintf("http://%s:%s", requestingPod.Status.PodIP, port), ""
99-
if isPodReady(runningPod) {
100-
logger.V(5).Info("Server-running pod is ready", "name", runningPod.Name)
101-
url += stubapi.BecomeReadyPath
102-
readiness = "ready"
103-
} else {
104-
logger.V(5).Info("Server-running pod is not ready", "name", runningPod.Name)
105-
url += stubapi.BecomeUnreadyPath
106-
readiness = "unready"
107-
}
108-
err = postToReadiness(url)
109-
if err != nil {
110-
logger.Error(err, "Failed to relay the readiness", "name", runningPod.Name, "readiness", readiness)
111-
return err, true
104+
ready := isPodReady(runningPod)
105+
if serverDat.ReadinessRelayed == nil || ready != *serverDat.ReadinessRelayed {
106+
// TODO: use cache
107+
url, readiness := fmt.Sprintf("http://%s:%s", requestingPod.Status.PodIP, port), ""
108+
if ready {
109+
logger.V(5).Info("Server-running pod is ready", "name", runningPod.Name)
110+
url += stubapi.BecomeReadyPath
111+
readiness = "ready"
112+
} else {
113+
logger.V(5).Info("Server-running pod is not ready", "name", runningPod.Name)
114+
url += stubapi.BecomeUnreadyPath
115+
readiness = "unready"
116+
}
117+
err = postToReadiness(url)
118+
if err != nil {
119+
logger.Error(err, "Failed to relay the readiness", "name", runningPod.Name, "readiness", readiness)
120+
return err, true
121+
}
122+
serverDat.ReadinessRelayed = &ready
123+
logger.V(5).Info("Successfully relayed the readiness", "name", runningPod.Name, "readiness", readiness)
112124
}
113-
logger.V(5).Info("Successfully relayed the readiness", "name", runningPod.Name, "readiness", readiness)
114125

115126
logger.V(5).Info("Processed server-running pod", "name", runningPod.Name)
116127
return nil, false

0 commit comments

Comments
 (0)