forked from llm-d-incubation/llm-d-fast-model-actuation
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinference-server.go
More file actions
1376 lines (1263 loc) · 56.3 KB
/
inference-server.go
File metadata and controls
1376 lines (1263 loc) · 56.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2025 The llm-d Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dualpods
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"regexp"
"slices"
"strconv"
"strings"
"text/template"
"time"
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/utils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
k8sserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
fmav1alpha1 "github.com/llm-d-incubation/llm-d-fast-model-actuation/api/fma/v1alpha1"
"github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/api"
ctlrcommon "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/controller/common"
stubapi "github.com/llm-d-incubation/llm-d-fast-model-actuation/pkg/spi"
)
// nodeItem is a work item that identifies a node by name. Processing it
// drains that node's queued node-local items and processes each of them.
type nodeItem struct {
// NodeName is the name of the node whose local items are to be processed.
NodeName string
}
// process drains the node-local items queued for the node named by ni and
// processes each one in turn. An item whose processing asks for a retry is put
// back into the node's data so that it is picked up on a later pass.
//
// The returned error is always nil; failures of individual items are logged
// here rather than propagated. The returned bool is true when at least one
// item was re-added for retry.
func (ni nodeItem) process(ctx context.Context, ctl *controller) (error, bool) {
	logger := klog.FromContext(ctx).WithValues("node", ni.NodeName)
	ctx = klog.NewContext(ctx, logger)

	nodeDat := ctl.getNodeData(ni.NodeName)
	pending := nodeDat.yankItems()
	logger.V(4).Info("Processing items for node", "count", len(pending))

	numToRetry := 0
	for localItem := range pending {
		logger.V(4).Info("Processing node-local item", "item", localItem)
		err, retry := localItem.process(ctx, ctl, nodeDat)

		// Report the outcome: success, transient failure, or hard failure.
		switch {
		case err == nil:
			logger.V(4).Info("Finished processing node-local item", "item", localItem, "willRetry", retry)
		case retry:
			logger.Info("Processing node local item suffered transient error, will retry", "item", localItem, "err", err)
		default:
			logger.Error(err, "Processing node local item failed", "item", localItem)
		}

		if !retry {
			continue
		}
		// Re-queue the item, whether or not an error accompanied the retry request.
		nodeDat.add(localItem)
		numToRetry++
	}

	logger.V(4).Info("Done processing items for node", "numToRetry", numToRetry)
	return nil, numToRetry > 0
}
// process reacts to a change of the launcher Pod named by item.
//
// If the Pod can no longer be found in the lister, the in-memory launcher data
// is cleared. In both the "deleted" and the "still exists" case the unbound
// inference-server items on the Pod's node are enqueued again, with a reason
// string that says which of the two happened. Any other lookup error is
// returned with a request to retry.
func (item launcherPodItem) process(ctx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
	logger := klog.FromContext(ctx).WithValues("launcherPod", item.LauncherPodName, "node", item.NodeName)
	ctx = klog.NewContext(ctx, logger)

	_, err := ctl.podLister.Pods(ctl.namespace).Get(item.LauncherPodName)
	switch {
	case err == nil:
		// The launcher Pod still exists, so something about it changed.
		reason := fmt.Sprintf("launcher pod %s changed", item.LauncherPodName)
		ctl.enqueueUnboundInfSvrItemsOnNode(ctx, item.NodeName, reason)
		return nil, false
	case apierrors.IsNotFound(err):
		logger.V(2).Info("Launcher pod deleted, cleaning up launcher data")
		ctl.clearLauncherData(nodeDat, item.LauncherPodName)
		reason := fmt.Sprintf("launcher pod %s deleted", item.LauncherPodName)
		ctl.enqueueUnboundInfSvrItemsOnNode(ctx, item.NodeName, reason)
		return nil, false
	default:
		return err, true
	}
}
// process reconciles one inference server, identified by the UID and name of
// its server-requesting Pod, against what the listers and informer index show.
//
// It looks up the server-requesting Pod by name and the bound server-providing
// Pod through the requesterIndexName index, and then handles these cases in order:
//   - both Pods are gone: the in-memory server data is cleared;
//   - the requester's finalizer should be removed: that is done and processing stops;
//   - the server-providing Pod is being deleted: the deletion is reflected onto the
//     server-requesting Pod and the provider's finalizer is removed;
//   - the requester is absent or being deleted while a provider exists: the provider
//     is unbound (and deleted outright if it is in trouble);
//   - a provider is already bound: it is woken if sleeping, its sleeping label is
//     synced, requester state is ensured, and readiness is relayed to the requester;
//   - no provider is bound: a sleeping provider is bound, or a new direct or
//     launcher-based provider is created, subject to the sleeper budget.
//
// The returned bool asks the caller to retry this item later; the returned
// error, if non-nil, describes what went wrong.
func (item infSvrItem) process(urCtx context.Context, ctl *controller, nodeDat *nodeData) (error, bool) {
logger := klog.FromContext(urCtx).WithValues("serverUID", item.UID, "requesterName", item.RequesterName)
ctx := klog.NewContext(urCtx, logger)
requesterRV := "(non existent)"
providerRV := "(non existent)"
serverDat := ctl.getServerData(nodeDat, item.RequesterName, item.UID)
var requesterDeletionTimestamp, providerDeletionTimestamp *string
var requesterRCS, providerRCS *reducedContainerState
// Look up the server-requesting Pod in the lister; NotFound is treated as the Pod being absent.
requestingPod, err := ctl.podLister.Pods(ctl.namespace).Get(item.RequesterName)
if err != nil {
if apierrors.IsNotFound(err) {
requestingPod = nil
} else { // BTW, impossible
logger.Error(err, "Failed to get Pod")
return err, true
}
} else {
requesterRV = requestingPod.ResourceVersion
requesterDeletionTimestamp = TimePtrToStringPtr(requestingPod.DeletionTimestamp)
requesterRCS = getReducedInferenceContainerState(requestingPod)
}
// Look up the server-providing Pod(s) indexed under this requester's UID.
var providingPod *corev1.Pod
providingPodAnys, err := ctl.podInformer.GetIndexer().ByIndex(requesterIndexName, string(item.UID))
if err != nil { //impossible
return err, false
}
// Zero providers is a normal state; exactly one is recorded in serverDat;
// more than one is reported as an error without requesting a retry.
switch len(providingPodAnys) {
case 0:
case 1:
providingPod = providingPodAnys[0].(*corev1.Pod)
providerRV = providingPod.ResourceVersion
providerDeletionTimestamp = TimePtrToStringPtr(providingPod.DeletionTimestamp)
providerRCS = getReducedInferenceContainerState(providingPod)
logger = logger.WithValues("providerName", providingPod.Name)
ctx = klog.NewContext(urCtx, logger)
serverDat.ProvidingPodName = providingPod.Name
default:
providerNames, _ := utils.SliceMap(providingPodAnys, func(podAny any) (string, error) {
pod := podAny.(*corev1.Pod)
return pod.Name, nil
})
return fmt.Errorf("found multiple bound server-running Pods: %v", providerNames), false
}
logger.V(5).Info("Processing inference server",
"requesterResourceVersion", requesterRV, "requesterDeletionTimestamp", requesterDeletionTimestamp,
"requesterRCS", requesterRCS,
"providerResourceVersion", providerRV, "providerDeletionTimestamp", providerDeletionTimestamp,
"providerRCS", providerRCS,
"GPUIDs", serverDat.GPUIDs)
podOps := ctl.coreclient.Pods(ctl.namespace)
// Delete the in-memory data after both Pods are gone.
if requestingPod == nil && providingPod == nil {
ctl.clearServerData(nodeDat, item.UID)
logger.V(2).Info("End of life of inference server")
return nil, false
}
// Decide what to do about the finalizer on the server-requesting Pod,
// and do it if that is a removal.
var shouldAddRequesterFinalizer bool
if requestingPod != nil {
removed, shouldAdd, err, retry := ctl.maybeRemoveRequesterFinalizer(ctx, requestingPod, providingPod)
if removed || err != nil {
return err, retry
}
shouldAddRequesterFinalizer = shouldAdd
}
// Handle the deletion of a server-providing Pod
if providingPod != nil && providingPod.DeletionTimestamp != nil {
if requestingPod != nil && requestingPod.DeletionTimestamp == nil {
// Reflect providingPod deletion to requestingPod deletion.
// gonerRV is the ResourceVersion used as the delete precondition; adding the
// finalizer first yields a new value to use instead of the one read from the lister.
gonerRV := requesterRV
if shouldAddRequesterFinalizer { // don't let delete complete too quickly
gonerRV, err = ctl.addRequesterFinalizer(ctx, requestingPod, providingPod.Name)
if err != nil {
return err, true
}
}
err := podOps.Delete(ctx, requestingPod.Name, metav1.DeleteOptions{
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
Preconditions: &metav1.Preconditions{UID: &item.UID, ResourceVersion: &gonerRV}})
if err == nil {
logger.V(2).Info("Requested deletion of server-requesting Pod because of deletion of server-providing Pod")
} else if apierrors.IsGone(err) || apierrors.IsNotFound(err) {
logger.V(5).Info("The server-requesting Pod is already gone")
} else {
return fmt.Errorf("failed to delete server-requesting Pod: %w", err), true
}
// Remember the request so that later passes just wait for the deletion to finish.
serverDat.RequesterDeleteRequested = true
}
// Ensure finalizer is absent from server-providing Pod so that its deletion can complete
changed, err := ctl.removeProviderFinalizer(ctx, providingPod)
if err != nil {
return err, true
}
if !changed {
logger.V(5).Info("Finalizer is absent from server-providing Pod, waiting for deletions to finish")
}
return nil, false
}
// Assert: providingPod == nil || providingPod.DeletionTimestamp == nil
// If the server-requesting Pod is absent or being deleted,
// ensure that the server-providing Pod is not bound.
if (requestingPod == nil || requestingPod.DeletionTimestamp != nil) && providingPod != nil {
// Time to unbind.
// As a special favor, delete providingPod if it is in trouble.
if utils.PodIsInTrouble(providingPod) {
err := podOps.Delete(ctx, providingPod.Name, metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &providingPod.UID, ResourceVersion: &providingPod.ResourceVersion},
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
})
if err == nil {
stJSON, marshalErr := json.Marshal(providingPod.Status)
logger.V(2).Info("Deleted server-providing Pod because it is in trouble", "providerName", providingPod.Name, "status", string(stJSON), "marshalErr", marshalErr)
return nil, false
} else if apierrors.IsNotFound(err) || apierrors.IsGone(err) {
logger.V(5).Info("Troubled server-providing Pod was concurrently deleted", "providerName", providingPod.Name)
} else {
// NOTE(review): the delete error itself is not included in this log entry,
// and processing falls through to the unbind below.
logger.V(2).Info("Failed to delete troubled server-providing Pod", "providerName", providingPod.Name)
}
}
// since now requestingPod could be nil, use the providingPod's launcherConfigNameLabelKey
// to help determine whether providingPod is launcher-based
providingPodLauncherBased := false
if providingPod.Labels != nil {
_, providingPodLauncherBased = providingPod.Labels[ctlrcommon.LauncherConfigNameLabelKey]
}
err := ctl.ensureUnbound(ctx, serverDat, providingPod, providingPodLauncherBased)
if err != nil {
return err, true
}
if requestingPod != nil {
return ctl.ensureReqState(ctx, requestingPod, serverDat, false, true)
}
return nil, false
}
// Assert: requestingPod != nil
if requestingPod.Spec.NodeName == "" { // impossible now
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "not scheduled yet")
}
if requestingPod.DeletionTimestamp != nil || serverDat.RequesterDeleteRequested {
logger.V(5).Info("Waiting for deletion of server-requesting Pod to finish")
return nil, false
}
node, err := ctl.nodeLister.Get(requestingPod.Spec.NodeName)
if err != nil {
if apierrors.IsNotFound(err) {
node = nil
} else { // BTW, impossible
return err, true
}
}
if node == nil || node.DeletionTimestamp != nil {
// Node is gone or going away, do nothing to maintain server-providing Pod.
logger.V(3).Info("Ignoring inference server on absent or departing Node")
return nil, false
}
// The requester's IP and admin port are needed to query its GPUs and to relay readiness.
requesterIP := requestingPod.Status.PodIP
if requesterIP == "" {
return ctl.ensureReqState(ctx, requestingPod, serverDat, shouldAddRequesterFinalizer, false, "no IP assigned yet")
}
adminPort := requestingPod.Annotations[api.AdminPortAnnotationName]
if adminPort == "" {
adminPort = api.AdminPortDefaultValue
}
// The presence of the InferenceServerConfig annotation, even with an empty value,
// marks the requester as asking for a launcher-based provider.
var isc *fmav1alpha1.InferenceServerConfig
iscName, launcherBased := requestingPod.Annotations[api.InferenceServerConfigAnnotationName]
if launcherBased {
logger.V(5).Info("Server requesting Pod is asking for launcher-based server providing Pod")
// from the requestingPod's annotations, get the InferenceServerConfig object
if iscName == "" {
return ctl.ensureReqStatus(ctx, requestingPod, serverDat,
fmt.Sprintf("empty value for annotation %q", api.InferenceServerConfigAnnotationName),
)
}
isc, err = ctl.iscLister.InferenceServerConfigs(ctl.namespace).Get(iscName)
if err != nil {
return ctl.ensureReqStatus(ctx, requestingPod, serverDat,
fmt.Sprintf("failed to get InferenceServerConfig %q: %v", iscName, err),
)
}
}
// Fetch the assigned GPUs if that has not already been done.
if serverDat.GPUIDsStr == nil {
logger.V(5).Info("Querying accelerators", "ip", requesterIP, "port", adminPort)
url := fmt.Sprintf("http://%s:%s%s", requesterIP, adminPort, stubapi.AcceleratorQueryPath)
gpuUUIDs, err := getGPUUUIDs(url)
if err != nil {
// Report the query failure in the requester's status too; if that
// update also fails, return both errors joined. Retry either way.
queryErr := fmt.Errorf("GET %q fails: %s", url, err.Error())
updateErr, _ := ctl.ensureReqStatus(ctx, requestingPod, serverDat, queryErr.Error())
if updateErr == nil {
return queryErr, true
}
return errors.Join(queryErr, updateErr), true
}
if len(gpuUUIDs) == 0 {
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "the assigned set of GPUs is empty")
}
logger.V(5).Info("Found GPUs", "gpuUUIDs", gpuUUIDs)
gpuIDsStr := strings.Join(gpuUUIDs, ",")
serverDat.GPUIDs = gpuUUIDs
serverDat.GPUIDsStr = &gpuIDsStr
// GPU indices are only computed for the non-launcher-based path.
if !launcherBased && serverDat.GPUIndicesStr == nil {
gpuIndices, err := ctl.mapToGPUIndices(requestingPod.Spec.NodeName, gpuUUIDs)
if err != nil {
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, err.Error())
}
gpuIndicesStr := strings.Join(gpuIndices, ",")
serverDat.GPUIndices = gpuIndices
serverDat.GPUIndicesStr = &gpuIndicesStr
}
}
// If there is already a bound server-providing Pod then ensure that it is awake,
// ensure status reported, and relay readiness if needed.
if providingPod != nil {
var serverPort int16
if launcherBased {
// NOTE(review): this narrows the configured port to int16, which cannot
// represent ports above 32767 — confirm the valid range of Port.
serverPort = int16(isc.Spec.ModelServerConfig.Port)
} else {
_, serverPort, err = utils.GetInferenceServerContainerIndexAndPort(providingPod)
if err != nil { // Impossible, because such a providingPod would never be created by this controller
return fmt.Errorf("unable to wake up server because port not known: %w", err), true
}
}
// The sleeping state is queried only when it is not already known in serverDat.
if serverDat.Sleeping == nil {
sleeping, err := ctl.querySleeping(ctx, providingPod, serverPort)
if err != nil {
return err, true
}
logger.V(2).Info("Determined whether provider is sleeping", "isSleeping", sleeping)
serverDat.Sleeping = &sleeping
}
if *(serverDat.Sleeping) {
err = ctl.wakeSleeper(ctx, serverDat, requestingPod, providingPod, serverPort)
if err != nil {
return err, true
}
logger.V(2).Info("Woke discovered-bound inference server")
}
if err := ctl.ensureSleepingLabel(ctx, providingPod, *(serverDat.Sleeping)); err != nil {
return err, true
}
err, _ = ctl.ensureReqState(ctx, requestingPod, serverDat, shouldAddRequesterFinalizer, false)
if err != nil {
return err, true
}
// Relay readiness if not already done.
// For launcher-based providers, readiness follows the bound instance's
// sleeping state rather than the launcher's Pod readiness.
ready := utils.IsPodReady(providingPod)
if launcherBased {
ready = !*serverDat.Sleeping
}
// Only POST to the requester when the readiness differs from what was last relayed.
if serverDat.ReadinessRelayed == nil || ready != *serverDat.ReadinessRelayed {
url, readiness := fmt.Sprintf("http://%s:%s", requestingPod.Status.PodIP, adminPort), ""
if ready {
logger.V(5).Info("Server-providing pod is ready", "name", providingPod.Name)
url += stubapi.BecomeReadyPath
readiness = "ready"
} else {
logger.V(5).Info("Server-providing pod is not ready", "name", providingPod.Name)
url += stubapi.BecomeUnreadyPath
readiness = "unready"
}
err = doPost(url)
if err != nil {
logger.Error(err, "Failed to relay the readiness", "name", providingPod.Name, "readiness", readiness)
return err, true
}
serverDat.ReadinessRelayed = &ready
logger.V(5).Info("Successfully relayed the readiness", "name", providingPod.Name, "readiness", readiness)
}
// TODO: sync desired and actual providingPod wrt labels (spec is mostly immutable, possible mutations are allowed)
logger.V(5).Info("Nothing more to do")
return nil, false
}
// Assert: providingPod == nil && !shouldAddRequesterFinalizer
if node.Spec.Unschedulable {
// Reflect the inability to serve back to the client/user
logger.V(2).Info("Deleting server-requesting Pod because it is bound to an unschedulable Node and has no server-providing Pod")
// NOTE(review): a failed Delete is returned here without requesting a retry,
// and without UID/ResourceVersion preconditions — confirm that is intended.
err := podOps.Delete(ctx, requestingPod.Name, metav1.DeleteOptions{PropagationPolicy: ptr.To(metav1.DeletePropagationBackground)})
return err, false
}
// What remains to be done is to wake or create a server-providing Pod
if !launcherBased {
serverPatch := requestingPod.Annotations[api.ServerPatchAnnotationName]
if serverPatch == "" { // this is bad, somebody has hacked important data
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "the "+api.ServerPatchAnnotationName+" annotation is missing")
}
// use the server patch to build the server-providing pod, if not already done.
desiredProvidingPod, nominalHash, err := serverDat.getNominalServerProvidingPod(ctx, requestingPod, serverPatch, api.ProviderData{
NodeName: requestingPod.Spec.NodeName,
})
if err != nil {
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to construct the nominal server-providing Pod: %s", err.Error()))
}
// Look for existing Pods indexed under the same nominal hash; one of those can be bound instead of creating a new Pod.
sleepingAnys, err := ctl.podInformer.GetIndexer().ByIndex(nominalHashIndexName, nominalHash)
if err != nil { // impossible
return err, false
}
if len(sleepingAnys) > 0 {
// They have to be sleeping, the Kube scheduler and kubelet would not have assigned the same
// node/gpus to the requester if there was another one awake.
if len(sleepingAnys) > 1 {
logger.V(2).Info("Unexpected: multiple sleeping Pods match; using the first", "requesterName", requestingPod.Name)
}
providingPod = sleepingAnys[0].(*corev1.Pod)
return ctl.bind(ctx, serverDat, requestingPod, providingPod, false, -1)
}
// What remains is to make a new server-providing Pod --- if the sleeper budget allows.
err, retry := ctl.enforceSleeperBudget(ctx, serverDat, requestingPod, ctl.sleeperLimit)
if err != nil || retry {
return err, retry
}
// Sleeper budget is met. Make the new Pod.
logger.V(3).Info("Creating server-providing pod", "node", requestingPod.Spec.NodeName, "gpus", serverDat.GPUIndicesStr, "labels", desiredProvidingPod.Labels)
echo, err := podOps.Create(ctx, desiredProvidingPod, metav1.CreateOptions{})
if err != nil {
errMsg := err.Error()
// A message matching invalidPodRE is only reported in the requester's status;
// any other create failure is also returned as an error with a retry.
if invalidPodRE.MatchString(errMsg) {
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "the nominal server-providing "+errMsg)
}
innerErr, _ := ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to create server-providing Pod: %s", errMsg))
if innerErr != nil {
return errors.Join(err, innerErr), true
}
return err, true
}
serverDat.Sleeping = ptr.To(false)
logger.V(2).Info("Created server-providing pod", "name", echo.Name, "gpus", serverDat.GPUIndicesStr, "annotations", echo.Annotations, "labels", echo.Labels, "resourceVersion", echo.ResourceVersion)
return ctl.ensureReqStatus(ctx, requestingPod, serverDat)
}
// What remains to be done is to wake or create a launcher-based server-providing Pod
// from the InferenceServerConfig object, get the launcherConfig object
lcName := isc.Spec.LauncherConfigName
lc, err := ctl.lcLister.LauncherConfigs(ctl.namespace).Get(lcName)
if err != nil {
// TODO(waltforme): introduce the 'enqueue requesters by launcherconfigs' logic to the controller
return ctl.ensureReqStatus(ctx, requestingPod, serverDat,
fmt.Sprintf("failed to get LauncherConfig %q: %v", lcName, err),
)
}
// Build the desired launcher Pod; its LauncherConfig hash annotation is the key
// used to find existing launcher Pods in the informer index.
desiredLauncherPod, err := utils.BuildLauncherPodFromTemplate(lc.Spec.PodTemplate, ctl.namespace, requestingPod.Spec.NodeName, lcName)
if err != nil {
return fmt.Errorf("failed to build launcher Pod from LauncherConfig %q: %w", lcName, err), true
}
lcHash := desiredLauncherPod.Annotations[ctlrcommon.LauncherConfigHashAnnotationKey]
logger.V(5).Info("LauncherConfig's hash", "hash", lcHash)
launcherPodAnys, err := ctl.podInformer.GetIndexer().ByIndex(launcherConfigHashIndexName, lcHash)
if err != nil {
return err, false
}
// iscHash doubles as the ID of the vLLM instance inside a launcher.
cfg, iscHash, err := ctl.configInferenceServer(isc, serverDat.GPUIDs)
if err != nil {
return fmt.Errorf("failed to configure inference server config: %w", err), true
}
logger.V(5).Info("Nominal hash of InferenceServerConfig", "hash", iscHash)
if len(launcherPodAnys) > 0 {
// Multiple launcher Pods could exist for one LauncherConfig object on one node.
// Select the best launcher Pod: prioritize those with sleeping instances (fast wake-up),
// then those with capacity for new instances.
// Note that multiple vLLM instances could exist in one launcher Pod, but at most one instance could be awake at a time.
launcherPod, hasSleepingInstance, someNotReady, err := ctl.selectBestLauncherPod(ctx, launcherPodAnys, iscHash, int(lc.Spec.MaxSleepingInstances), nodeDat)
if err != nil {
return err, true
}
if someNotReady {
logger.V(4).Info("Launcher Pods exist but some are not ready yet, will retry later")
return nil, true
}
if launcherPod == nil {
logger.V(5).Info("No suitable launcher Pod found with sleeping instance or necessary capacity")
// Fall through to create new launcher Pod
} else {
logger.V(5).Info("Selected launcher Pod", "name", launcherPod.Name, "hasSleepingInstance", hasSleepingInstance)
launcherIP := launcherPod.Status.PodIP
if launcherIP == "" {
return fmt.Errorf("launcher Pod %q has no IP assigned yet", launcherPod.Name), true
}
launcherBaseURL := fmt.Sprintf("http://%s:%d", launcherIP, ctlrcommon.LauncherServicePort)
lClient, err := NewLauncherClient(launcherBaseURL)
if err != nil {
return err, true
}
launcherDat := ctl.getLauncherData(nodeDat, launcherPod.Name)
if hasSleepingInstance {
// Fast path: wake up existing sleeping instance
logger.V(5).Info("Waking up existing vLLM instance", "iscHash", iscHash)
err := ctl.wakeupInstance(ctx, lClient, iscHash, isc.Spec.ModelServerConfig.Port)
if err != nil {
return fmt.Errorf("wake up vLLM instance: %w", err), true
}
// Record the time at which this instance was last put to use.
launcherDat.Instances[iscHash] = time.Now()
// TODO(waltforme): the bind method may need more revision to fully handle launcher-based server providing Pods
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true, int16(isc.Spec.ModelServerConfig.Port))
} else {
// Slower path: create new instance in launcher with capacity
logger.V(5).Info("Creating new vLLM instance", "iscHash", iscHash)
result, err := lClient.CreateNamedInstance(ctx, iscHash, *cfg)
if err != nil {
return fmt.Errorf("create vLLM instance: %w", err), true
}
logger.V(5).Info("Created new vLLM instance",
"instance_id", result.InstanceID,
"status", result.Status,
)
launcherDat.Instances[iscHash] = time.Now()
// TODO(waltforme): the bind method may need more revision to fully handle launcher-based server providing Pods
return ctl.bind(ctx, serverDat, requestingPod, launcherPod, true, int16(isc.Spec.ModelServerConfig.Port))
}
}
}
// Remains: Zero matching launcher Pods, or the matching launcher Pod cannot host more instances to fulfill the request.
// TODO(waltforme): enforceSleeperBudget should be revised for launcher-based server-providing Pods
err, retry := ctl.enforceSleeperBudget(ctx, serverDat, requestingPod, int(lc.Spec.MaxSleepingInstances))
if err != nil || retry {
return err, retry
}
// Sleeper budget is met. Make a new launcher Pod.
// TODO(waltforme): introduce the 'enqueue requesters by launcher pods' logic to the controller.
echo, err := podOps.Create(ctx, desiredLauncherPod, metav1.CreateOptions{})
if err != nil {
errMsg := err.Error()
if invalidPodRE.MatchString(errMsg) {
return ctl.ensureReqStatus(ctx, requestingPod, serverDat, "the desired launcher-based server-providing "+errMsg)
}
innerErr, _ := ctl.ensureReqStatus(ctx, requestingPod, serverDat, fmt.Sprintf("failed to create launcher-based server-providing Pod: %s", errMsg))
if innerErr != nil {
return errors.Join(err, innerErr), true
}
return err, true
}
// NOTE(review): Sleeping is reset to nil here, whereas the non-launcher path sets it
// to false after creating its Pod — presumably because no instance exists in the new
// launcher yet; confirm.
serverDat.Sleeping = nil
logger.V(2).Info("Created launcher-based server-providing pod", "name", echo.Name, "gpus", serverDat.GPUIDsStr, "annotations", echo.Annotations, "labels", echo.Labels, "resourceVersion", echo.ResourceVersion)
return ctl.ensureReqStatus(ctx, requestingPod, serverDat)
}
// selectBestLauncherPod evaluates all matching launcher Pods and selects the 'best' one for fulfilling a request.
// Currently the definition of 'best' is radically simple:
// Priority 1: Launcher with an existing instance whose ID equals iscHash (fastest - just wake up);
// Priority 2: Launcher with capacity for a new instance (slower - need to create);
// Otherwise, no launcher Pod is selected.
//
// Launcher Pods that have failed or are being deleted are ignored. Pods that have no IP yet,
// are not ready, or whose instances could not be synced are counted as "not ready".
// The first launcher found with a matching instance is returned immediately; among launchers
// with capacity, the first one encountered is kept as the fallback candidate.
// A matching instance is recognized purely by its ID; this function does not itself inspect
// whether that instance is sleeping.
//
// Parameters:
//   - launcherPodAnys: candidate launcher Pods; every element must be a *corev1.Pod.
//   - iscHash: the instance ID to look for in each launcher.
//   - maxOthers: a launcher has capacity when its total number of instances is at most this value.
//   - nodeDat: the node data handed to syncLauncherInstances.
//
// Returns (selectedPod, hasSleepingInstance, somePodsNotReady, error).
// Returns (nil, false, false, nil) if no suitable launcher found and all pods are ready or failed.
// Returns (nil, false, true, nil) if there are pods not ready yet - caller should retry later.
// The error result is currently always nil.
func (ctl *controller) selectBestLauncherPod(
	ctx context.Context,
	launcherPodAnys []any,
	iscHash string,
	maxOthers int,
	nodeDat *nodeData,
) (*corev1.Pod, bool, bool, error) {
	logger := klog.FromContext(ctx)
	var candidateWithCapacity *corev1.Pod
	var somePodsNotReady bool
	for _, podAny := range launcherPodAnys {
		launcherPod := podAny.(*corev1.Pod)
		if launcherPod.Status.Phase == corev1.PodFailed || launcherPod.DeletionTimestamp != nil {
			continue
		}
		// Track pods that are not ready yet - we should give them time instead of
		// failing and creating new launcher Pods immediately.
		if launcherPod.Status.PodIP == "" || !utils.IsPodReady(launcherPod) {
			logger.V(5).Info("Launcher Pod not ready yet", "name", launcherPod.Name, "hasIP", launcherPod.Status.PodIP != "")
			somePodsNotReady = true
			continue
		}
		insts, err, retry := ctl.syncLauncherInstances(ctx, nodeDat, launcherPod)
		if err != nil || retry {
			// The failure is not propagated, so record it here; otherwise the
			// reason this launcher was passed over would be invisible.
			logger.V(4).Info("Unable to sync launcher instances, treating launcher Pod as not ready",
				"name", launcherPod.Name, "err", err, "retry", retry)
			somePodsNotReady = true
			continue
		}
		// Check if this launcher has an instance matching the iscHash
		hasSleepingInstance := false
		for _, inst := range insts.Instances {
			if inst.InstanceID == iscHash {
				hasSleepingInstance = true
				break
			}
		}
		if hasSleepingInstance {
			// Priority 1: Found a matching instance
			logger.V(5).Info("Found launcher with sleeping instance (fastest path)",
				"name", launcherPod.Name,
				"iscHash", iscHash,
				"totalInstances", insts.TotalInstances,
				"runningInstances", insts.RunningInstances)
			return launcherPod, true, false, nil
		}
		// Check if this launcher has capacity for a new instance
		if insts.TotalInstances <= maxOthers && candidateWithCapacity == nil {
			// Priority 2: Has capacity for new instance
			logger.V(5).Info("Found launcher with capacity for new instance",
				"name", launcherPod.Name,
				"totalInstances", insts.TotalInstances)
			candidateWithCapacity = launcherPod
			// Don't return yet - keep looking for sleeping instances (higher priority)
		}
	}
	// No sleeper but we found a launcher with capacity, use it
	if candidateWithCapacity != nil {
		logger.V(4).Info("Selected launcher with capacity (slower path)", "name", candidateWithCapacity.Name)
		return candidateWithCapacity, false, false, nil
	}
	// Found neither a sleeper nor a launcher with capacity, but some pods are
	// not ready yet: signal the caller to retry later.
	if somePodsNotReady {
		logger.V(4).Info("Found launcher Pods not ready yet, will retry later")
		return nil, false, true, nil
	}
	// No suitable launchers found
	logger.V(4).Info("No suitable launcher Pod found with sleeping instance or necessary capacity")
	return nil, false, false, nil
}
// configInferenceServer derives, from an InferenceServerConfig and a set of GPU
// UUIDs, the configuration to hand to a launcher and a hash that names it.
//
// The returned VllmConfig carries the configured options with " --port <port>"
// appended, the given GPU UUIDs, and a copy of the configured environment
// variables. The returned string is the unpadded URL-safe base64 encoding of
// the SHA-256 of the YAML form of the ModelServerConfig followed by ";gpus="
// and the comma-joined GPU UUIDs. An error is returned only when the
// ModelServerConfig cannot be marshaled.
func (ctl *controller) configInferenceServer(isc *fmav1alpha1.InferenceServerConfig, gpuUUIDs []string) (*VllmConfig, string, error) {
	msc := isc.Spec.ModelServerConfig

	// Copy the environment so the result does not share a map with the config object.
	envCopy := make(map[string]string, len(msc.EnvVars))
	for name, value := range msc.EnvVars {
		envCopy[name] = value
	}
	vllmCfg := &VllmConfig{
		Options:  msc.Options + " --port " + strconv.Itoa(int(msc.Port)),
		GpuUUIDs: gpuUUIDs,
		EnvVars:  envCopy,
	}

	iscBytes, err := yaml.Marshal(msc)
	if err != nil {
		return nil, "", fmt.Errorf("failed to marshal InferenceServerConfig %q: %w", isc.Name, err)
	}

	// Feed the digest the same byte sequence in the same order: config, separator, GPUs.
	digest := sha256.New()
	for _, part := range [][]byte{
		iscBytes,
		[]byte(";gpus="),
		[]byte(strings.Join(gpuUUIDs, ",")),
	} {
		digest.Write(part)
	}

	// using Raw_URL_Encoding because this hash will be used in URLs to the launcher.
	nominalHash := base64.RawURLEncoding.EncodeToString(digest.Sum(nil))
	return vllmCfg, nominalHash, nil
}
// wakeupInstance asks a vLLM instance to wake up by POSTing to its /wake_up
// endpoint. The request goes to the host name of the launcher client's base
// URL, on instancePort rather than on the launcher's own port. instanceID is
// used only in the error and log messages.
func (ctl *controller) wakeupInstance(ctx context.Context, lClient *LauncherClient, instanceID string, instancePort int32) error {
	host := lClient.baseURL.Hostname()
	port := strconv.Itoa(int(instancePort))
	wakeURL := "http://" + host + ":" + port + "/wake_up"

	if err := doPost(wakeURL); err != nil {
		return fmt.Errorf("failed to wake up vLLM instance %q: %w", instanceID, err)
	}

	klog.FromContext(ctx).V(2).Info("Woke up vLLM instance", "instanceID", instanceID)
	return nil
}
// ensureSleepingLabel makes the sleeping label on the server-providing Pod
// agree with desired, the label value being "true" or "false".
//
// When the label already has the desired value nothing is sent to the API
// server. Otherwise a deep copy of the Pod with the revised label is written
// with Update, so the caller's Pod object is never modified. A failed Update
// is returned wrapped; the caller decides whether to retry.
func (ctl *controller) ensureSleepingLabel(ctx context.Context, providingPod *corev1.Pod, desired bool) error {
	desiredStr := strconv.FormatBool(desired)
	if providingPod.Labels[api.SleepingLabelName] == desiredStr {
		return nil
	}

	// Work on a copy: the given Pod comes from a shared cache and must not be mutated.
	revised := providingPod.DeepCopy()
	revised.Labels = utils.MapSet(revised.Labels, api.SleepingLabelName, desiredStr)
	echo, err := ctl.coreclient.Pods(ctl.namespace).Update(ctx, revised, metav1.UpdateOptions{
		FieldManager: ControllerName})
	if err != nil {
		return fmt.Errorf("failed to revise sleeping label on server-providing Pod to %s: %w", desiredStr, err)
	}

	klog.FromContext(ctx).V(3).Info("Updated sleeping label on server-providing Pod", "sleeping", desiredStr, "newResourceVersion", echo.ResourceVersion)
	return nil
}
// invalidPodRE matches text that begins with `Pod "<name>" is invalid`, where
// <name> is a (possibly empty) run of lowercase letters, digits, dots and hyphens.
// NOTE(review): presumably used to recognize Pod validation failures reported by
// the API server; its uses are outside this section — confirm.
var invalidPodRE = regexp.MustCompile(`^Pod "[a-z0-9.-]*" is invalid`)
// enforceSleeperBudget deletes server-providing Pods so that, for each GPU in
// serverDat.GPUIndices on the requesting Pod's node, no more than sleeperLimit
// of the Pods found in the GPU index remain. A negative sleeperLimit is treated
// like zero (all of the Pods found for the GPU are deleted).
//
// Pods are deleted in decreasing order of age, with ties broken by name. The age
// of a Pod is the time since the timestamp of its managedFields entry whose
// manager is ControllerName or, when there is no such entry, the time since the
// Pod's creation.
//
// Returns (err, attempted). attempted is true when at least one Pod was selected
// for deletion, including when a deletion failed or the Pod turned out to be
// already gone.
// NOTE(review): presumably callers use the bool as a retry/requeue signal — confirm.
func (ctl *controller) enforceSleeperBudget(ctx context.Context, serverDat *serverData, requestingPod *corev1.Pod, sleeperLimit int) (error, bool) {
	logger := klog.FromContext(ctx)
	podOps := ctl.coreclient.Pods(ctl.namespace)
	gonerNames := sets.New[string]() // names of server-providing Pods selected for deletion
	now := time.Now()
	nameToAge := map[string]time.Duration{} // memoizes getAge, so that ages are stable during sorting
	getAge := func(pod *corev1.Pod) time.Duration {
		age, have := nameToAge[pod.Name]
		if !have {
			idx := slices.IndexFunc(pod.ManagedFields, func(mf metav1.ManagedFieldsEntry) bool {
				return mf.Manager == ControllerName
			})
			if idx >= 0 {
				age = now.Sub(pod.ManagedFields[idx].Time.Time)
			} else {
				age = now.Sub(pod.CreationTimestamp.Time)
			}
			nameToAge[pod.Name] = age
		}
		return age
	}
	// comparePods orders Pods by decreasing age, then by increasing name.
	comparePods := func(left, right *corev1.Pod) int {
		leftAge := getAge(left)
		rightAge := getAge(right)
		switch {
		case leftAge > rightAge:
			return -1
		case rightAge > leftAge:
			return 1
		default:
			return strings.Compare(left.Name, right.Name)
		}
	}
	for _, gpuIndex := range serverDat.GPUIndices { // enforce sleeper budget on this GPU
		// This is simple logic: delete the Pods of greatest age first.
		// Recognize deletions done for the sake of other GPUs.
		// TODO: better
		key := requestingPod.Spec.NodeName + " " + gpuIndex
		sleepingAnys, err := ctl.podInformer.GetIndexer().ByIndex(GPUIndexName, key)
		if err != nil { // impossible
			return err, false
		}
		// io.EOF is returned only to mark the Pods already selected for deletion.
		// NOTE(review): assumes utils.SliceMap omits the elements for which the
		// function returns an error; that is why the error result is ignored — confirm.
		sleepingPods, _ := utils.SliceMap(sleepingAnys, func(sleepingAny any) (*corev1.Pod, error) {
			pod := sleepingAny.(*corev1.Pod)
			if gonerNames.Has(pod.Name) {
				return nil, io.EOF
			}
			return pod, nil
		})
		// Every existing server-providing Pod on this GPU must have a sleeping inference server,
		// otherwise the scheduler and kubelet would not have assigned this GPU to the server-requesting Pod.
		toGo := len(sleepingPods) - sleeperLimit
		if toGo <= 0 {
			continue
		}
		// A negative sleeperLimit would make toGo exceed the number of Pods
		// and the slice expression below would panic.
		if toGo > len(sleepingPods) {
			toGo = len(sleepingPods)
		}
		slices.SortFunc(sleepingPods, comparePods)
		for idx, goner := range sleepingPods[:toGo] {
			gonerNames.Insert(goner.Name)
			// The preconditions make the deletion fail if the Pod has been replaced or changed
			// since the version seen here.
			err := podOps.Delete(ctx, goner.Name, metav1.DeleteOptions{
				Preconditions:     &metav1.Preconditions{UID: &goner.UID, ResourceVersion: &goner.ResourceVersion},
				PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
			})
			if err == nil {
				logger.V(2).Info("Deleted server-providing Pod with sleeping server, to respect sleeper-limit", "idx", idx, "total", len(sleepingPods), "limit", sleeperLimit, "name", goner.Name, "resourceVersion", goner.ResourceVersion)
			} else if apierrors.IsNotFound(err) || apierrors.IsGone(err) {
				logger.V(5).Info("Server-providing Pod was concurrently deleted", "name", goner.Name)
			} else {
				return fmt.Errorf("unable to delete server-providing Pod %s (RV=%s): %w", goner.Name, goner.ResourceVersion, err), true
			}
		}
	}
	return nil, len(gonerNames) > 0
}
// bind binds the given server-providing Pod to the given server-requesting Pod
// and then wakes the inference server.
//
// On a deep copy of providingPod this sets the requester annotation (to the
// requesting Pod's UID and name), adds the provider finalizer if absent, and sets
// the dual label to the requesting Pod's name; the copy is then written with an
// Update call. After that the server is woken via wakeSleeper and finally
// ensureReqState is called for the requesting Pod.
//
// serverDat is updated along the way: Sleeping is reset to nil before the Update,
// ProvidingPodName is set after the Update succeeds and, for launcher-based
// providers, ServerPort is set to instPort.
//
// Returns (err, bool); the bool is true whenever this function itself returns an
// error, otherwise both results come from ensureReqState.
// NOTE(review): the bool is presumably a retry indication — confirm against callers.
//
// Note: instPort is used only for launcher-based server-providing Pods.
func (ctl *controller) bind(ctx context.Context, serverDat *serverData, requestingPod, providingPod *corev1.Pod, launcherBased bool, instPort int16) (error, bool) {
	logger := klog.FromContext(ctx)
	providingPod = providingPod.DeepCopy()
	// A Pod without any annotation has a nil map, and assigning into a nil map panics.
	if providingPod.Annotations == nil {
		providingPod.Annotations = map[string]string{}
	}
	providingPod.Annotations[requesterAnnotationKey] = string(requestingPod.UID) + " " + requestingPod.Name
	if !slices.Contains(providingPod.Finalizers, providerFinalizer) {
		providingPod.Finalizers = append(providingPod.Finalizers, providerFinalizer)
	}
	providingPod.Labels = utils.MapSet(providingPod.Labels, api.DualLabelName, requestingPod.Name)
	serverDat.Sleeping = nil
	echo, err := ctl.coreclient.Pods(ctl.namespace).Update(ctx, providingPod, metav1.UpdateOptions{FieldManager: ControllerName})
	if err != nil {
		return fmt.Errorf("failed to bind server-providing Pod %s: %w", providingPod.Name, err), true
	}
	serverDat.ProvidingPodName = providingPod.Name
	logger.V(2).Info("Bound server-providing Pod", "name", providingPod.Name, "node", requestingPod.Spec.NodeName, "gpus", serverDat.GPUIDsStr, "newResourceVersion", echo.ResourceVersion)
	var serverPort int16
	if launcherBased {
		serverPort = instPort
		// For launcher-based server-providing Pods, ServerPort is written when binding.
		// For direct server-providing Pods, ServerPort is written (earlier) when
		// constructing the server-providing Pod's spec in getNominalServerProvidingPod.
		serverDat.ServerPort = serverPort
	} else {
		_, serverPort, err = utils.GetInferenceServerContainerIndexAndPort(providingPod)
		if err != nil { // Impossible, because such a providingPod would never be created by this controller
			return fmt.Errorf("unable to wake up server because port not known: %w", err), true
		}
	}
	err = ctl.wakeSleeper(ctx, serverDat, requestingPod, providingPod, serverPort)
	if err != nil {
		return err, true
	}
	logger.V(2).Info("Woke freshly-bound inference server", "providingPod", providingPod.Name)
	return ctl.ensureReqState(ctx, requestingPod, serverDat, !slices.Contains(requestingPod.Finalizers, requesterFinalizer), false)
}
// wakeSleeper wakes the inference server in the given server-providing Pod.
// The steps, each of which aborts with its error on failure, are:
//  1. when ctl.debugAccelMemory is set, the check done by accelMemoryIsLowEnough;
//  2. a POST to /wake_up at the providing Pod's IP address and serverPort;
//  3. setting the providing Pod's sleeping label to "false".
//
// On success serverDat.Sleeping is set to point to false.
func (ctl *controller) wakeSleeper(ctx context.Context, serverDat *serverData, requestingPod, providingPod *corev1.Pod, serverPort int16) error {
	if ctl.debugAccelMemory {
		memErr := ctl.accelMemoryIsLowEnough(ctx, requestingPod, serverDat)
		if memErr != nil {
			return memErr
		}
	}
	if postErr := doPost(fmt.Sprintf("http://%s:%d/wake_up", providingPod.Status.PodIP, serverPort)); postErr != nil {
		return postErr
	}
	if labelErr := ctl.ensureSleepingLabel(ctx, providingPod, false); labelErr != nil {
		return labelErr
	}
	serverDat.Sleeping = ptr.To(false)
	return nil
}
// maybeRemoveRequesterFinalizer reconciles the presence of requesterFinalizer on
// the server-requesting Pod with whether it is wanted, doing only the removal itself.
//
// The finalizer is wanted exactly when providingPod is non-nil, its inference
// server container can be identified, and the status of that container shows
// it running. requestingPod must not be nil; providingPod may be nil.
//
// Returns (removed, shouldAdd bool, err error, retry bool):
//   - presence already matches what is wanted: (false, false, nil, false);
//   - wanted but absent: nothing is changed here, and shouldAdd is true unless
//     the requesting Pod is being deleted;
//   - present but unwanted: the finalizer is removed with an Update call, giving
//     (true, false, nil, false) on success or (false, false, err, true) on failure.
func (ctl *controller) maybeRemoveRequesterFinalizer(ctx context.Context, requestingPod, providingPod *corev1.Pod) (bool, bool, error, bool) {
	// Step 1: is the finalizer wanted?
	wantFinalizer := false
	if providingPod != nil {
		if ctrIdx, idxErr := utils.GetInferenceServerContainerIndex(providingPod); idxErr == nil {
			ctrName := providingPod.Spec.Containers[ctrIdx].Name
			for i := range providingPod.Status.ContainerStatuses {
				ctrStatus := &providingPod.Status.ContainerStatuses[i]
				if ctrStatus.Name == ctrName {
					wantFinalizer = ctrStatus.State.Running != nil
					break
				}
			}
		}
	}
	// Step 2: is the finalizer there?
	finIdx := slices.Index(requestingPod.Finalizers, requesterFinalizer)
	haveFinalizer := finIdx >= 0
	// Step 3: act on the difference, if any
	switch {
	case wantFinalizer == haveFinalizer:
		return false, false, nil, false
	case wantFinalizer:
		// Adding is left to the caller, and is pointless for a Pod that is being deleted.
		return false, requestingPod.DeletionTimestamp == nil, nil, false
	}
	// Present but not wanted: remove it from a copy and write that.
	revised := requestingPod.DeepCopy()
	revised.Finalizers = slices.Delete(revised.Finalizers, finIdx, finIdx+1)
	echo, updErr := ctl.coreclient.Pods(ctl.namespace).Update(ctx, revised, metav1.UpdateOptions{FieldManager: ControllerName})
	if updErr != nil {
		return false, false, fmt.Errorf("failed to remove finalizer from server-requesting Pod: %w", updErr), true
	}
	klog.FromContext(ctx).V(2).Info("Removed requester finalizer", "newResourceVersion", echo.ResourceVersion)
	return true, false, nil, false
}
// addRequesterFinalizer does the API call to add the controller's finalizer to the server-requesting Pod.
// The same Update also makes the requesting Pod's dual label equal to providingPodName.
// The changes are made on a deep copy, so the caller's object is not mutated.
// The finalizer is appended only if it is not already present, so that a repeated
// call does not produce a duplicate entry.
// Returns (newResourceVersion string, err error)
func (ctl *controller) addRequesterFinalizer(ctx context.Context, requestingPod *corev1.Pod, providingPodName string) (string, error) {
	podOps := ctl.coreclient.Pods(ctl.namespace)
	requestingPod = requestingPod.DeepCopy()
	if requestingPod.Labels[api.DualLabelName] != providingPodName {
		requestingPod.Labels = utils.MapSet(requestingPod.Labels, api.DualLabelName, providingPodName)
	}
	if !slices.Contains(requestingPod.Finalizers, requesterFinalizer) {
		requestingPod.Finalizers = append(requestingPod.Finalizers, requesterFinalizer)
	}
	echo, err := podOps.Update(ctx, requestingPod, metav1.UpdateOptions{FieldManager: ControllerName})
	if err != nil {
		return "", fmt.Errorf("failed to add finalizer to server-requesting Pod: %w", err)
	}
	logger := klog.FromContext(ctx)
	logger.V(2).Info("Added requester finalizer", "newResourceVersion", echo.ResourceVersion)
	return echo.ResourceVersion, nil
}
// removeProviderFinalizer does the API call to remove the controller's finalizer from the server-providing Pod.
// When the finalizer is not present, no API call is made.
// The change is made on a deep copy, which is then sent in an Update call.
// The Update uses the package-level ControllerName as field manager, like every
// other Update in this section.
// Returns (changed bool, err error)
func (ctl *controller) removeProviderFinalizer(ctx context.Context, providingPod *corev1.Pod) (bool, error) {
	// Ensure finalizer is absent from server-providing Pod so that its deletion can complete
	newFinalizers, changed := utils.SliceRemoveOnce(providingPod.Finalizers, providerFinalizer)
	if !changed {
		return false, nil // no change
	}
	logger := klog.FromContext(ctx)
	podOps := ctl.coreclient.Pods(ctl.namespace)
	providingPod = providingPod.DeepCopy()
	providingPod.Finalizers = newFinalizers
	echo, err := podOps.Update(ctx, providingPod, metav1.UpdateOptions{FieldManager: ControllerName})
	if err != nil {
		return false, fmt.Errorf("failed to remove finalizer from server-providing Pod %s (RV %s): %w", providingPod.Name, providingPod.ResourceVersion, err)
	}
	logger.V(2).Info("Removed finalizer from server-providing Pod", "provider", providingPod.Name, "newResourceVersion", echo.ResourceVersion)
	return true, nil // update and/or delete event will trigger more processing
}
// ensureUnbound unbinds the given server-providing Pod.
//
// First, unless serverDat records the server as sleeping or the Pod has no IP
// address, the inference server is put to sleep with a POST to /sleep at the
// Pod's IP address; any status other than 200 is an error.
// For a launcher-based provider the port is serverDat.ServerPort; for a direct
// provider it is read from the Pod when serverDat.NominalProvidingPod is nil.
//
// Then, on a deep copy of the Pod, this ensures that the sleeping label is
// "true" and that the requester annotation, the provider finalizer and the dual
// label are absent; an Update call is made only if something changed.
//
// On success serverDat.ProvidingPodName is cleared and serverDat.ServerPort is
// set to -1. Returns a wrapped error if the port cannot be determined, the
// sleep request fails, or the Update fails.
func (ctl *controller) ensureUnbound(ctx context.Context, serverDat *serverData, providingPod *corev1.Pod, launcherBased bool) error {
	logger := klog.FromContext(ctx)
	// A providingPod with no IP is not scheduled, so we know that it is not awake.
	// If providingPod is stale then the update will fail.
	if (serverDat.Sleeping == nil || !*(serverDat.Sleeping)) && providingPod.Status.PodIP != "" { // need to put to sleep
		serverPort := serverDat.ServerPort
		// TODO(waltforme): Is serverPort always set correctly for launcher-based server-providing Pods upon unbinding?
		// E.g. What if requestingPod is deleted during a crash and restart of the dual-pods controller?
		// In order to find the port in this case, I think the best effort is to recompute hash for all InferenceServerConfig objects and try to match.
		if !launcherBased && serverDat.NominalProvidingPod == nil {
			var err error
			_, serverPort, err = utils.GetInferenceServerContainerIndexAndPort(providingPod)
			if err != nil { // Impossible, because such a providingPod would never be created by this controller
				return fmt.Errorf("unable to put server to sleep because port not known: %w", err)
			}
		}
		sleepURL := fmt.Sprintf("http://%s:%d/sleep", providingPod.Status.PodIP, serverPort)
		resp, err := http.Post(sleepURL, "", nil)
		if err != nil {
			return fmt.Errorf("failed to put provider %q to sleep, POST %s got error: %w", serverDat.ProvidingPodName, sleepURL, err)
		}
		// The body content is not needed, but it must be closed to release the
		// connection; draining it first lets the connection be reused.
		_, _ = io.Copy(io.Discard, resp.Body)
		_ = resp.Body.Close()
		if sc := resp.StatusCode; sc != http.StatusOK {
			return fmt.Errorf("failed to put provider %q to sleep, POST %s returned status %d", serverDat.ProvidingPodName, sleepURL, sc)
		}
		serverDat.Sleeping = ptr.To(true)
		logger.V(2).Info("Put inference server to sleep")
	}
	providingPod = providingPod.DeepCopy()
	var aChange, fChange, dChange bool
	// Ensure the sleeping label is correct
	sleepLabelValue := providingPod.Labels[api.SleepingLabelName]
	lChange := sleepLabelValue != "true"
	if lChange {
		providingPod.Labels = utils.MapSet(providingPod.Labels, api.SleepingLabelName, "true")
	}
	// Ensure the dual label is absent, even if nothing else needs to change
	if _, have := providingPod.Labels[api.DualLabelName]; have {
		delete(providingPod.Labels, api.DualLabelName)
		dChange = true
	}
	// Ensure requester annotation is absent
	if _, have := providingPod.Annotations[requesterAnnotationKey]; have {
		delete(providingPod.Annotations, requesterAnnotationKey)
		aChange = true
	}
	// Ensure finalizer is absent
	providingPod.Finalizers, fChange = utils.SliceRemoveOnce(providingPod.Finalizers, providerFinalizer)
	if aChange || fChange || lChange || dChange {
		podOps := ctl.coreclient.Pods(ctl.namespace)
		echo, err := podOps.Update(ctx, providingPod, metav1.UpdateOptions{FieldManager: ControllerName})
		if err != nil {
			return fmt.Errorf("failed to unbind server-providing Pod %s: %w", providingPod.Name, err)
		}
		logger.V(2).Info("Unbound server-providing Pod", "name", providingPod.Name, "node", providingPod.Spec.NodeName, "gpus", serverDat.GPUIDsStr, "newResourceVersion", echo.ResourceVersion)
	} else {
		logger.V(3).Info("Server-providing Pod remains unbound", "name", providingPod.Name, "resourceVersion", providingPod.ResourceVersion)
	}
	serverDat.ProvidingPodName = ""
	serverDat.ServerPort = -1
	return nil
}
// getNominalServerProvidingPod returns the nominal server-providing Pod,
// which is cached in the serverData, computing the Pod if necessary.
// This also ensures that the serverData fields NominalProvidingPod and NominalProvidingPodHash
// have the right values.
// Returns (NominalProvidingPod, NominalProvidingPodHash, error)