Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/nvidia.com/resource/v1beta1/computedomain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ const (
ComputeDomainStatusNone = ""
ComputeDomainStatusReady = "Ready"
ComputeDomainStatusNotReady = "NotReady"
ComputeDomainStatusFailed = "Failed"

ComputeDomainChannelAllocationModeSingle = "Single"
ComputeDomainChannelAllocationModeAll = "All"

ComputeDomainBindingConditions = "ComputeDomainReady"
ComputeDomainBindingFailureConditions = "ComputeDomainNotReady"
)

// +genclient
Expand Down Expand Up @@ -134,7 +138,7 @@ type ComputeDomainNode struct {
// it is not. It is marked as optional in order to support downgrades
// and avoid an API bump.
// +kubebuilder:validation:Optional
// +kubebuilder:validation:Enum=Ready;NotReady
// +kubebuilder:validation:Enum=Ready;NotReady;Failed
// +kubebuilder:default:=NotReady
Status string `json:"status,omitempty"`
}
27 changes: 27 additions & 0 deletions cmd/compute-domain-controller/cdstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ func (m *ComputeDomainStatusManager) sync(ctx context.Context) {
continue
}

if featuregates.Enabled(featuregates.ComputeDomainBindingConditions) {
if IsPodFailed(pod) {
nonFabricPodsByCD[cdUID] = append(nonFabricPodsByCD[cdUID], pod)
continue
}
}

// Separate pods based on cliqueID label
cliqueID, exists := pod.Labels[computeDomainCliqueLabelKey]
if !exists || cliqueID != "" {
Expand Down Expand Up @@ -271,6 +278,10 @@ func (m *ComputeDomainStatusManager) buildNodesFromPods(pods []*corev1.Pod) []*n
}
}

if IsPodFailed(pod) {
status = nvapi.ComputeDomainStatusFailed
}

nodes = append(nodes, &nvapi.ComputeDomainNode{
Name: pod.Spec.NodeName,
IPAddress: pod.Status.PodIP,
Expand Down Expand Up @@ -363,3 +374,19 @@ func (m *ComputeDomainStatusManager) nodesEqual(a, b []*nvapi.ComputeDomainNode)
}
return maps.Equal(aMap, bMap)
}

// IsPodFailed reports whether the given pod should be treated as failed:
// either the pod's phase is Failed, or at least one of its containers is
// stuck in a waiting state whose reason indicates an unrecoverable startup
// problem (an image pull error or a crash loop).
func IsPodFailed(pod *corev1.Pod) bool {
	if pod.Status.Phase == corev1.PodFailed {
		return true
	}
	for i := range pod.Status.ContainerStatuses {
		waiting := pod.Status.ContainerStatuses[i].State.Waiting
		if waiting == nil {
			continue
		}
		// These waiting reasons will not resolve without intervention,
		// so the pod is considered failed as soon as one appears.
		if waiting.Reason == "ErrImagePull" ||
			waiting.Reason == "ImagePullBackOff" ||
			waiting.Reason == "CrashLoopBackOff" {
			return true
		}
	}
	return false
}
41 changes: 35 additions & 6 deletions cmd/compute-domain-kubelet-plugin/computedomain.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ const (
ComputeDomainDaemonConfigTemplatePath = "/templates/compute-domain-daemon-config.tmpl.cfg"
)

type AssertNameSpaceFunc func(ctx context.Context, claimNamespace, cdUID string) error
type AddNodeLabelFunc func(ctx context.Context, cdUID string) error
type RemoveNodeLabelFunc func(ctx context.Context, cdUID string) error
type AssertComputeDomainReadyFunc func(ctx context.Context, cdUID string) error

type ComputeDomainManager struct {
config *Config
waitGroup sync.WaitGroup
Expand All @@ -57,6 +62,8 @@ type ComputeDomainManager struct {

configFilesRoot string
cliqueID string

podManager *PodManager
}

type ComputeDomainDaemonSettings struct {
Expand All @@ -80,6 +87,8 @@ func NewComputeDomainManager(config *Config, cliqueID string) *ComputeDomainMana
cliqueID: cliqueID,
}

m.podManager = NewPodManager(config, m.AssertComputeDomainNamespace, m.AddNodeLabel, m.RemoveNodeLabel, m.AssertComputeDomainReady)

return m
}

Expand Down Expand Up @@ -118,10 +127,21 @@ func (m *ComputeDomainManager) Start(ctx context.Context) (rerr error) {
return fmt.Errorf("informer cache sync for ComputeDomains failed")
}

if featuregates.Enabled(featuregates.ComputeDomainBindingConditions) {
if err := m.podManager.Start(ctx); err != nil {
return fmt.Errorf("error starting Pod manager: %w", err)
}
}

return nil
}

func (m *ComputeDomainManager) Stop() error {
if featuregates.Enabled(featuregates.ComputeDomainBindingConditions) {
if err := m.podManager.Stop(); err != nil {
return fmt.Errorf("error stopping Pod manager: %w", err)
}
}
if m.cancelContext != nil {
m.cancelContext()
}
Expand Down Expand Up @@ -245,7 +265,11 @@ func (m *ComputeDomainManager) AssertComputeDomainReady(ctx context.Context, cdU
}

// Check if the current node is ready in the ComputeDomain
if !m.isCurrentNodeReady(ctx, cd) {
ready, failed := m.isCurrentNodeReady(ctx, cd)
if failed {
return fmt.Errorf("%w: current node failed in ComputeDomain", ErrBindingFailure)
}
if !ready {
return fmt.Errorf("current node not ready in ComputeDomain")
}

Expand All @@ -256,23 +280,28 @@ func (m *ComputeDomainManager) AssertComputeDomainReady(ctx context.Context, cdU
// When the feature gate is enabled, we check both the clique and the status to ensure
// that compute domains started before the feature gate was enabled continue to work
// even after the feature gate is enabled.
func (m *ComputeDomainManager) isCurrentNodeReady(ctx context.Context, cd *nvapi.ComputeDomain) bool {
func (m *ComputeDomainManager) isCurrentNodeReady(ctx context.Context, cd *nvapi.ComputeDomain) (bool, bool) {
if featuregates.Enabled(featuregates.ComputeDomainCliques) {
if m.isCurrentNodeReadyInClique(ctx, cd) {
return true
return true, false
}
}
return m.isCurrentNodeReadyInStatus(cd)
}

// isCurrentNodeReadyInStatus checks if the current node is marked as ready in the ComputeDomain status.
func (m *ComputeDomainManager) isCurrentNodeReadyInStatus(cd *nvapi.ComputeDomain) bool {
func (m *ComputeDomainManager) isCurrentNodeReadyInStatus(cd *nvapi.ComputeDomain) (bool, bool) {
for _, node := range cd.Status.Nodes {
if node.Name == m.config.flags.nodeName {
return node.Status == nvapi.ComputeDomainStatusReady
switch node.Status {
case nvapi.ComputeDomainStatusReady:
return true, false
case nvapi.ComputeDomainStatusFailed:
return false, true
}
}
}
return false
return false, false
}

// isCurrentNodeReadyInClique checks if the current node is marked as ready in the ComputeDomainClique.
Expand Down
6 changes: 6 additions & 0 deletions cmd/compute-domain-kubelet-plugin/deviceinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

resourceapi "k8s.io/api/resource/v1"
"k8s.io/utils/ptr"
nvapi "sigs.k8s.io/nvidia-dra-driver-gpu/api/nvidia.com/resource/v1beta1"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates"
)

type ComputeDomainChannelInfo struct {
Expand Down Expand Up @@ -59,6 +61,10 @@ func (d *ComputeDomainChannelInfo) GetDevice() resourceapi.Device {
},
},
}
if featuregates.Enabled(featuregates.ComputeDomainBindingConditions) {
device.BindingConditions = []string{nvapi.ComputeDomainBindingConditions}
device.BindingFailureConditions = []string{nvapi.ComputeDomainBindingFailureConditions}
}
return device
}

Expand Down
8 changes: 7 additions & 1 deletion cmd/compute-domain-kubelet-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"sigs.k8s.io/nvidia-dra-driver-gpu/internal/info"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates"
pkgflags "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flags"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/workqueue"
)

const (
Expand All @@ -62,6 +63,8 @@ type Flags struct {
type Config struct {
flags *Flags
clientsets pkgflags.ClientSets
// workQueue manages the asynchronous processing of tasks
workQueue *workqueue.WorkQueue
}

func (c Config) DriverPluginPath() string {
Expand Down Expand Up @@ -180,9 +183,12 @@ func newApp() *cli.App {
return fmt.Errorf("create client: %w", err)
}

workQueue := workqueue.New(workqueue.DefaultControllerRateLimiter())

config := &Config{
flags: flags,
clientsets: clientSets,
workQueue: workQueue,
}

return RunPlugin(c.Context, config)
Expand Down Expand Up @@ -245,7 +251,7 @@ func RunPlugin(ctx context.Context, config *Config) error {
return fmt.Errorf("error creating driver: %w", err)
}

<-ctx.Done()
config.workQueue.Run(ctx)
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
// A canceled context is the normal case here when the process receives
// a signal. Only log the error for more interesting cases.
Expand Down
Loading