Skip to content

Commit c352a5a

Browse files
refactor(plugin): use context for health check lifecycle
Replace the stop channel with context-based cancellation for health checks:
- Add healthCtx, healthCancel, and healthWg for lifecycle management.
- Capture the context and channel references at the start of ListAndWatch to avoid a race with cleanup(), which may set these fields to nil.
- Properly wait for the health goroutine to complete during cleanup.

Signed-off-by: Carlos Eduardo Arango Gutierrez <eduardoa@nvidia.com>
1 parent 3a37806 commit c352a5a

File tree

1 file changed

+25
-10
lines changed

1 file changed

+25
-10
lines changed

internal/plugin/server.go

Lines changed: 25 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ import (
2525
"path"
2626
"path/filepath"
2727
"strings"
28+
"sync"
2829
"time"
2930

3031
cdiapi "tags.cncf.io/container-device-interface/pkg/cdi"
@@ -62,7 +63,12 @@ type nvidiaDevicePlugin struct {
6263
socket string
6364
server *grpc.Server
6465
health chan *rm.Device
65-
stop chan interface{}
66+
67+
// healthCtx and healthCancel control the health check goroutine lifecycle.
68+
// healthWg is used to wait for the health check goroutine to complete during cleanup.
69+
healthCtx context.Context
70+
healthCancel context.CancelFunc
71+
healthWg sync.WaitGroup
6672

6773
imexChannels imex.Channels
6874

@@ -94,7 +100,6 @@ func (o *options) devicePluginForResource(ctx context.Context, resourceManager r
94100
// time the plugin server is restarted.
95101
server: nil,
96102
health: nil,
97-
stop: nil,
98103
}
99104
return &plugin, nil
100105
}
@@ -109,14 +114,18 @@ func getPluginSocketPath(resource spec.ResourceName) string {
109114
func (plugin *nvidiaDevicePlugin) initialize() {
110115
plugin.server = grpc.NewServer([]grpc.ServerOption{}...)
111116
plugin.health = make(chan *rm.Device)
112-
plugin.stop = make(chan interface{})
117+
plugin.healthCtx, plugin.healthCancel = context.WithCancel(plugin.ctx)
113118
}
114119

115120
func (plugin *nvidiaDevicePlugin) cleanup() {
116-
close(plugin.stop)
121+
if plugin.healthCancel != nil {
122+
plugin.healthCancel()
123+
}
124+
plugin.healthWg.Wait()
117125
plugin.server = nil
118126
plugin.health = nil
119-
plugin.stop = nil
127+
plugin.healthCtx = nil
128+
plugin.healthCancel = nil
120129
}
121130

122131
// Devices returns the full set of devices associated with the plugin.
@@ -148,10 +157,12 @@ func (plugin *nvidiaDevicePlugin) Start(kubeletSocket string) error {
148157
}
149158
klog.Infof("Registered device plugin for '%s' with Kubelet", plugin.rm.Resource())
150159

160+
plugin.healthWg.Add(1)
151161
go func() {
162+
defer plugin.healthWg.Done()
152163
// TODO: add MPS health check
153-
err := plugin.rm.CheckHealth(plugin.stop, plugin.health)
154-
if err != nil {
164+
err := plugin.rm.CheckHealth(plugin.healthCtx, plugin.health)
165+
if err != nil && !errors.Is(err, context.Canceled) {
155166
klog.Errorf("Failed to start health check: %v; continuing with health checks disabled", err)
156167
}
157168
}()
@@ -265,15 +276,19 @@ func (plugin *nvidiaDevicePlugin) GetDevicePluginOptions(context.Context, *plugi
265276

266277
// ListAndWatch lists devices and update that list according to the health status
267278
func (plugin *nvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
279+
// Capture references at start to avoid race with cleanup() which may nil these fields.
280+
healthCtx := plugin.healthCtx
281+
health := plugin.health
282+
268283
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: plugin.apiDevices()}); err != nil {
269284
return err
270285
}
271286

272287
for {
273288
select {
274-
case <-plugin.stop:
289+
case <-healthCtx.Done():
275290
return nil
276-
case d := <-plugin.health:
291+
case d := <-health:
277292
// FIXME: there is no way to recover from the Unhealthy state.
278293
d.Health = pluginapi.Unhealthy
279294
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
@@ -368,7 +383,7 @@ func (plugin *nvidiaDevicePlugin) getAllocateResponse(requestIds []string) (*plu
368383
// updateResponseForMPS ensures that the ContainerAllocate response contains the information required to use MPS.
369384
// This includes per-resource pipe and log directories as well as a global daemon-specific shm
370385
// and assumes that an MPS control daemon has already been started.
371-
func (plugin nvidiaDevicePlugin) updateResponseForMPS(response *pluginapi.ContainerAllocateResponse) {
386+
func (plugin *nvidiaDevicePlugin) updateResponseForMPS(response *pluginapi.ContainerAllocateResponse) {
372387
plugin.mps.updateReponse(response)
373388
}
374389

0 commit comments

Comments
 (0)