Skip to content

Commit 8b4de64

Browse files
committed
reschedule the reconcile loop when DOCA PCC process dies
We need to restart the PCC and reapply its params Signed-off-by: Alexander Maslennikov <amaslennikov@nvidia.com> (cherry picked from commit 9f5de2e)
1 parent 29c2432 commit 8b4de64

File tree

7 files changed

+161
-15
lines changed

7 files changed

+161
-15
lines changed

internal/controller/nicdevice_controller.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"sigs.k8s.io/controller-runtime/pkg/handler"
4141
"sigs.k8s.io/controller-runtime/pkg/log"
4242
"sigs.k8s.io/controller-runtime/pkg/reconcile"
43+
"sigs.k8s.io/controller-runtime/pkg/source"
4344

4445
v1alpha1 "github.com/Mellanox/nic-configuration-operator/api/v1alpha1"
4546
"github.com/Mellanox/nic-configuration-operator/pkg/configuration"
@@ -923,9 +924,25 @@ func (r *NicDeviceReconciler) SetupWithManager(mgr ctrl.Manager, watchForMainten
923924
},
924925
}
925926

927+
// Bridge CC termination channel (plain Go channel from k8s-free package) into controller events
928+
ccTermChan := r.SpectrumXManager.GetCCTerminationChannel()
929+
ccEventChan := make(chan event.TypedGenericEvent[*v1alpha1.NicDevice], 10)
930+
go func() {
931+
for rdmaIface := range ccTermChan {
932+
log.Log.Info("CC process terminated, triggering reconcile", "rdma", rdmaIface)
933+
ccEventChan <- event.TypedGenericEvent[*v1alpha1.NicDevice]{}
934+
}
935+
}()
936+
926937
controller := ctrl.NewControllerManagedBy(mgr).
927938
For(&v1alpha1.NicDevice{}).
928-
Watches(&v1alpha1.NicDevice{}, nicDeviceEventHandler)
939+
Watches(&v1alpha1.NicDevice{}, nicDeviceEventHandler).
940+
WatchesRawSource(source.Channel(ccEventChan, handler.TypedFuncs[*v1alpha1.NicDevice, reconcile.Request]{
941+
GenericFunc: func(ctx context.Context, e event.TypedGenericEvent[*v1alpha1.NicDevice], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
942+
log.Log.Info("Enqueuing sync for CC termination event")
943+
qHandler(q)
944+
},
945+
}))
929946

930947
if watchForMaintenance {
931948
controller.Watches(&maintenanceoperator.NodeMaintenance{}, maintenance.GetMaintenanceRequestEventHandler(r.NodeName, qHandler))

internal/controller/nicdevice_controller_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ var _ = Describe("NicDeviceReconciler", func() {
6161
spectrumXManager *spectrumxMocks.SpectrumXManager
6262
udevManager *udevMocks.UdevManager
6363
deviceDiscoveryUtils *devicediscoveryMocks.DeviceDiscoveryUtils
64+
ccTerminationChan chan string
6465
deviceName = "test-device"
6566
ctx context.Context
6667
cancel context.CancelFunc
@@ -96,6 +97,8 @@ var _ = Describe("NicDeviceReconciler", func() {
9697
maintenanceManager = &maintenanceMocks.MaintenanceManager{}
9798
hostUtils = &hostMocks.HostUtils{}
9899
spectrumXManager = &spectrumxMocks.SpectrumXManager{}
100+
ccTerminationChan = make(chan string, 10)
101+
spectrumXManager.On("GetCCTerminationChannel").Return((<-chan string)(ccTerminationChan))
99102
spectrumXManager.On("GetDocaCCTargetVersion", mock.Anything).Return("", nil)
100103
udevManager = &udevMocks.UdevManager{}
101104
udevManager.On("ApplyUdevRules", mock.Anything, mock.Anything).Return(nil, false, nil)
@@ -1382,4 +1385,48 @@ var _ = Describe("NicDeviceReconciler", func() {
13821385
}, time.Second*2).Should(BeFalse())
13831386
})
13841387
})
1388+
1389+
Describe("reconcile triggered by CC process termination", func() {
1390+
It("Should re-reconcile when CC termination channel fires", func() {
1391+
configurationManager.On("ValidateDeviceNvSpec", mock.Anything, mock.Anything).Return(false, false, nil)
1392+
configurationManager.On("ApplyDeviceRuntimeSpec", mock.Anything).Return(nil)
1393+
maintenanceManager.On("ReleaseMaintenance", mock.Anything).Return(nil)
1394+
1395+
device := &v1alpha1.NicDevice{
1396+
ObjectMeta: metav1.ObjectMeta{Name: deviceName, Namespace: namespaceName},
1397+
Spec: v1alpha1.NicDeviceSpec{
1398+
Configuration: &v1alpha1.NicDeviceConfigurationSpec{
1399+
ResetToDefault: false,
1400+
Template: &v1alpha1.ConfigurationTemplateSpec{
1401+
NumVfs: 4,
1402+
LinkType: consts.Ethernet,
1403+
},
1404+
},
1405+
},
1406+
}
1407+
Expect(k8sClient.Create(ctx, device))
1408+
device.Status = v1alpha1.NicDeviceStatus{
1409+
Node: nodeName,
1410+
Ports: []v1alpha1.NicDevicePortSpec{{PCI: "0000:3b:00.0"}},
1411+
}
1412+
Expect(k8sClient.Status().Update(ctx, device)).To(Succeed())
1413+
1414+
// Wait for initial reconcile to complete successfully
1415+
Eventually(getDeviceConditions, timeout).Should(testutils.MatchCondition(metav1.Condition{
1416+
Type: consts.ConfigUpdateInProgressCondition,
1417+
Status: metav1.ConditionFalse,
1418+
Reason: consts.UpdateSuccessfulReason,
1419+
}))
1420+
1421+
initialCallCount := countCalls(configurationManager.Calls, "ApplyDeviceRuntimeSpec")
1422+
1423+
// Simulate CC process termination
1424+
ccTerminationChan <- "mlx5_0"
1425+
1426+
// Verify reconcile ran again
1427+
Eventually(func() int {
1428+
return countCalls(configurationManager.Calls, "ApplyDeviceRuntimeSpec")
1429+
}, timeout).Should(BeNumerically(">", initialCallCount))
1430+
})
1431+
})
13851432
})

internal/controller/utils_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"sync"
2121

22+
"github.com/stretchr/testify/mock"
2223
v1 "k8s.io/api/core/v1"
2324
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2425
"k8s.io/apimachinery/pkg/util/rand"
@@ -84,3 +85,13 @@ func startManager(manager manager.Manager, ctx context.Context, wg *sync.WaitGro
8485

8586
log.Log.Info("synced manager cache for test", "test", GinkgoT().Name())
8687
}
88+
89+
func countCalls(calls []mock.Call, method string) int {
90+
count := 0
91+
for _, call := range calls {
92+
if call.Method == method {
93+
count++
94+
}
95+
}
96+
return count
97+
}

pkg/spectrumx/mocks/SpectrumXManager.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/spectrumx/spectrumx.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type SpectrumXManager interface {
6262
GetDocaCCTargetVersion(device *v1alpha1.NicDevice) (string, error)
6363
// RunDocaSpcXCC launches and keeps track of the DOCA SPC-X CC process for the given port
6464
RunDocaSpcXCC(port v1alpha1.NicDevicePortSpec) error
65+
// GetCCTerminationChannel returns a read-only channel that receives the RDMA interface name
66+
// when a DOCA SPC-X CC process terminates unexpectedly after startup
67+
GetCCTerminationChannel() <-chan string
6568
}
6669

6770
type spectrumXConfigManager struct {
@@ -70,14 +73,16 @@ type spectrumXConfigManager struct {
7073
execInterface execUtils.Interface
7174
nvConfigUtils nvconfig.NVConfigUtils
7275

73-
ccProcesses map[string]*ccProcess
76+
ccProcesses map[string]*ccProcess
77+
ccTerminationChan chan string // buffered; carries RDMA iface name on unexpected exit
7478
}
7579

7680
type ccProcess struct {
7781
port v1alpha1.NicDevicePortSpec
7882
cmd execUtils.Cmd
7983

80-
running atomic.Bool
84+
running atomic.Bool
85+
startupCheckPassed atomic.Bool // set after the 3s startup window; distinguishes startup failures from runtime crashes
8186

8287
// Error handling with mutex protection
8388
errMutex sync.RWMutex
@@ -704,6 +709,16 @@ func (m *spectrumXConfigManager) RunDocaSpcXCC(port v1alpha1.NicDevicePortSpec)
704709

705710
log.Log.V(2).Info("SpectrumXConfigManager.RunDocaSpcXCC(): CC process output", "rdma", port.RdmaInterface, "output", string(output))
706711
process.running.Store(false)
712+
713+
// Notify controller only for runtime crashes (after startup check passed)
714+
if process.startupCheckPassed.Load() {
715+
log.Log.Info("SpectrumXConfigManager.RunDocaSpcXCC(): CC process terminated unexpectedly, sending notification", "rdma", port.RdmaInterface)
716+
select {
717+
case m.ccTerminationChan <- port.RdmaInterface:
718+
default:
719+
log.Log.V(2).Info("SpectrumXConfigManager.RunDocaSpcXCC(): termination channel full, notification dropped", "rdma", port.RdmaInterface)
720+
}
721+
}
707722
}()
708723

709724
log.Log.V(2).Info("Waiting 3s for DOCA SPC-X CC to start", "rdma", port.RdmaInterface)
@@ -723,19 +738,27 @@ func (m *spectrumXConfigManager) RunDocaSpcXCC(port v1alpha1.NicDevicePortSpec)
723738

724739
log.Log.V(2).Info("DOCA SPC-X CC process started", "rdma", port.RdmaInterface)
725740

741+
process.startupCheckPassed.Store(true)
726742
m.ccProcesses[port.RdmaInterface] = process
727743

728744
log.Log.Info("Started DOCA SPC-X CC process", "rdma", port.RdmaInterface)
729745

730746
return nil
731747
}
732748

749+
// GetCCTerminationChannel returns a read-only channel for CC process termination notifications.
750+
// The channel carries the RDMA interface name of the terminated CC process.
751+
func (m *spectrumXConfigManager) GetCCTerminationChannel() <-chan string {
752+
return m.ccTerminationChan
753+
}
754+
733755
func NewSpectrumXConfigManager(dmsManager dms.DMSManager, spectrumXConfigs map[string]*types.SpectrumXConfig) SpectrumXManager {
734756
return &spectrumXConfigManager{
735-
dmsManager: dmsManager,
736-
spectrumXConfigs: spectrumXConfigs,
737-
execInterface: execUtils.New(),
738-
nvConfigUtils: nvconfig.NewNVConfigUtils(),
739-
ccProcesses: make(map[string]*ccProcess),
757+
dmsManager: dmsManager,
758+
spectrumXConfigs: spectrumXConfigs,
759+
execInterface: execUtils.New(),
760+
nvConfigUtils: nvconfig.NewNVConfigUtils(),
761+
ccProcesses: make(map[string]*ccProcess),
762+
ccTerminationChan: make(chan string, 10),
740763
}
741764
}

pkg/spectrumx/spectrumx_test.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,12 @@ var _ = Describe("SpectrumXConfigManager", func() {
178178
}
179179

180180
manager = &spectrumXConfigManager{
181-
dmsManager: &dmsMgr,
182-
spectrumXConfigs: cfgs,
183-
execInterface: execFake,
184-
nvConfigUtils: &nvConfigMgr,
185-
ccProcesses: map[string]*ccProcess{},
181+
dmsManager: &dmsMgr,
182+
spectrumXConfigs: cfgs,
183+
execInterface: execFake,
184+
nvConfigUtils: &nvConfigMgr,
185+
ccProcesses: map[string]*ccProcess{},
186+
ccTerminationChan: make(chan string, 10),
186187
}
187188

188189
beforeDevice()
@@ -413,6 +414,34 @@ var _ = Describe("SpectrumXConfigManager", func() {
413414
Expect(err).To(HaveOccurred())
414415
Expect(strings.ToLower(err.Error())).To(ContainSubstring("failed to start"))
415416
})
417+
418+
It("sends notification on channel when process dies after startup", func() {
419+
// fakeCmd with 4s delay survives the 3s startup check, then fails
420+
nextCmd = &fakeCmd{output: []byte(""), err: errors.New("runtime crash"), delay: 4 * time.Second}
421+
port := device.Status.Ports[0]
422+
err := manager.RunDocaSpcXCC(port)
423+
Expect(err).NotTo(HaveOccurred())
424+
Expect(manager.ccProcesses).To(HaveKey(port.RdmaInterface))
425+
426+
// Wait for process to die and notification to fire
427+
Eventually(manager.GetCCTerminationChannel(), 5*time.Second).Should(Receive(Equal(port.RdmaInterface)))
428+
})
429+
430+
It("does NOT send notification when process fails during startup", func() {
431+
nextCmd = &fakeCmd{output: []byte(""), err: errors.New("startup failure")}
432+
port := device.Status.Ports[0]
433+
err := manager.RunDocaSpcXCC(port)
434+
Expect(err).To(HaveOccurred())
435+
436+
Consistently(manager.GetCCTerminationChannel(), 1*time.Second).ShouldNot(Receive())
437+
})
438+
})
439+
440+
Describe("GetCCTerminationChannel", func() {
441+
It("returns the termination channel", func() {
442+
ch := manager.GetCCTerminationChannel()
443+
Expect(ch).NotTo(BeNil())
444+
})
416445
})
417446

418447
Describe("BreakoutConfigApplied", func() {

pkg/types/spectrumx_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ var _ = Describe("LoadSpectrumXConfig", func() {
4141

4242
first := cfg.NVConfig[0]
4343
Expect(first.Name).ToNot(BeEmpty())
44-
Expect(first.DMSPath).ToNot(BeEmpty())
45-
Expect(first.ValueType).ToNot(BeEmpty())
44+
Expect(first.MlxConfig).ToNot(BeEmpty())
4645

4746
Expect(cfg.UseSoftwareCCAlgorithm).To(BeTrue())
4847
Expect(cfg.RuntimeConfig.AdaptiveRouting).ToNot(BeEmpty())

0 commit comments

Comments
 (0)