Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile.nic-configuration-daemon
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ COPY ./ ./
# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform.
RUN --mount=type=cache,target=/go/pkg/mod/ GO_GCFLAGS=${GCFLAGS} make build-daemon

FROM ${BASE_IMAGE_DOCA_FULL_RT_HOST:-nvcr.io/nvstaging/doca/doca:3.3.0099-full-rt-host-latest}
FROM ${BASE_IMAGE_DOCA_FULL_RT_HOST:-nvcr.io/nvstaging/doca/doca:3.3.0-full-rt-host-latest}

ARG TARGETARCH
ENV MFT_VERSION=4.33.0-169
Expand Down
19 changes: 18 additions & 1 deletion internal/controller/nicdevice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

v1alpha1 "github.com/Mellanox/nic-configuration-operator/api/v1alpha1"
"github.com/Mellanox/nic-configuration-operator/pkg/configuration"
Expand Down Expand Up @@ -923,9 +924,25 @@ func (r *NicDeviceReconciler) SetupWithManager(mgr ctrl.Manager, watchForMainten
},
}

// Bridge CC termination channel (plain Go channel from k8s-free package) into controller events
ccTermChan := r.SpectrumXManager.GetCCTerminationChannel()
ccEventChan := make(chan event.TypedGenericEvent[*v1alpha1.NicDevice], 10)
go func() {
for rdmaIface := range ccTermChan {
log.Log.Info("CC process terminated, triggering reconcile", "rdma", rdmaIface)
ccEventChan <- event.TypedGenericEvent[*v1alpha1.NicDevice]{}
}
}()

controller := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.NicDevice{}).
Watches(&v1alpha1.NicDevice{}, nicDeviceEventHandler)
Watches(&v1alpha1.NicDevice{}, nicDeviceEventHandler).
WatchesRawSource(source.Channel(ccEventChan, handler.TypedFuncs[*v1alpha1.NicDevice, reconcile.Request]{
GenericFunc: func(ctx context.Context, e event.TypedGenericEvent[*v1alpha1.NicDevice], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
log.Log.Info("Enqueuing sync for CC termination event")
qHandler(q)
},
}))

if watchForMaintenance {
controller.Watches(&maintenanceoperator.NodeMaintenance{}, maintenance.GetMaintenanceRequestEventHandler(r.NodeName, qHandler))
Expand Down
47 changes: 47 additions & 0 deletions internal/controller/nicdevice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var _ = Describe("NicDeviceReconciler", func() {
spectrumXManager *spectrumxMocks.SpectrumXManager
udevManager *udevMocks.UdevManager
deviceDiscoveryUtils *devicediscoveryMocks.DeviceDiscoveryUtils
ccTerminationChan chan string
deviceName = "test-device"
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -96,6 +97,8 @@ var _ = Describe("NicDeviceReconciler", func() {
maintenanceManager = &maintenanceMocks.MaintenanceManager{}
hostUtils = &hostMocks.HostUtils{}
spectrumXManager = &spectrumxMocks.SpectrumXManager{}
ccTerminationChan = make(chan string, 10)
spectrumXManager.On("GetCCTerminationChannel").Return((<-chan string)(ccTerminationChan))
spectrumXManager.On("GetDocaCCTargetVersion", mock.Anything).Return("", nil)
udevManager = &udevMocks.UdevManager{}
udevManager.On("ApplyUdevRules", mock.Anything, mock.Anything).Return(nil, false, nil)
Expand Down Expand Up @@ -1404,4 +1407,48 @@ var _ = Describe("NicDeviceReconciler", func() {
}, time.Second*2).Should(BeFalse())
})
})

Describe("reconcile triggered by CC process termination", func() {
It("Should re-reconcile when CC termination channel fires", func() {
configurationManager.On("ValidateDeviceNvSpec", mock.Anything, mock.Anything).Return(false, false, nil)
configurationManager.On("ApplyDeviceRuntimeSpec", mock.Anything).Return(nil)
maintenanceManager.On("ReleaseMaintenance", mock.Anything).Return(nil)

device := &v1alpha1.NicDevice{
ObjectMeta: metav1.ObjectMeta{Name: deviceName, Namespace: namespaceName},
Spec: v1alpha1.NicDeviceSpec{
Configuration: &v1alpha1.NicDeviceConfigurationSpec{
ResetToDefault: false,
Template: &v1alpha1.ConfigurationTemplateSpec{
NumVfs: 4,
LinkType: consts.Ethernet,
},
},
},
}
Expect(k8sClient.Create(ctx, device))
device.Status = v1alpha1.NicDeviceStatus{
Node: nodeName,
Ports: []v1alpha1.NicDevicePortSpec{{PCI: "0000:3b:00.0"}},
}
Expect(k8sClient.Status().Update(ctx, device)).To(Succeed())

// Wait for initial reconcile to complete successfully
Eventually(getDeviceConditions, timeout).Should(testutils.MatchCondition(metav1.Condition{
Type: consts.ConfigUpdateInProgressCondition,
Status: metav1.ConditionFalse,
Reason: consts.UpdateSuccessfulReason,
}))

initialCallCount := countCalls(configurationManager.Calls, "ApplyDeviceRuntimeSpec")

// Simulate CC process termination
ccTerminationChan <- "mlx5_0"

// Verify reconcile ran again
Eventually(func() int {
return countCalls(configurationManager.Calls, "ApplyDeviceRuntimeSpec")
}, timeout).Should(BeNumerically(">", initialCallCount))
})
})
})
11 changes: 11 additions & 0 deletions internal/controller/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"sync"

"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
Expand Down Expand Up @@ -84,3 +85,13 @@ func startManager(manager manager.Manager, ctx context.Context, wg *sync.WaitGro

log.Log.Info("synced manager cache for test", "test", GinkgoT().Name())
}

func countCalls(calls []mock.Call, method string) int {
count := 0
for _, call := range calls {
if call.Method == method {
count++
}
}
return count
}
20 changes: 20 additions & 0 deletions pkg/spectrumx/mocks/SpectrumXManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 30 additions & 7 deletions pkg/spectrumx/spectrumx.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type SpectrumXManager interface {
GetDocaCCTargetVersion(device *v1alpha1.NicDevice) (string, error)
// RunDocaSpcXCC launches and keeps track of the DOCA SPC-X CC process for the given port
RunDocaSpcXCC(port v1alpha1.NicDevicePortSpec) error
// GetCCTerminationChannel returns a read-only channel that receives the RDMA interface name
// when a DOCA SPC-X CC process terminates unexpectedly after startup
GetCCTerminationChannel() <-chan string
}

type spectrumXConfigManager struct {
Expand All @@ -70,14 +73,16 @@ type spectrumXConfigManager struct {
execInterface execUtils.Interface
nvConfigUtils nvconfig.NVConfigUtils

ccProcesses map[string]*ccProcess
ccProcesses map[string]*ccProcess
ccTerminationChan chan string // buffered; carries RDMA iface name on unexpected exit
}

type ccProcess struct {
port v1alpha1.NicDevicePortSpec
cmd execUtils.Cmd

running atomic.Bool
running atomic.Bool
startupCheckPassed atomic.Bool // set after the 3s startup window; distinguishes startup failures from runtime crashes

// Error handling with mutex protection
errMutex sync.RWMutex
Expand Down Expand Up @@ -704,6 +709,16 @@ func (m *spectrumXConfigManager) RunDocaSpcXCC(port v1alpha1.NicDevicePortSpec)

log.Log.V(2).Info("SpectrumXConfigManager.RunDocaSpcXCC(): CC process output", "rdma", port.RdmaInterface, "output", string(output))
process.running.Store(false)

// Notify controller only for runtime crashes (after startup check passed)
if process.startupCheckPassed.Load() {
log.Log.Info("SpectrumXConfigManager.RunDocaSpcXCC(): CC process terminated unexpectedly, sending notification", "rdma", port.RdmaInterface)
select {
case m.ccTerminationChan <- port.RdmaInterface:
default:
log.Log.V(2).Info("SpectrumXConfigManager.RunDocaSpcXCC(): termination channel full, notification dropped", "rdma", port.RdmaInterface)
}
}
}()

log.Log.V(2).Info("Waiting 3s for DOCA SPC-X CC to start", "rdma", port.RdmaInterface)
Expand All @@ -723,19 +738,27 @@ func (m *spectrumXConfigManager) RunDocaSpcXCC(port v1alpha1.NicDevicePortSpec)

log.Log.V(2).Info("DOCA SPC-X CC process started", "rdma", port.RdmaInterface)

process.startupCheckPassed.Store(true)
m.ccProcesses[port.RdmaInterface] = process

log.Log.Info("Started DOCA SPC-X CC process", "rdma", port.RdmaInterface)

return nil
}

// GetCCTerminationChannel returns a read-only channel for CC process termination notifications.
// The channel carries the RDMA interface name of the terminated CC process.
func (m *spectrumXConfigManager) GetCCTerminationChannel() <-chan string {
return m.ccTerminationChan
}

func NewSpectrumXConfigManager(dmsManager dms.DMSManager, spectrumXConfigs map[string]*types.SpectrumXConfig) SpectrumXManager {
return &spectrumXConfigManager{
dmsManager: dmsManager,
spectrumXConfigs: spectrumXConfigs,
execInterface: execUtils.New(),
nvConfigUtils: nvconfig.NewNVConfigUtils(),
ccProcesses: make(map[string]*ccProcess),
dmsManager: dmsManager,
spectrumXConfigs: spectrumXConfigs,
execInterface: execUtils.New(),
nvConfigUtils: nvconfig.NewNVConfigUtils(),
ccProcesses: make(map[string]*ccProcess),
ccTerminationChan: make(chan string, 10),
}
}
39 changes: 34 additions & 5 deletions pkg/spectrumx/spectrumx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,12 @@ var _ = Describe("SpectrumXConfigManager", func() {
}

manager = &spectrumXConfigManager{
dmsManager: &dmsMgr,
spectrumXConfigs: cfgs,
execInterface: execFake,
nvConfigUtils: &nvConfigMgr,
ccProcesses: map[string]*ccProcess{},
dmsManager: &dmsMgr,
spectrumXConfigs: cfgs,
execInterface: execFake,
nvConfigUtils: &nvConfigMgr,
ccProcesses: map[string]*ccProcess{},
ccTerminationChan: make(chan string, 10),
}

beforeDevice()
Expand Down Expand Up @@ -413,6 +414,34 @@ var _ = Describe("SpectrumXConfigManager", func() {
Expect(err).To(HaveOccurred())
Expect(strings.ToLower(err.Error())).To(ContainSubstring("failed to start"))
})

It("sends notification on channel when process dies after startup", func() {
// fakeCmd with 4s delay survives the 3s startup check, then fails
nextCmd = &fakeCmd{output: []byte(""), err: errors.New("runtime crash"), delay: 4 * time.Second}
port := device.Status.Ports[0]
err := manager.RunDocaSpcXCC(port)
Expect(err).NotTo(HaveOccurred())
Expect(manager.ccProcesses).To(HaveKey(port.RdmaInterface))

// Wait for process to die and notification to fire
Eventually(manager.GetCCTerminationChannel(), 5*time.Second).Should(Receive(Equal(port.RdmaInterface)))
})

It("does NOT send notification when process fails during startup", func() {
nextCmd = &fakeCmd{output: []byte(""), err: errors.New("startup failure")}
port := device.Status.Ports[0]
err := manager.RunDocaSpcXCC(port)
Expect(err).To(HaveOccurred())

Consistently(manager.GetCCTerminationChannel(), 1*time.Second).ShouldNot(Receive())
})
})

Describe("GetCCTerminationChannel", func() {
It("returns the termination channel", func() {
ch := manager.GetCCTerminationChannel()
Expect(ch).NotTo(BeNil())
})
})

Describe("BreakoutConfigApplied", func() {
Expand Down
3 changes: 1 addition & 2 deletions pkg/types/spectrumx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ var _ = Describe("LoadSpectrumXConfig", func() {

first := cfg.NVConfig[0]
Expect(first.Name).ToNot(BeEmpty())
Expect(first.DMSPath).ToNot(BeEmpty())
Expect(first.ValueType).ToNot(BeEmpty())
Expect(first.MlxConfig).ToNot(BeEmpty())

Expect(cfg.UseSoftwareCCAlgorithm).To(BeTrue())
Expect(cfg.RuntimeConfig.AdaptiveRouting).ToNot(BeEmpty())
Expand Down
Loading