Skip to content
26 changes: 21 additions & 5 deletions cmd/compute-domain-daemon/computedomain.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@ func (m *ComputeDomainManager) Get(uid string) (*nvapi.ComputeDomain, error) {
return cds[0], nil
}

// onAddOrUpdate handles the addition or update of a ComputeDomain.
// onAddOrUpdate handles the addition or update of a ComputeDomain. Here, we
// receive updates not for all CDs in the system, but only for the CD that we
// are registered for (filtered by CD name). Note that the informer triggers
// this callback once upon startup for all existing objects.
func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error {
// Cast the object to a ComputeDomain object
o, ok := obj.(*nvapi.ComputeDomain)
Expand All @@ -203,13 +206,14 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error
return nil
}

// Because the informer only filters by name:
// Skip ComputeDomains that don't match on UUID
if string(cd.UID) != m.config.computeDomainUUID {
klog.Errorf("ComputeDomain processed with non-matching UID (%v, %v)", cd.UID, m.config.computeDomainUUID)
klog.Warningf("ComputeDomain processed with non-matching UID (%v, %v)", cd.UID, m.config.computeDomainUUID)
return nil
}

// Update node info in ComputeDomain
// Update node info in ComputeDomain.
if err := m.UpdateComputeDomainNodeInfo(ctx, cd); err != nil {
return fmt.Errorf("error updating node info in ComputeDomain: %w", err)
}
Expand Down Expand Up @@ -257,7 +261,10 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context,
Name: m.config.nodeName,
CliqueID: m.config.cliqueID,
Index: nextIndex,
Status: nvapi.ComputeDomainStatusNotReady,
}

klog.Infof("CD status does not contain node name '%s' yet, try to insert myself: %v", m.config.nodeName, nodeInfo)
newCD.Status.Nodes = append(newCD.Status.Nodes, nodeInfo)
}

Expand All @@ -266,7 +273,7 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context,
// across pod restarts.
nodeInfo.IPAddress = m.config.podIP

// Conditionally update its status
// Conditionally update global CD status if it's still in its initial status
if newCD.Status.Status == "" {
newCD.Status.Status = nvapi.ComputeDomainStatusNotReady
}
Expand All @@ -279,6 +286,7 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context,
}
m.mutationCache.Mutation(newCD)

klog.V(2).Infof("Successfully updated CD")
return nil
}

Expand Down Expand Up @@ -318,6 +326,14 @@ func getNextAvailableIndex(currentCliqueID string, nodes []*nvapi.ComputeDomainN
nextIndex++
}

// Skip the `maxNodesPerIMEXDomain` check in the special case of no clique ID
// being set. In that case, this node does not actually run an IMEX daemon
// managed by us, and the set of nodes in this "noop" mode in this CD is
// allowed to grow larger than maxNodesPerIMEXDomain.
if currentCliqueID == "" {
return nextIndex, nil
}
Comment thread
klueska marked this conversation as resolved.

// Ensure nextIndex is within the range 0..maxNodesPerIMEXDomain
if nextIndex < 0 || nextIndex >= maxNodesPerIMEXDomain {
return -1, fmt.Errorf("no available indices within maxNodesPerIMEXDomain (%d) for cliqueID %s", maxNodesPerIMEXDomain, currentCliqueID)
Expand Down Expand Up @@ -352,7 +368,7 @@ func (m *ComputeDomainManager) MaybePushNodesUpdate(cd *nvapi.ComputeDomain) {
m.previousNodes = cd.Status.Nodes
m.updatedNodesChan <- cd.Status.Nodes
} else {
klog.Infof("IP set did not change")
klog.V(6).Infof("IP set did not change")
Comment thread
jgehrcke marked this conversation as resolved.
}
}

Expand Down
40 changes: 29 additions & 11 deletions cmd/compute-domain-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,6 @@ func newApp() *cli.App {

// run invokes the IMEX daemon and manages its lifecycle. If no clique ID is
// set, the IMEX daemon is not started.
func run(ctx context.Context, cancel context.CancelFunc, flags *Flags) error {
// Support heterogeneous compute domain
if flags.cliqueID == "" {
fmt.Println("ClusterUUID and CliqueId are NOT set for GPUs on this node.")
fmt.Println("The IMEX daemon will not be started.")
fmt.Println("Sleeping forever...")
<-ctx.Done()
return nil
}

config := &ControllerConfig{
cliqueID: flags.cliqueID,
Expand All @@ -207,7 +199,17 @@ func run(ctx context.Context, cancel context.CancelFunc, flags *Flags) error {
}
klog.Infof("config: %v", config)

// Write the IMEX config with the current pod IP before starting the daemon
// Support heterogeneous ComputeDomains. That means that a CD may contain
// nodes that do not take part in Multi-Node NVLink communication. On such
// nodes, this program is started with an empty NVLink clique ID
// configuration parameter. In this mode, do not start the IMEX daemon but
// otherwise keep business logic intact. In particular, continuously update
// this node's state in the CD object.
if flags.cliqueID == "" {
klog.Infof("no cliqueID: register with ComputeDomain, but do not run IMEX daemon")
}

// Render and write the IMEX daemon config with the current pod IP
if err := writeIMEXConfig(flags.podIP); err != nil {
return fmt.Errorf("writeIMEXConfig failed: %w", err)
}
Expand Down Expand Up @@ -302,6 +304,11 @@ func IMEXDaemonUpdateLoopWithIPs(ctx context.Context, controller *Controller, cl
return fmt.Errorf("writeNodesConfig failed: %w", err)
}

if cliqueID == "" {
klog.V(1).Infof("empty cliqueID: do not start IMEX daemon")
break
}

Comment thread
klueska marked this conversation as resolved.
klog.Infof("Got update, (re)start IMEX daemon")
if err := pm.Restart(); err != nil {
// This might be a permanent problem, and retrying upon next update
Expand Down Expand Up @@ -331,6 +338,11 @@ func IMEXDaemonUpdateLoopWithDNSNames(ctx context.Context, controller *Controlle
return fmt.Errorf("failed to update DNS name => IP mappings: %w", err)
}

if dnsNameManager.cliqueID == "" {
klog.V(1).Infof("empty cliqueID: do not start IMEX daemon")
break
}

fresh, err := processManager.EnsureStarted()
if err != nil {
return fmt.Errorf("failed to ensure IMEX daemon is started: %w", err)
Expand All @@ -344,7 +356,7 @@ func IMEXDaemonUpdateLoopWithDNSNames(ctx context.Context, controller *Controlle
// addresses compared to the old set (then we don't need to force
// the daemon to re-resolve & re-connect).
if !updated || fresh {
continue
break
}

// Actively ask the IMEX daemon to re-read its config and to
Expand All @@ -365,7 +377,7 @@ func IMEXDaemonUpdateLoopWithDNSNames(ctx context.Context, controller *Controlle
// It returns an error if any step fails.
func check(ctx context.Context, cancel context.CancelFunc, flags *Flags) error {
if flags.cliqueID == "" {
fmt.Println("ClusterUUID and CliqueId are NOT set for GPUs on this node.")
fmt.Println("check succeeded (noop, clique ID is empty)")
return nil
}

Expand Down Expand Up @@ -405,6 +417,12 @@ func writeIMEXConfig(podIP string) error {
return fmt.Errorf("error executing template: %w", err)
}

// Ensure the directory exists
dir := filepath.Dir(imexConfigPath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dir, err)
}

if err := os.WriteFile(imexConfigPath, configFile.Bytes(), 0644); err != nil {
return fmt.Errorf("error writing config file %v: %w", imexConfigPath, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/compute-domain-daemon/podmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (pm *PodManager) updateNodeStatus(ctx context.Context, status string) error
return fmt.Errorf("failed to get ComputeDomain: %w", err)
}
if cd == nil {
return fmt.Errorf("ComputeDomain not found")
return fmt.Errorf("ComputeDomain '%s/%s' not found", pm.config.computeDomainName, pm.config.computeDomainUUID)
}

// Create a deep copy to avoid modifying the original
Expand Down
3 changes: 1 addition & 2 deletions cmd/compute-domain-daemon/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ func (m *ProcessManager) Watchdog(ctx context.Context) error {
}
}

// Detect if process terminated unexpectedly. Caller must ignore bool if err
// isn't nil.
// Detect if process terminated unexpectedly.
func (m *ProcessManager) lost() bool {
if !m.TryLock() {
// Start or stop is in progress; do not inspect state.
Expand Down
44 changes: 28 additions & 16 deletions cmd/compute-domain-kubelet-plugin/computedomain.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type ComputeDomainManager struct {

type ComputeDomainDaemonSettings struct {
manager *ComputeDomainManager
domain string
domainID string
rootDir string
configTmplPath string
nodesConfigPath string
Expand Down Expand Up @@ -128,13 +128,13 @@ func (m *ComputeDomainManager) Stop() error {
return nil
}

func (m *ComputeDomainManager) NewSettings(domain string) *ComputeDomainDaemonSettings {
func (m *ComputeDomainManager) NewSettings(domainID string) *ComputeDomainDaemonSettings {
return &ComputeDomainDaemonSettings{
manager: m,
domain: domain,
rootDir: fmt.Sprintf("%s/%s", m.configFilesRoot, domain),
configTmplPath: fmt.Sprintf("%s/%s/%s", m.configFilesRoot, domain, "config.tmpl.cfg"),
nodesConfigPath: fmt.Sprintf("%s/%s/%s", m.configFilesRoot, domain, "nodes_config.cfg"),
domainID: domainID,
rootDir: fmt.Sprintf("%s/%s", m.configFilesRoot, domainID),
configTmplPath: fmt.Sprintf("%s/%s/%s", m.configFilesRoot, domainID, "config.tmpl.cfg"),
nodesConfigPath: fmt.Sprintf("%s/%s/%s", m.configFilesRoot, domainID, "nodes_config.cfg"),
}
}

Expand All @@ -154,17 +154,16 @@ func (m *ComputeDomainManager) GetComputeDomainChannelContainerEdits(devRoot str
}
}

func (s *ComputeDomainDaemonSettings) GetDomain() string {
return s.domain
}

func (s *ComputeDomainDaemonSettings) GetCDIContainerEdits(ctx context.Context, devRoot string, info *nvcapDeviceInfo) (*cdiapi.ContainerEdits, error) {
cd, err := s.manager.GetComputeDomain(ctx, s.domain)
// GetCDIContainerEditsCommon returns the CDI spec edits that are always
// required for launching the CD daemon (whether or not it tries to launch an
// IMEX daemon internally).
func (s *ComputeDomainDaemonSettings) GetCDIContainerEditsCommon(ctx context.Context) (*cdiapi.ContainerEdits, error) {
cd, err := s.manager.GetComputeDomain(ctx, s.domainID)
if err != nil {
return nil, fmt.Errorf("error getting compute domain: %w", err)
return nil, fmt.Errorf("error getting compute domain %s: %w", s.domainID, err)
}
if cd == nil {
return nil, fmt.Errorf("compute domain not found: %s", s.domain)
return nil, fmt.Errorf("compute domain not found: %s", s.domainID)
}

edits := &cdiapi.ContainerEdits{
Expand All @@ -182,6 +181,20 @@ func (s *ComputeDomainDaemonSettings) GetCDIContainerEdits(ctx context.Context,
Options: []string{"rw", "nosuid", "nodev", "bind"},
},
},
},
}
return edits, nil
}

// GetDomainID returns the domain ID stored in these settings.
func (s *ComputeDomainDaemonSettings) GetDomainID() string {
return s.domainID
}

// GetCDIContainerEditsForImex returns the CDI spec edits that are only
// required for launching the CD daemon when it actually wraps an IMEX daemon.
func (s *ComputeDomainDaemonSettings) GetCDIContainerEditsForImex(ctx context.Context, devRoot string, info *nvcapDeviceInfo) *cdiapi.ContainerEdits {
edits := &cdiapi.ContainerEdits{
ContainerEdits: &cdispec.ContainerEdits{
DeviceNodes: []*cdispec.DeviceNode{
{
Path: info.path,
Expand All @@ -193,8 +206,7 @@ func (s *ComputeDomainDaemonSettings) GetCDIContainerEdits(ctx context.Context,
},
},
}

return edits, nil
return edits
}

func (s *ComputeDomainDaemonSettings) Prepare(ctx context.Context) error {
Expand Down
44 changes: 27 additions & 17 deletions cmd/compute-domain-kubelet-plugin/device_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,8 @@ func (s *DeviceState) applyComputeDomainChannelConfig(ctx context.Context, confi
}

for _, info := range s.nvdevlib.nvCapImexChanDevInfos[:chancount] {
configState.containerEdits = configState.containerEdits.Append(s.computeDomainManager.GetComputeDomainChannelContainerEdits(s.cdi.devRoot, info))
edits := s.computeDomainManager.GetComputeDomainChannelContainerEdits(s.cdi.devRoot, info)
configState.containerEdits = configState.containerEdits.Append(edits)
}

return &configState, nil
Expand Down Expand Up @@ -491,27 +492,36 @@ func (s *DeviceState) applyComputeDomainDaemonConfig(ctx context.Context, config
ComputeDomain: config.DomainID,
}

// Only prepare files to inject to the daemon if IMEX is supported.
if s.computeDomainManager.cliqueID != "" {
// Parse the device node info for the fabric-imex-mgmt nvcap.
nvcapDeviceInfo, err := s.nvdevlib.parseNVCapDeviceInfo(nvidiaCapFabricImexMgmtPath)
if err != nil {
return nil, fmt.Errorf("error parsing nvcap device info for fabic-imex-mgmt: %w", err)
}
// Create new ComputeDomain daemon settings from the ComputeDomainManager.
computeDomainDaemonSettings := s.computeDomainManager.NewSettings(config.DomainID)

// Create new ComputeDomain daemon settings from the ComputeDomainManager.
computeDomainDaemonSettings := s.computeDomainManager.NewSettings(config.DomainID)
// Prepare the IMEX daemon config files for injection even if IMEX is not
// supported. For example, this creates
// '/var/lib/kubelet/plugins/compute-domain.nvidia.com/domains/<uid>' on the
// host, which is used as the mount source mapped to /etc/nvidia-imex in the
// CD daemon container.
if err := computeDomainDaemonSettings.Prepare(ctx); err != nil {
return nil, fmt.Errorf("error preparing ComputeDomain daemon settings for requests '%v' in claim '%v': %w", requests, claim.UID, err)
}

// Prepare the new ComputeDomain daemon.
if err := computeDomainDaemonSettings.Prepare(ctx); err != nil {
return nil, fmt.Errorf("error preparing ComputeDomain daemon settings for requests '%v' in claim '%v': %w", requests, claim.UID, err)
}
// Always inject CD config details into the CD daemon (regardless of clique
// ID being empty or not).
edits, err := computeDomainDaemonSettings.GetCDIContainerEditsCommon(ctx)
if err != nil {
return nil, fmt.Errorf("error getting common container edits for ComputeDomain daemon '%s': %w", config.DomainID, err)
}
configState.containerEdits = configState.containerEdits.Append(edits)

// Store information about the ComputeDomain daemon in the configState.
edits, err := computeDomainDaemonSettings.GetCDIContainerEdits(ctx, s.cdi.devRoot, nvcapDeviceInfo)
// Only inject dev nodes related to
// /proc/driver/nvidia/capabilities/fabric-imex-mgmt if IMEX is supported,
// i.e., if we want to start the IMEX daemon process in the CD daemon pod.
if s.computeDomainManager.cliqueID != "" {
// Parse the device node info for the fabric-imex-mgmt nvcap.
nvcapDeviceInfo, err := s.nvdevlib.parseNVCapDeviceInfo(nvidiaCapFabricImexMgmtPath)
if err != nil {
return nil, fmt.Errorf("error getting container edits for ComputeDomain daemon for requests '%v' in claim '%v': %w", requests, claim.UID, err)
return nil, fmt.Errorf("error parsing nvcap device info for fabric-imex-mgmt: %w", err)
}
edits := computeDomainDaemonSettings.GetCDIContainerEditsForImex(ctx, s.cdi.devRoot, nvcapDeviceInfo)
configState.containerEdits = configState.containerEdits.Append(edits)
}

Expand Down