Skip to content

Commit dd39c0c

Browse files
ambermingxinjingxiang-z
authored andcommitted
feat: add loop and retry for dcgm detection (#180)
Signed-off-by: Amber Xue <ambermingxin@nvidia.com>
1 parent 558b6d8 commit dd39c0c

15 files changed

Lines changed: 964 additions & 77 deletions

File tree

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/clock/component.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
7373
}
7474

7575
// Register clock fields with DCGM instance for centralized watching
76-
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
76+
if c.dcgmInstance != nil {
7777
if err := c.dcgmInstance.AddFieldsToWatch(clockFields); err != nil {
7878
log.Logger.Warnw("failed to register clock fields", "error", err)
7979
} else {

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/inforom/component.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
7878
}
7979

8080
// Register this component's health watch system with DCGM
81-
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
81+
if c.dcgmInstance != nil {
8282
if err := c.dcgmInstance.AddHealthWatch(dcgm.DCGM_HEALTH_WATCH_INFOROM); err != nil {
8383
log.Logger.Warnw("failed to add inforom health watch", "error", err)
8484
} else {

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/mem/component.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
7878
}
7979

8080
// Only initialize if DCGM is available
81-
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
81+
if c.dcgmInstance != nil {
8282
// Register this component's health watch system with DCGM
8383
if err := c.dcgmInstance.AddHealthWatch(dcgm.DCGM_HEALTH_WATCH_MEM); err != nil {
8484
log.Logger.Warnw("failed to add memory health watch", "error", err)
@@ -446,4 +446,3 @@ func (cr *checkResult) HealthStates() apiv1.HealthStates {
446446

447447
return apiv1.HealthStates{state}
448448
}
449-

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/nvlink/component.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
7878
}
7979

8080
// Only initialize if DCGM is available
81-
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
81+
if c.dcgmInstance != nil {
8282
// Register this component's health watch system with DCGM
8383
if err := c.dcgmInstance.AddHealthWatch(dcgm.DCGM_HEALTH_WATCH_NVLINK); err != nil {
8484
log.Logger.Warnw("failed to add NVLink health watch", "error", err)

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/nvswitch/component.go

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ type component struct {
5151
healthCheckInterval time.Duration
5252
dcgmInstance nvidiadcgm.Instance
5353
dcgmHealthCache *nvidiadcgm.HealthCache
54+
switchMu sync.Mutex
55+
registeredSwitches map[uint]struct{}
5456

5557
eventBucket eventstore.Bucket
5658

@@ -72,43 +74,33 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
7274
healthCheckInterval: healthCheckInterval,
7375
dcgmInstance: gpudInstance.DCGMInstance,
7476
dcgmHealthCache: gpudInstance.DCGMHealthCache,
77+
registeredSwitches: make(map[uint]struct{}),
7578
}
7679

77-
// Add NVSwitch entities to DCGM group and register health watches
78-
// This component takes ownership of managing NVSwitch entities in DCGM
80+
if c.dcgmInstance != nil {
81+
// Register health watch systems even if DCGM is not connected yet.
82+
// A reconnecting DCGM instance will replay this registration later.
83+
healthSystems := dcgm.DCGM_HEALTH_WATCH_NVSWITCH_FATAL | dcgm.DCGM_HEALTH_WATCH_NVSWITCH_NONFATAL
84+
if err := c.dcgmInstance.AddHealthWatch(healthSystems); err != nil {
85+
log.Logger.Warnw("failed to add NVSwitch health watch", "error", err)
86+
} else {
87+
log.Logger.Infow("registered DCGM NVSwitch health watch (fatal and non-fatal)")
88+
}
89+
}
90+
91+
// Add NVSwitch entities to DCGM group when DCGM is already connected.
7992
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
8093
// Query for NVSwitch entities in the system
8194
switchEntities, err := dcgm.GetEntityGroupEntities(dcgm.FE_SWITCH)
8295
if err != nil {
8396
log.Logger.Warnw("failed to get NVSwitch entities", "error", err)
8497
} else if len(switchEntities) > 0 {
85-
// Add NVSwitch entities to the DCGM group
86-
addedCount := 0
87-
failedCount := 0
88-
for _, switchID := range switchEntities {
89-
if err := c.dcgmInstance.AddEntityToGroup(switchID); err != nil {
90-
log.Logger.Warnw("failed to add NVSwitch to DCGM group", "switchID", switchID, "error", err)
91-
failedCount++
92-
} else {
93-
addedCount++
94-
}
98+
if err := c.ensureSwitchEntitiesRegistered(switchEntities); err != nil {
99+
log.Logger.Warnw("failed to add NVSwitch entities to DCGM group", "error", err)
95100
}
96-
log.Logger.Infow("added NVSwitch entities to DCGM group",
97-
"totalCount", len(switchEntities),
98-
"addedCount", addedCount,
99-
"failedCount", failedCount)
100101
} else {
101102
log.Logger.Debugw("no NVSwitch entities found in system")
102103
}
103-
104-
// Register health watch systems (both fatal and non-fatal)
105-
// This registers with DCGM, and the health cache will automatically poll these systems
106-
healthSystems := dcgm.DCGM_HEALTH_WATCH_NVSWITCH_FATAL | dcgm.DCGM_HEALTH_WATCH_NVSWITCH_NONFATAL
107-
if err := c.dcgmInstance.AddHealthWatch(healthSystems); err != nil {
108-
log.Logger.Warnw("failed to add NVSwitch health watch", "error", err)
109-
} else {
110-
log.Logger.Infow("registered DCGM NVSwitch health watch (fatal and non-fatal)")
111-
}
112104
}
113105

114106
if gpudInstance.EventStore != nil {
@@ -246,6 +238,9 @@ func (c *component) Check() components.CheckResult {
246238
}
247239

248240
log.Logger.Debugw("found NVSwitch entities", "count", len(switchEntities))
241+
if err := c.ensureSwitchEntitiesRegistered(switchEntities); err != nil {
242+
log.Logger.Warnw("failed to ensure NVSwitch entities are in DCGM group", "error", err)
243+
}
249244

250245
// Get cached DCGM NVSwitch health check results (both fatal and non-fatal) from shared cache
251246
// Check fatal errors
@@ -312,6 +307,39 @@ func (c *component) Check() components.CheckResult {
312307
return cr
313308
}
314309

310+
func (c *component) ensureSwitchEntitiesRegistered(switchEntities []uint) error {
311+
c.switchMu.Lock()
312+
defer c.switchMu.Unlock()
313+
314+
addedCount := 0
315+
failedCount := 0
316+
317+
for _, switchID := range switchEntities {
318+
if _, exists := c.registeredSwitches[switchID]; exists {
319+
continue
320+
}
321+
if err := c.dcgmInstance.AddEntityToGroup(switchID); err != nil {
322+
log.Logger.Warnw("failed to add NVSwitch to DCGM group", "switchID", switchID, "error", err)
323+
failedCount++
324+
continue
325+
}
326+
c.registeredSwitches[switchID] = struct{}{}
327+
addedCount++
328+
}
329+
330+
if addedCount > 0 || failedCount > 0 {
331+
log.Logger.Infow("processed NVSwitch entity registration",
332+
"totalCount", len(switchEntities),
333+
"addedCount", addedCount,
334+
"failedCount", failedCount)
335+
}
336+
337+
if failedCount > 0 {
338+
return fmt.Errorf("failed to register %d NVSwitch entities", failedCount)
339+
}
340+
return nil
341+
}
342+
315343
var _ components.CheckResult = &checkResult{}
316344

317345
type checkResult struct {

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/pcie/component.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
7878
}
7979

8080
// Only initialize if DCGM is available
81-
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
81+
if c.dcgmInstance != nil {
8282
// Register this component's health watch system with DCGM
8383
if err := c.dcgmInstance.AddHealthWatch(dcgm.DCGM_HEALTH_WATCH_PCIE); err != nil {
8484
log.Logger.Warnw("failed to add PCIe health watch", "error", err)
@@ -361,4 +361,3 @@ func (cr *checkResult) HealthStates() apiv1.HealthStates {
361361

362362
return apiv1.HealthStates{state}
363363
}
364-

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/power/component.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
8080
}
8181

8282
// Only initialize if DCGM is available
83-
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
83+
if c.dcgmInstance != nil {
8484
// Register this component's health watch system with DCGM
8585
if err := c.dcgmInstance.AddHealthWatch(dcgm.DCGM_HEALTH_WATCH_POWER); err != nil {
8686
log.Logger.Warnw("failed to add power health watch", "error", err)
@@ -387,4 +387,3 @@ func (cr *checkResult) HealthStates() apiv1.HealthStates {
387387

388388
return apiv1.HealthStates{state}
389389
}
390-

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/thermal/component.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
8080
}
8181

8282
// Only initialize if DCGM is available
83-
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
83+
if c.dcgmInstance != nil {
8484
// Register this component's health watch system with DCGM
8585
if err := c.dcgmInstance.AddHealthWatch(dcgm.DCGM_HEALTH_WATCH_THERMAL); err != nil {
8686
log.Logger.Warnw("failed to add thermal health watch", "error", err)

third_party/fleet-intelligence-sdk/components/accelerator/nvidia/dcgm/utilization/component.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func New(gpudInstance *components.GPUdInstance) (components.Component, error) {
7373
}
7474

7575
// Register utilization fields with DCGM instance for centralized watching
76-
if c.dcgmInstance != nil && c.dcgmInstance.DCGMExists() {
76+
if c.dcgmInstance != nil {
7777
if err := c.dcgmInstance.AddFieldsToWatch(utilizationFields); err != nil {
7878
log.Logger.Warnw("failed to register utilization fields", "error", err)
7979
} else {

third_party/fleet-intelligence-sdk/pkg/nvidia-query/dcgm/field_cache.go

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type FieldValueCache struct {
4040
lastError error
4141

4242
fieldGroupID dcgm.FieldHandle
43+
fieldGroupName string
4344
pollInterval time.Duration
4445
started bool
4546
startOnce sync.Once
@@ -50,27 +51,46 @@ type FieldValueCache struct {
5051
func NewFieldValueCache(ctx context.Context, instance Instance, pollInterval time.Duration) *FieldValueCache {
5152
cctx, ccancel := context.WithCancel(ctx)
5253

53-
return &FieldValueCache{
54-
ctx: cctx,
55-
cancel: ccancel,
56-
instance: instance,
57-
values: make(map[uint]map[dcgm.Short]dcgm.FieldValue_v1),
58-
pollInterval: pollInterval,
54+
fc := &FieldValueCache{
55+
ctx: cctx,
56+
cancel: ccancel,
57+
instance: instance,
58+
values: make(map[uint]map[dcgm.Short]dcgm.FieldValue_v1),
59+
fieldGroupName: "gpud-gpu-fields",
60+
pollInterval: pollInterval,
5961
}
62+
63+
if registrar, ok := instance.(reconnectCallbackRegistrar); ok {
64+
registrar.RegisterReconnectCallback(fc.resetAfterReconnect)
65+
}
66+
67+
return fc
6068
}
6169

6270
// SetupFieldWatching creates the field group and starts DCGM watching for all registered fields.
6371
// For tests, use SetupFieldWatchingWithName to provide a unique name.
6472
func (fc *FieldValueCache) SetupFieldWatching() error {
65-
return fc.SetupFieldWatchingWithName("gpud-gpu-fields")
73+
return fc.SetupFieldWatchingWithName(fc.fieldGroupName)
6674
}
6775

6876
// SetupFieldWatchingWithName creates the field group with a custom name.
6977
// This is useful for tests to avoid naming conflicts when running in parallel.
7078
func (fc *FieldValueCache) SetupFieldWatchingWithName(fieldGroupName string) error {
79+
fc.mu.Lock()
80+
fc.fieldGroupName = fieldGroupName
81+
fc.mu.Unlock()
82+
83+
return fc.ensureFieldWatchingSetup()
84+
}
85+
86+
func (fc *FieldValueCache) ensureFieldWatchingSetup() error {
7187
fc.registrationMu.Lock()
7288
defer fc.registrationMu.Unlock()
7389

90+
if fc.fieldGroupID.GetHandle() != 0 {
91+
return nil
92+
}
93+
7494
if fc.instance == nil || !fc.instance.DCGMExists() {
7595
log.Logger.Debugw("DCGM not available, skipping field watching setup")
7696
return nil
@@ -82,6 +102,10 @@ func (fc *FieldValueCache) SetupFieldWatchingWithName(fieldGroupName string) err
82102
return nil
83103
}
84104

105+
fc.mu.RLock()
106+
fieldGroupName := fc.fieldGroupName
107+
fc.mu.RUnlock()
108+
85109
fieldGroupID, err := dcgm.FieldGroupCreate(fieldGroupName, watchedFields)
86110
if err != nil {
87111
setupErr := fmt.Errorf("failed to create DCGM field group: %w", err)
@@ -122,30 +146,16 @@ func (fc *FieldValueCache) Start() error {
122146
var startErr error
123147
fc.startOnce.Do(func() {
124148
fc.registrationMu.Lock()
125-
defer fc.registrationMu.Unlock()
126-
127-
if fc.instance == nil || !fc.instance.DCGMExists() {
128-
log.Logger.Debugw("no fields or DCGM unavailable, skipping polling")
129-
fc.started = true
130-
return
131-
}
132-
133-
watchedFields := fc.instance.GetWatchedFields()
134-
if len(watchedFields) == 0 {
135-
log.Logger.Debugw("no fields or DCGM unavailable, skipping polling")
136-
fc.started = true
137-
return
138-
}
139-
140149
fc.started = true
150+
fc.registrationMu.Unlock()
141151

142152
if err := fc.Poll(); err != nil {
143153
log.Logger.Warnw("initial poll failed", "error", err)
144154
}
145155

146156
go fc.pollLoop()
147157

148-
log.Logger.Infow("field cache polling started", "interval", fc.pollInterval, "fields", len(watchedFields))
158+
log.Logger.Infow("field cache polling started", "interval", fc.pollInterval)
149159
})
150160

151161
return startErr
@@ -155,13 +165,30 @@ func (fc *FieldValueCache) Start() error {
155165
func (fc *FieldValueCache) Stop() {
156166
fc.cancel()
157167

158-
if fc.instance != nil && fc.instance.DCGMExists() && fc.fieldGroupID.GetHandle() != 0 {
159-
if err := dcgm.FieldGroupDestroy(fc.fieldGroupID); err != nil {
168+
fc.registrationMu.Lock()
169+
fieldGroupID := fc.fieldGroupID
170+
fc.fieldGroupID = dcgm.FieldHandle{}
171+
fc.registrationMu.Unlock()
172+
173+
if fc.instance != nil && fc.instance.DCGMExists() && fieldGroupID.GetHandle() != 0 {
174+
if err := dcgm.FieldGroupDestroy(fieldGroupID); err != nil {
160175
log.Logger.Warnw("failed to destroy field group", "error", err)
161176
}
162177
}
163178
}
164179

180+
func (fc *FieldValueCache) resetAfterReconnect() {
181+
fc.registrationMu.Lock()
182+
fc.fieldGroupID = dcgm.FieldHandle{}
183+
fc.registrationMu.Unlock()
184+
185+
fc.mu.Lock()
186+
fc.values = make(map[uint]map[dcgm.Short]dcgm.FieldValue_v1)
187+
fc.lastError = nil
188+
fc.lastUpdate = time.Time{}
189+
fc.mu.Unlock()
190+
}
191+
165192
func (fc *FieldValueCache) pollLoop() {
166193
ticker := time.NewTicker(fc.pollInterval)
167194
defer ticker.Stop()
@@ -185,12 +212,19 @@ func (fc *FieldValueCache) Poll() error {
185212
}
186213

187214
if !fc.instance.DCGMExists() {
188-
return fmt.Errorf("DCGM library not loaded")
215+
// DCGM may become available after startup; keep polling until it is ready.
216+
log.Logger.Debugw("DCGM not available yet, skipping field poll")
217+
return nil
218+
}
219+
220+
if err := fc.ensureFieldWatchingSetup(); err != nil {
221+
return err
189222
}
190223

191224
watchedFields := fc.instance.GetWatchedFields()
192225
if len(watchedFields) == 0 {
193-
return fmt.Errorf("no fields to watch")
226+
log.Logger.Debugw("no fields registered with DCGM, skipping field poll")
227+
return nil
194228
}
195229

196230
devices := fc.instance.GetDevices()

0 commit comments

Comments
 (0)