Skip to content

Commit aa0d5c8

Browse files
feat: add buffered health channel and device state tracking
Add buffered health channel to prevent the health check goroutine from blocking when ListAndWatch is slow to consume events. This addresses stability issues with multiple GPUs and bursty XID scenarios.

Changes:
- Add healthChannelBufferSize constant (64) for burst handling
- Create buffered health channel in initialize()
- Add enhanced logging for unhealthy device reports with reason
- Add MarkUnhealthy() to Device for tracking failure reason/timestamp
- Add IsUnhealthy() and UnhealthyDuration() for diagnostics
- Add UnhealthyReason and LastUnhealthyTime fields to Device struct

The buffer size of 64 provides headroom for 8 GPUs with multiple events per GPU while using a power-of-2 size for cache-friendly alignment.

Devices marked unhealthy remain in that state until external intervention (node drain, GPU reset, reboot) - the device plugin does not attempt auto-recovery as that decision belongs to external components like DCGM or Node Problem Detector.

Signed-off-by: Carlos Eduardo Arango Gutierrez <eduardoa@nvidia.com>
1 parent b26865d commit aa0d5c8

2 files changed

Lines changed: 42 additions & 4 deletions

File tree

internal/plugin/server.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ const (
4646
deviceListEnvVar = "NVIDIA_VISIBLE_DEVICES"
4747
deviceListAsVolumeMountsHostPath = "/dev/null"
4848
deviceListAsVolumeMountsContainerPathRoot = "/var/run/nvidia-container-devices"
49+
50+
// healthChannelBufferSize defines the buffer capacity for the health
51+
// channel. This is sized to handle bursts of unhealthy device reports
52+
// without blocking the health check goroutine. With 8 GPUs and
53+
// potential for multiple events per GPU (XID errors, ECC errors, etc.),
54+
// a buffer of 64 provides ample headroom while using a power-of-2 size
55+
// for cache-friendly alignment.
56+
healthChannelBufferSize = 64
4957
)
5058

5159
// nvidiaDevicePlugin implements the Kubernetes device plugin API
@@ -108,7 +116,7 @@ func getPluginSocketPath(resource spec.ResourceName) string {
108116

109117
// initialize sets up the plugin's gRPC server together with its health and
// stop channels.
func (plugin *nvidiaDevicePlugin) initialize() {
110118
plugin.server = grpc.NewServer([]grpc.ServerOption{}...)
111-
plugin.health = make(chan *rm.Device)
119+
// Buffered: sends do not block until healthChannelBufferSize reports are
// pending.
plugin.health = make(chan *rm.Device, healthChannelBufferSize)
112120
plugin.stop = make(chan interface{})
113121
}
114122

@@ -263,7 +271,8 @@ func (plugin *nvidiaDevicePlugin) GetDevicePluginOptions(context.Context, *plugi
263271
return options, nil
264272
}
265273

266-
// ListAndWatch lists devices and update that list according to the health status
274+
// ListAndWatch lists devices and updates that list according to the health
275+
// status.
267276
func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
268277
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
269278
return err
@@ -274,9 +283,9 @@ func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.D
274283
case <-plugin.stop:
275284
return nil
276285
case d := <-plugin.health:
277-
// FIXME: there is no way to recover from the Unhealthy state.
278286
d.Health = pluginapi.Unhealthy
279-
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
287+
klog.Infof("'%s' device marked unhealthy: %s (reason: %s)",
288+
plugin.rm.Resource(), d.ID, d.UnhealthyReason)
280289
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
281290
return nil
282291
}

internal/rm/devices.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"strconv"
2222
"strings"
23+
"time"
2324

2425
"k8s.io/klog/v2"
2526
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
@@ -35,6 +36,10 @@ type Device struct {
3536
// Replicas stores the total number of times this device is replicated.
3637
// If this is 0 or 1 then the device is not shared.
3738
Replicas int
39+
40+
// Health tracking fields
41+
LastUnhealthyTime time.Time // When device became unhealthy
42+
UnhealthyReason string // Human-readable reason (e.g., "XID-79")
3843
}
3944

4045
// deviceInfo defines the information required to construct a Device
@@ -239,6 +244,30 @@ func (d *Device) GetUUID() string {
239244
return AnnotatedID(d.ID).GetID()
240245
}
241246

247+
// MarkUnhealthy marks the device as unhealthy and records the reason and
248+
// timestamp. This should be called when a health check detects a device
249+
// failure (e.g., XID error). Once marked unhealthy, devices remain in this
250+
// state until external intervention (e.g., node drain, GPU reset, reboot).
251+
func (d *Device) MarkUnhealthy(reason string) {
252+
d.Health = pluginapi.Unhealthy
253+
d.LastUnhealthyTime = time.Now()
254+
d.UnhealthyReason = reason
255+
}
256+
257+
// IsUnhealthy returns true if the device is currently marked as unhealthy.
258+
func (d *Device) IsUnhealthy() bool {
259+
return d.Health == pluginapi.Unhealthy
260+
}
261+
262+
// UnhealthyDuration returns how long the device has been unhealthy. Returns
263+
// zero duration if the device is healthy.
264+
func (d *Device) UnhealthyDuration() time.Duration {
265+
if !d.IsUnhealthy() {
266+
return 0
267+
}
268+
return time.Since(d.LastUnhealthyTime)
269+
}
270+
242271
// NewAnnotatedID creates a new AnnotatedID from an ID and a replica number.
243272
func NewAnnotatedID(id string, replica int) AnnotatedID {
244273
return AnnotatedID(fmt.Sprintf("%s::%d", id, replica))

0 commit comments

Comments
 (0)