forked from kubernetes-sigs/dra-driver-nvidia-gpu
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdevice_state.go
More file actions
869 lines (756 loc) · 32.5 KB
/
device_state.go
File metadata and controls
869 lines (756 loc) · 32.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"fmt"
"slices"
"sync"
resourceapi "k8s.io/api/resource/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
cdiapi "tags.cncf.io/container-device-interface/pkg/cdi"
configapi "sigs.k8s.io/nvidia-dra-driver-gpu/api/nvidia.com/resource/v1beta1"
"sigs.k8s.io/nvidia-dra-driver-gpu/internal/common"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates"
drametrics "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics"
)
type OpaqueDeviceConfig struct {
Requests []string
Config runtime.Object
}
type DeviceConfigState struct {
Type string
ComputeDomain string
containerEdits *cdiapi.ContainerEdits
}
type DeviceState struct {
sync.Mutex
cdi *CDIHandler
computeDomainManager *ComputeDomainManager
checkpointCleanupManager *CheckpointCleanupManager
allocatable AllocatableDevices
config *Config
nvdevlib *deviceLib
checkpointManager checkpointmanager.CheckpointManager
}
func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) {
containerDriverRoot := root(config.flags.containerDriverRoot)
nvdevlib, err := newDeviceLib(containerDriverRoot)
if err != nil {
return nil, fmt.Errorf("failed to create device library: %w", err)
}
// Check driver version if IMEXDaemonsWithDNSNames feature gate is enabled
if featuregates.Enabled(featuregates.IMEXDaemonsWithDNSNames) {
if err := validateDriverVersionForIMEXDaemonsWithDNSNames(config.flags, nvdevlib); err != nil {
return nil, fmt.Errorf("driver version validation failed: %w", err)
}
}
allocatable, err := nvdevlib.enumerateAllPossibleDevices(config)
if err != nil {
return nil, fmt.Errorf("error enumerating all possible devices: %w", err)
}
devRoot := containerDriverRoot.getDevRoot()
klog.Infof("using devRoot=%v", devRoot)
hostDriverRoot := config.flags.hostDriverRoot
cdi, err := NewCDIHandler(
WithNvml(nvdevlib.nvmllib),
WithDeviceLib(nvdevlib),
WithDriverRoot(string(containerDriverRoot)),
WithDevRoot(devRoot),
WithTargetDriverRoot(hostDriverRoot),
WithNVIDIACDIHookPath(config.flags.nvidiaCDIHookPath),
WithCDIRoot(config.flags.cdiRoot),
WithVendor(cdiVendor),
)
if err != nil {
return nil, fmt.Errorf("unable to create CDI handler: %w", err)
}
// TODO: explore calling this not only during plugin startup because this
// information may change during runtime.
cliqueID, err := nvdevlib.getCliqueID()
if err != nil {
return nil, fmt.Errorf("error getting cliqueID: %w", err)
}
computeDomainManager := NewComputeDomainManager(config, cliqueID)
if err := cdi.CreateStandardDeviceSpecFile(allocatable); err != nil {
return nil, fmt.Errorf("unable to create base CDI spec file: %v", err)
}
checkpointManager, err := checkpointmanager.NewCheckpointManager(config.DriverPluginPath())
if err != nil {
return nil, fmt.Errorf("unable to create checkpoint manager: %v", err)
}
state := &DeviceState{
cdi: cdi,
computeDomainManager: computeDomainManager,
allocatable: allocatable,
config: config,
nvdevlib: nvdevlib,
checkpointManager: checkpointManager,
}
state.checkpointCleanupManager = NewCheckpointCleanupManager(state, config.clientsets.Resource)
checkpoints, err := state.checkpointManager.ListCheckpoints()
if err != nil {
return nil, fmt.Errorf("unable to list checkpoints: %v", err)
}
for _, c := range checkpoints {
if c == DriverPluginCheckpointFileBasename {
klog.Infof("Found previous checkpoint: %s", c)
cp, err := state.getCheckpoint()
if err != nil {
return nil, fmt.Errorf("unable to get checkpoint: %w", err)
}
syncPreparedDevicesGaugeFromCheckpoint(config.flags.nodeName, cp)
return state, nil
}
}
klog.Infof("Create empty checkpoint")
if err := state.createCheckpoint(&Checkpoint{}); err != nil {
return nil, fmt.Errorf("unable to create checkpoint: %w", err)
}
return state, nil
}
func (s *DeviceState) Prepare(ctx context.Context, claim *resourceapi.ResourceClaim) ([]kubeletplugin.Device, error) {
s.Lock()
defer s.Unlock()
claimUID := string(claim.UID)
checkpoint, err := s.getCheckpoint()
if err != nil {
return nil, fmt.Errorf("unable to get checkpoint: %w", err)
}
preparedClaim, exists := checkpoint.V2.PreparedClaims[claimUID]
if exists && preparedClaim.CheckpointState == ClaimCheckpointStatePrepareCompleted {
// Associated device(s) has/ave been prepared by us. Prepare() must be
// idempotent, as it may be invoked more than once per claim (and actual
// device preparation must happen at most once).
klog.V(4).Infof("Skip prepare: claim already in PrepareCompleted state: %s", ResourceClaimToString(claim))
return preparedClaim.PreparedDevices.GetDevices(), nil
}
// In certain scenarios, the same device can be prepared/allocated more than once for different claims
// due to races between data processing in different goroutines in the scheduler, or when pods are
// force-deleted while the kubelet still considers the devices allocated.
// To prevent this, we check whether any device requested in the incoming claim has already been prepared
// and fail the request if so (unless the prior preparation was performed with admin access).
// More details: https://github.com/kubernetes/kubernetes/pull/136269
if err := s.validateNoOverlappingPreparedDevices(checkpoint, claim); err != nil {
return nil, fmt.Errorf("unable to prepare claim %v: %w", claimUID, err)
}
err = s.updateCheckpoint(func(checkpoint *Checkpoint) {
checkpoint.V2.PreparedClaims[claimUID] = PreparedClaim{
CheckpointState: ClaimCheckpointStatePrepareStarted,
Status: claim.Status,
Name: claim.Name,
Namespace: claim.Namespace,
}
})
if err != nil {
return nil, fmt.Errorf("unable to update checkpoint: %w", err)
}
klog.V(6).Infof("checkpoint updated for claim %v", claimUID)
preparedDevices, err := s.prepareDevices(ctx, claim)
if err != nil {
return nil, fmt.Errorf("prepare devices failed: %w", err)
}
if err := s.cdi.CreateClaimSpecFile(claimUID, preparedDevices); err != nil {
return nil, fmt.Errorf("unable to create CDI spec file for claim: %w", err)
}
// Some errors above along the Prepare() path leave the claim in the
// checkpoint, in the 'PrepareStarted' state. That's deliberate, to annotate
// potentially partially prepared claims. There is an asynchronous
// checkpoint cleanup procedure that identifies when such entry goes stale.
err = s.updateCheckpoint(func(checkpoint *Checkpoint) {
checkpoint.V2.PreparedClaims[claimUID] = PreparedClaim{
CheckpointState: ClaimCheckpointStatePrepareCompleted,
Status: claim.Status,
PreparedDevices: preparedDevices,
}
})
if err != nil {
return nil, fmt.Errorf("unable to update checkpoint: %w", err)
}
klog.V(6).Infof("checkpoint updated for claim %v", claimUID)
return preparedDevices.GetDevices(), nil
}
func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.NamespacedObject) error {
s.Lock()
defer s.Unlock()
klog.V(6).Infof("Unprepare() for claim '%s'", claimRef.String())
// Rely on local checkpoint state for ability to clean up.
checkpoint, err := s.getCheckpoint()
if err != nil {
return fmt.Errorf("unable to get checkpoint: %w", err)
}
claimUID := string(claimRef.UID)
pc, exists := checkpoint.V2.PreparedClaims[claimUID]
if !exists {
// Not an error: if this claim UID is not in the checkpoint then this
// device was never prepared or has already been unprepared (assume that
// Prepare+Checkpoint are done transactionally). Note that
// claimRef.String() contains namespace, name, UID.
klog.V(2).Infof("Unprepare noop: claim not found in checkpoint data: %v", claimRef.String())
return nil
}
// If pc.Status.Allocation is 'nil', attempt to pull the status from the API
// server. This should only ever happen if we have unmarshaled from a legacy
// checkpoint format that did not include the Status field.
//
// TODO: Remove this one release cycle following the v25.3.0 release
if pc.Status.Allocation == nil {
klog.Infof("PreparedClaim Status not set in Checkpoint for claim '%s': attempting to pull it from API server", claimRef.String())
claim, err := s.config.clientsets.Resource.ResourceClaims(claimRef.Namespace).Get(
ctx,
claimRef.Name,
metav1.GetOptions{})
// TODO: distinguish errors -- if this is a 'not found' error then this
// is permanent and we may want to drop the claim from the checkpoint.
// Otherwise, this might be worth retrying?
if err != nil {
return permanentError{fmt.Errorf("failed to fetch ResourceClaim %s: %w", claimRef.String(), err)}
}
if claim.Status.Allocation == nil {
// TODO: drop claim from checkpoint?
return permanentError{fmt.Errorf("no allocation set in ResourceClaim %s", claim.String())}
}
pc.Status = claim.Status
}
switch pc.CheckpointState {
case ClaimCheckpointStatePrepareStarted, ClaimCheckpointStatePrepareCompleted:
if err := s.unprepareDevices(ctx, &pc.Status); err != nil {
return fmt.Errorf("unprepare devices failed: %w", err)
}
default:
return fmt.Errorf("unsupported ClaimCheckpointState: %v", pc.CheckpointState)
}
// Assume that this is a retryable error. If there is any chance for this
// error to be permanent: drop the claim from checkpoint then regardless?
if err := s.cdi.DeleteClaimSpecFileIfExists(claimUID); err != nil {
return fmt.Errorf("unable to delete CDI spec file for claim: %w", err)
}
// Mutate checkpoint reflecting that all devices for this claim have been
// unprepared, by virtue of removing its UID from the PreparedClaims map.
err = s.deleteClaimFromCheckpoint(claimRef)
if err != nil {
return fmt.Errorf("error deleting claim from checkpoint: %w", err)
}
return nil
}
func (s *DeviceState) createCheckpoint(cp *Checkpoint) error {
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, cp); err != nil {
return err
}
syncPreparedDevicesGaugeFromCheckpoint(s.config.flags.nodeName, cp)
return nil
}
func (s *DeviceState) getCheckpoint() (*Checkpoint, error) {
checkpoint := &Checkpoint{}
if err := s.checkpointManager.GetCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
return nil, err
}
return checkpoint.ToLatestVersion(), nil
}
// Read checkpoint from store, perform mutation, and write checkpoint back. Any
// mutation of the checkpoint must go through this function. The
// read-mutate-write sequence must be performed under a lock: we must be
// conceptually certain that multiple read-mutate-write actions never overlap.
// Currently, it is assumed that any caller gets here by first acquiring
// driver's `pulock`.
func (s *DeviceState) updateCheckpoint(mutate func(*Checkpoint)) error {
checkpoint, err := s.getCheckpoint()
if err != nil {
return fmt.Errorf("unable to get checkpoint: %w", err)
}
mutate(checkpoint)
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, checkpoint); err != nil {
return fmt.Errorf("unable to create checkpoint: %w", err)
}
syncPreparedDevicesGaugeFromCheckpoint(s.config.flags.nodeName, checkpoint)
return nil
}
func (s *DeviceState) deleteClaimFromCheckpoint(claimRef kubeletplugin.NamespacedObject) error {
err := s.updateCheckpoint(func(cp *Checkpoint) {
delete(cp.V2.PreparedClaims, string(claimRef.UID))
})
if err != nil {
return fmt.Errorf("unable to update checkpoint: %w", err)
}
klog.V(6).Infof("Deleted claim from checkpoint: %s", claimRef.String())
return nil
}
func (s *DeviceState) prepareDevices(ctx context.Context, claim *resourceapi.ResourceClaim) (PreparedDevices, error) {
// Generate a mapping of each OpaqueDeviceConfigs to the Device.Results it
// applies to. Strict-decode: data is provided by user and may be completely
// unvalidated so far (in absence of validating webhook).
configResultsMap, err := s.getConfigResultsMap(&claim.Status, configapi.StrictDecoder)
if err != nil {
return nil, fmt.Errorf("error generating configResultsMap: %w", err)
}
// Normalize, validate, and apply all configs associated with devices that
// need to be prepared. Track device group configs generated from applying the
// config to the set of device allocation results.
preparedDeviceGroupConfigState := make(map[runtime.Object]*DeviceConfigState)
for c, results := range configResultsMap {
// Cast the opaque config to a configapi.Interface type
var config configapi.Interface
switch castConfig := c.(type) {
case *configapi.ComputeDomainChannelConfig:
config = castConfig
case *configapi.ComputeDomainDaemonConfig:
config = castConfig
default:
return nil, fmt.Errorf("runtime object is not a recognized configuration")
}
// Normalize the config to set any implied defaults.
if err := config.Normalize(); err != nil {
return nil, fmt.Errorf("error normalizing config: %w", err)
}
// Validate the config to ensure its integrity.
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("error validating config: %w", err)
}
// Apply the config to the list of results associated with it.
configState, err := s.applyConfig(ctx, config, claim, results)
if err != nil {
return nil, fmt.Errorf("error applying config: %w", err)
}
// Capture the prepared device group config in the map.
preparedDeviceGroupConfigState[c] = configState
}
// Walk through each config and its associated device allocation results
// and construct the list of prepared devices to return.
var preparedDevices PreparedDevices
for c, results := range configResultsMap {
preparedDeviceGroup := PreparedDeviceGroup{
ConfigState: *preparedDeviceGroupConfigState[c],
}
for _, result := range results {
cdiDevices := []string{}
if d := s.cdi.GetStandardDevice(s.allocatable[result.Device]); d != "" {
cdiDevices = append(cdiDevices, d)
}
if d := s.cdi.GetClaimDevice(string(claim.UID), s.allocatable[result.Device], preparedDeviceGroupConfigState[c].containerEdits); d != "" {
cdiDevices = append(cdiDevices, d)
}
device := kubeletplugin.Device{
Requests: []string{result.Request},
PoolName: result.Pool,
DeviceName: result.Device,
CDIDeviceIDs: cdiDevices,
}
var preparedDevice PreparedDevice
switch s.allocatable[result.Device].Type() {
case ComputeDomainChannelType:
preparedDevice.Channel = &PreparedComputeDomainChannel{
Info: s.allocatable[result.Device].Channel,
Device: &device,
}
case ComputeDomainDaemonType:
preparedDevice.Daemon = &PreparedComputeDomainDaemon{
Info: s.allocatable[result.Device].Daemon,
Device: &device,
}
}
preparedDeviceGroup.Devices = append(preparedDeviceGroup.Devices, preparedDevice)
}
preparedDevices = append(preparedDevices, &preparedDeviceGroup)
}
return preparedDevices, nil
}
func (s *DeviceState) unprepareDevices(ctx context.Context, cs *resourceapi.ResourceClaimStatus) error {
// Generate a mapping of each OpaqueDeviceConfigs to the Device.Results it
// applies to. Non-strict decoding: do not error out on unknown fields (data
// source is checkpointed JSON written by potentially newer versions of this
// driver).
configResultsMap, err := s.getConfigResultsMap(cs, configapi.NonstrictDecoder)
if err != nil {
return fmt.Errorf("error generating configResultsMap: %w", err)
}
// Unprepare any ComputeDomain daemons prepared for each group of prepared devices.
for c := range configResultsMap {
switch config := c.(type) {
case *configapi.ComputeDomainChannelConfig:
// If a channel type, remove the ComputeDomain label from the node
if err := s.computeDomainManager.RemoveNodeLabel(ctx, config.DomainID); err != nil {
return fmt.Errorf("error removing Node label for ComputeDomain: %w", err)
}
case *configapi.ComputeDomainDaemonConfig:
// If a daemon type, unprepare the new ComputeDomain daemon.
computeDomainDaemonSettings := s.computeDomainManager.NewSettings(config.DomainID)
if err := computeDomainDaemonSettings.Unprepare(ctx); err != nil {
return fmt.Errorf("error unpreparing ComputeDomain daemon settings: %w", err)
}
}
}
return nil
}
func (s *DeviceState) applyConfig(ctx context.Context, config configapi.Interface, claim *resourceapi.ResourceClaim, results []*resourceapi.DeviceRequestAllocationResult) (*DeviceConfigState, error) {
switch castConfig := config.(type) {
case *configapi.ComputeDomainChannelConfig:
return s.applyComputeDomainChannelConfig(ctx, castConfig, claim, results)
case *configapi.ComputeDomainDaemonConfig:
return s.applyComputeDomainDaemonConfig(ctx, castConfig, claim, results)
default:
return nil, fmt.Errorf("unknown config type: %T", castConfig)
}
}
func (s *DeviceState) applyComputeDomainChannelConfig(ctx context.Context, config *configapi.ComputeDomainChannelConfig, claim *resourceapi.ResourceClaim, results []*resourceapi.DeviceRequestAllocationResult) (*DeviceConfigState, error) {
// Not an expected error, but a violated invariant.
if len(results) != 1 {
return nil, fmt.Errorf("applyComputeDomainChannelConfig: unexpected results %v", results)
}
// If explicitly requested, inject all channels instead of just one.
chancount := 1
if config.AllocationMode == configapi.ComputeDomainChannelAllocationModeAll {
chancount = s.nvdevlib.maxImexChannelCount
}
// Declare a device group state object to populate.
configState := DeviceConfigState{
Type: ComputeDomainChannelType,
ComputeDomain: config.DomainID,
}
// Treat each request as a request for channel zero, even if
// AllocationModeAll.
if err := s.assertImexChannelNotAllocated(0); err != nil {
return nil, fmt.Errorf("allocation failed: %w", err)
}
// Create any necessary ComputeDomain channels and gather their CDI container edits.
if err := s.computeDomainManager.AssertComputeDomainNamespace(ctx, claim.Namespace, config.DomainID); err != nil {
return nil, permanentError{fmt.Errorf("error asserting ComputeDomain's namespace: %w", err)}
}
if err := s.computeDomainManager.AddNodeLabel(ctx, config.DomainID); err != nil {
return nil, fmt.Errorf("error adding Node label for ComputeDomain: %w", err)
}
if err := s.computeDomainManager.AssertComputeDomainReady(ctx, config.DomainID); err != nil {
return nil, fmt.Errorf("error asserting ComputeDomain Ready: %w", err)
}
if s.computeDomainManager.cliqueID == "" {
// Do not inject IMEX channel device nodes.
return &configState, nil
}
for _, info := range s.nvdevlib.nvCapImexChanDevInfos[:chancount] {
edits := s.computeDomainManager.GetComputeDomainChannelContainerEdits(s.cdi.devRoot, info)
configState.containerEdits = configState.containerEdits.Append(edits)
}
return &configState, nil
}
func (s *DeviceState) applyComputeDomainDaemonConfig(ctx context.Context, config *configapi.ComputeDomainDaemonConfig, claim *resourceapi.ResourceClaim, results []*resourceapi.DeviceRequestAllocationResult) (*DeviceConfigState, error) {
// Get the list of claim requests this config is being applied over.
var requests []string
for _, r := range results {
requests = append(requests, r.Request)
}
// Get the list of allocatable devices this config is being applied over.
allocatableDevices := make(AllocatableDevices)
for _, r := range results {
allocatableDevices[r.Device] = s.allocatable[r.Device]
}
if len(allocatableDevices) != 1 {
return nil, fmt.Errorf("only expected 1 device for requests '%v' in claim '%v'", requests, claim.UID)
}
// Declare a device group state object to populate.
configState := DeviceConfigState{
Type: ComputeDomainDaemonType,
ComputeDomain: config.DomainID,
}
// Create new ComputeDomain daemon settings from the ComputeDomainManager.
computeDomainDaemonSettings := s.computeDomainManager.NewSettings(config.DomainID)
// Prepare injecting IMEX daemon config files even if IMEX is not supported.
// This for example creates
// '/var/lib/kubelet/plugins/compute-domain.nvidia.com/domains/<uid>' on the
// host which is used as mount source mapped to /imexd in the CD daemon
// container.
if err := computeDomainDaemonSettings.Prepare(ctx); err != nil {
return nil, fmt.Errorf("error preparing ComputeDomain daemon settings for requests '%v' in claim '%v': %w", requests, claim.UID, err)
}
// Always inject CD config details into the CD daemon (regardless of clique
// ID being empty or not).
edits, err := computeDomainDaemonSettings.GetCDIContainerEditsCommon(ctx)
if err != nil {
return nil, fmt.Errorf("error getting common container edits for ComputeDomain daemon '%s': %w", config.DomainID, err)
}
configState.containerEdits = configState.containerEdits.Append(edits)
// Only inject dev nodes related to
// /proc/driver/nvidia/capabilities/fabric-imex-mgmt if IMEX is supported
// (if we want to start the IMEX daemon process in the CD daemon pod).
if s.computeDomainManager.cliqueID != "" {
// Parse the device node info for the fabric-imex-mgmt nvcap.
nvcapDeviceInfo, err := common.ParseNVCapDeviceInfo(nvidiaCapFabricImexMgmtPath)
if err != nil {
return nil, fmt.Errorf("error parsing nvcap device info for fabric-imex-mgmt: %w", err)
}
edits := computeDomainDaemonSettings.GetCDIContainerEditsForImex(ctx, s.cdi.devRoot, nvcapDeviceInfo)
configState.containerEdits = configState.containerEdits.Append(edits)
}
return &configState, nil
}
func (s *DeviceState) getConfigResultsMap(rcs *resourceapi.ResourceClaimStatus, decoder runtime.Decoder) (map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult, error) {
// Retrieve the full set of device configs for the driver.
configs, err := GetOpaqueDeviceConfigs(
decoder,
DriverName,
rcs.Allocation.Devices.Config,
)
if err != nil {
return nil, fmt.Errorf("error getting opaque device configs: %w", err)
}
// Add the default ComputeDomainConfig to the front of the config list with the
// lowest precedence. This guarantees there will be at least one of each
// config in the list with len(Requests) == 0 for the lookup below.
configs = slices.Insert(configs, 0, &OpaqueDeviceConfig{
Requests: []string{},
Config: configapi.DefaultComputeDomainChannelConfig(),
})
configs = slices.Insert(configs, 0, &OpaqueDeviceConfig{
Requests: []string{},
Config: configapi.DefaultComputeDomainDaemonConfig(),
})
// Look through the configs and figure out which one will be applied to
// each device allocation result based on their order of precedence and type.
configResultsMap := make(map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult)
for _, result := range rcs.Allocation.Devices.Results {
if result.Driver != DriverName {
continue
}
device, exists := s.allocatable[result.Device]
if !exists {
return nil, fmt.Errorf("requested device is not allocatable: %v", result.Device)
}
for _, c := range slices.Backward(configs) {
if slices.Contains(c.Requests, result.Request) {
if _, ok := c.Config.(*configapi.ComputeDomainChannelConfig); ok && device.Type() != ComputeDomainChannelType {
return nil, fmt.Errorf("cannot apply ComputeDomainChannelConfig to request: %v", result.Request)
}
if _, ok := c.Config.(*configapi.ComputeDomainDaemonConfig); ok && device.Type() != ComputeDomainDaemonType {
return nil, fmt.Errorf("cannot apply ComputeDomainDaemonConfig to request: %v", result.Request)
}
configResultsMap[c.Config] = append(configResultsMap[c.Config], &result)
break
}
if len(c.Requests) == 0 {
if _, ok := c.Config.(*configapi.ComputeDomainChannelConfig); ok && device.Type() != ComputeDomainChannelType {
continue
}
if _, ok := c.Config.(*configapi.ComputeDomainDaemonConfig); ok && device.Type() != ComputeDomainDaemonType {
continue
}
configResultsMap[c.Config] = append(configResultsMap[c.Config], &result)
break
}
}
}
return configResultsMap, nil
}
// assertImexChannelNotAllocated() consults the absolute, node-local source of
// truth (the checkpoint data). It fails when the IMEX channel with ID `id` is
// already in use by another resource claim.
//
// Must be performed in the Prepare() path for any claim asking for a channel.
// This makes sure that Prepare() and Unprepare() calls acting on the same
// resource are processed in the correct order (this prevents for example
// unprepare-after-prepare, cf. issue 641).
//
// The implementation may become more involved when the same IMEX channel may be
// shared across pods on the same node).
func (s *DeviceState) assertImexChannelNotAllocated(id int) error {
cp, err := s.getCheckpoint()
if err != nil {
return fmt.Errorf("unable to get checkpoint: %w", err)
}
for claimUID, claim := range cp.V2.PreparedClaims {
// Ignore non-completed preparations: file-based locking guarantees that
// only one Prepare() runs at any given time. If a claim is in the
// `PrepareStarted` state then it is not actually currently in progress
// of being prepared, but either retried soon (in which case we are
// faster and win over it) or never retried (in which case we can also
// safely allocate).
if claim.CheckpointState != ClaimCheckpointStatePrepareCompleted {
continue
}
for _, devs := range claim.PreparedDevices {
for _, d := range devs.Devices {
if d.Channel != nil && d.Channel.Info.ID == id {
// Maybe log something based on `claim.Status.ReservedFor`
// to facilitate debugging.
return fmt.Errorf("channel %d already allocated by claim %s (according to checkpoint)", id, claimUID)
}
}
}
}
return nil
}
// validateDriverVersionForIMEXDaemonsWithDNSNames validates that the driver version
// meets the minimum requirement for the IMEXDaemonsWithDNSNames feature gate.
func validateDriverVersionForIMEXDaemonsWithDNSNames(flags *Flags, nvdevlib *deviceLib) error {
klog.Infof("Starting driver version validation for IMEXDaemonsWithDNSNames feature...")
klog.Infof("Minimum required version: %s", IMEXDaemonsWithDNSNamesMinDriverVersion)
driverVer, err := nvdevlib.getDriverVersion()
if err != nil {
return fmt.Errorf("error getting driver version: %w", err)
}
minVersion, err := version.ParseGeneric(IMEXDaemonsWithDNSNamesMinDriverVersion)
if err != nil {
return fmt.Errorf("error parsing minimum version: %w", err)
}
if driverVer.LessThan(minVersion) {
klog.Errorf("IMEXDaemonsWithDNSNames feature requires GPU driver version >= %s, but found %s", minVersion.String(), driverVer.String())
klog.Errorf("If installed via helm, set featureGates.IMEXDaemonsWithDNSNames=false to disable")
return fmt.Errorf("minimum version not satisfied for IMEXDaemonsWithDNSNames feature")
}
klog.Infof("Driver version validation passed: %s >= %s", driverVer.String(), minVersion.String())
return nil
}
// GetOpaqueDeviceConfigs returns an ordered list of the configs contained in possibleConfigs for this driver.
//
// Configs can either come from the resource claim itself or from the device
// class associated with the request. Configs coming directly from the resource
// claim take precedence over configs coming from the device class. Moreover,
// configs found later in the list of configs attached to its source take
// precedence over configs found earlier in the list for that source.
//
// All of the configs relevant to the driver from the list of possibleConfigs
// will be returned in order of precedence (from lowest to highest). If no
// configs are found, nil is returned.
func GetOpaqueDeviceConfigs(
decoder runtime.Decoder,
driverName string,
possibleConfigs []resourceapi.DeviceAllocationConfiguration,
) ([]*OpaqueDeviceConfig, error) {
// Collect all configs in order of reverse precedence.
var classConfigs []resourceapi.DeviceAllocationConfiguration
var claimConfigs []resourceapi.DeviceAllocationConfiguration
var candidateConfigs []resourceapi.DeviceAllocationConfiguration
for _, config := range possibleConfigs {
switch config.Source {
case resourceapi.AllocationConfigSourceClass:
classConfigs = append(classConfigs, config)
case resourceapi.AllocationConfigSourceClaim:
claimConfigs = append(claimConfigs, config)
default:
return nil, fmt.Errorf("invalid config source: %v", config.Source)
}
}
candidateConfigs = append(candidateConfigs, classConfigs...)
candidateConfigs = append(candidateConfigs, claimConfigs...)
// Decode all configs that are relevant for the driver.
var resultConfigs []*OpaqueDeviceConfig
for _, config := range candidateConfigs {
// If this is nil, the driver doesn't support some future API extension
// and needs to be updated.
if config.Opaque == nil {
return nil, fmt.Errorf("only opaque parameters are supported by this driver")
}
// Configs for different drivers may have been specified because a
// single request can be satisfied by different drivers. This is not
// an error -- drivers must skip over other driver's configs in order
// to support this.
if config.Opaque.Driver != driverName {
continue
}
decodedConfig, err := runtime.Decode(decoder, config.Opaque.Parameters.Raw)
if err != nil {
// Bad opaque config: i) do not retry preparing this resource
// internally and ii) return notion of permanent error to kubelet,
// to give it an opportunity to play this error back to the user so
// that it becomes actionable.
return nil, permanentError{fmt.Errorf("error decoding config parameters: %w", err)}
}
resultConfig := &OpaqueDeviceConfig{
Requests: config.Requests,
Config: decodedConfig,
}
resultConfigs = append(resultConfigs, resultConfig)
}
return resultConfigs, nil
}
// requestedNonAdminDevices returns the set of device names requested by the claim,
// excluding admin-access allocations.
func (s *DeviceState) requestedNonAdminDevices(claim *resourceapi.ResourceClaim) map[string]struct{} {
requested := make(map[string]struct{}, len(claim.Status.Allocation.Devices.Results))
for _, r := range claim.Status.Allocation.Devices.Results {
if r.Driver != DriverName {
continue
}
if r.AdminAccess != nil && *r.AdminAccess {
continue
}
requested[r.Device] = struct{}{}
}
return requested
}
// validateNoOverlappingPreparedDevices checks whether the given claim requests any device that is
// already allocated (non-admin) to a different claim that has completed preparation.
func (s *DeviceState) validateNoOverlappingPreparedDevices(checkpoint *Checkpoint, claim *resourceapi.ResourceClaim) error {
claimUID := string(claim.UID)
// Get the set of requested non-admin devices for the current claim.
requestedDevices := s.requestedNonAdminDevices(claim)
if len(requestedDevices) == 0 {
return nil
}
for existingClaimUID, pc := range checkpoint.V2.PreparedClaims {
// Skip the current claim.
if existingClaimUID == claimUID {
continue
}
if pc.CheckpointState != ClaimCheckpointStatePrepareCompleted {
continue
}
// Get the non-admin devices from the prepared claim in the checkpoint.
// We allow overlapping device allocations only if they are requested with admin access.
preparedDevices := pc.GetNonAdminDevices()
if len(preparedDevices) == 0 {
continue
}
// Check for overlaps between requested devices from the current claim and others.
for device := range requestedDevices {
if _, found := preparedDevices[device]; found {
return fmt.Errorf(
"requested device %s is already allocated to different claim %s",
device, existingClaimUID,
)
}
}
}
return nil
}
func syncPreparedDevicesGaugeFromCheckpoint(nodeName string, cp *Checkpoint) {
counts := make(map[string]int)
if cp == nil {
return
}
lv := cp.ToLatestVersion()
if lv != nil && lv.V2 != nil {
for _, pc := range lv.V2.PreparedClaims {
if pc.CheckpointState != ClaimCheckpointStatePrepareCompleted {
continue
}
for _, g := range pc.PreparedDevices {
for _, dev := range g.Devices {
if _, ok := counts[dev.Type()]; !ok {
counts[dev.Type()] = 0
}
counts[dev.Type()]++
}
}
}
}
for _, dt := range []string{ComputeDomainChannelType, ComputeDomainDaemonType, UnknownDeviceType} {
if count, ok := counts[dt]; !ok {
drametrics.SetPreparedDevicesCounts(nodeName, DriverName, dt, 0)
} else {
drametrics.SetPreparedDevicesCounts(nodeName, DriverName, dt, count)
}
}
}