diff --git a/cmd/compute-domain-daemon/computedomain.go b/cmd/compute-domain-daemon/computedomain.go index 314274d71..6e5ec2003 100644 --- a/cmd/compute-domain-daemon/computedomain.go +++ b/cmd/compute-domain-daemon/computedomain.go @@ -184,7 +184,10 @@ func (m *ComputeDomainManager) Get(uid string) (*nvapi.ComputeDomain, error) { return cds[0], nil } -// onAddOrUpdate handles the addition or update of a ComputeDomain. +// onAddOrUpdate handles the addition or update of a ComputeDomain. Here, we +// receive updates not for all CDs in the system, but only for the CD that we +// are registered for (filtered by CD name). Note that the informer triggers +// this callback once upon startup for all existing objects. func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error { // Cast the object to a ComputeDomain object o, ok := obj.(*nvapi.ComputeDomain) @@ -203,13 +206,14 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error return nil } + // Because the informer only filters by name: // Skip ComputeDomains that don't match on UUID if string(cd.UID) != m.config.computeDomainUUID { - klog.Errorf("ComputeDomain processed with non-matching UID (%v, %v)", cd.UID, m.config.computeDomainUUID) + klog.Warningf("ComputeDomain processed with non-matching UID (%v, %v)", cd.UID, m.config.computeDomainUUID) return nil } - // Update node info in ComputeDomain + // Update node info in ComputeDomain. if err := m.UpdateComputeDomainNodeInfo(ctx, cd); err != nil { return fmt.Errorf("error updating node info in ComputeDomain: %w", err) } @@ -257,7 +261,10 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context, Name: m.config.nodeName, CliqueID: m.config.cliqueID, Index: nextIndex, + Status: nvapi.ComputeDomainStatusNotReady, } + + klog.Infof("CD status does not contain node name '%s' yet, try to insert myself: %v", m.config.nodeName, nodeInfo) newCD.Status.Nodes = append(newCD.Status.Nodes, nodeInfo) } @@ -266,7 +273,7 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context, // across pod restarts. nodeInfo.IPAddress = m.config.podIP - // Conditionally update its status + // Conditionally update global CD status if it's still in its initial status if newCD.Status.Status == "" { newCD.Status.Status = nvapi.ComputeDomainStatusNotReady } @@ -279,6 +286,7 @@ func (m *ComputeDomainManager) UpdateComputeDomainNodeInfo(ctx context.Context, } m.mutationCache.Mutation(newCD) + klog.V(2).Infof("Successfully updated CD") return nil } @@ -318,6 +326,14 @@ func getNextAvailableIndex(currentCliqueID string, nodes []*nvapi.ComputeDomainN nextIndex++ } + // Skip `maxNodesPerIMEXDomain` check in the special case of no clique ID + // being set: this means that this node does not actually run an IMEX daemon + // managed by us and the set of nodes in this "noop" mode in this CD is + // allowed to grow larger than maxNodesPerIMEXDomain. + if currentCliqueID == "" { + return nextIndex, nil + } + // Ensure nextIndex is within the range 0..maxNodesPerIMEXDomain if nextIndex < 0 || nextIndex >= maxNodesPerIMEXDomain { return -1, fmt.Errorf("no available indices within maxNodesPerIMEXDomain (%d) for cliqueID %s", maxNodesPerIMEXDomain, currentCliqueID) @@ -352,7 +368,7 @@ func (m *ComputeDomainManager) MaybePushNodesUpdate(cd *nvapi.ComputeDomain) { m.previousNodes = cd.Status.Nodes m.updatedNodesChan <- cd.Status.Nodes } else { - klog.Infof("IP set did not change") + klog.V(6).Infof("IP set did not change") } } diff --git a/cmd/compute-domain-daemon/main.go b/cmd/compute-domain-daemon/main.go index 8d4a1b932..52807dd9c 100644 --- a/cmd/compute-domain-daemon/main.go +++ b/cmd/compute-domain-daemon/main.go @@ -185,14 +185,6 @@ func newApp() *cli.App { // Run invokes the IMEX daemon and manages its lifecycle. func run(ctx context.Context, cancel context.CancelFunc, flags *Flags) error { - // Support heterogeneous compute domain - if flags.cliqueID == "" { - fmt.Println("ClusterUUID and CliqueId are NOT set for GPUs on this node.") - fmt.Println("The IMEX daemon will not be started.") - fmt.Println("Sleeping forever...") - <-ctx.Done() - return nil - } config := &ControllerConfig{ cliqueID: flags.cliqueID, @@ -207,7 +199,17 @@ func run(ctx context.Context, cancel context.CancelFunc, flags *Flags) error { } klog.Infof("config: %v", config) - // Write the IMEX config with the current pod IP before starting the daemon + // Support heterogeneous ComputeDomains. That means that a CD may contain + // nodes that do not take part in Multi-Node NVLink communication. On such + // nodes, this program is started with an empty NVLink clique ID + // configuration parameter. In this mode, do not start the IMEX daemon but + // otherwise keep business logic intact. In particular, continuously update + // this node's state in the CD object. + if flags.cliqueID == "" { + klog.Infof("no cliqueID: register with ComputeDomain, but do not run IMEX daemon") + } + + // Render and write the IMEX daemon config with the current pod IP if err := writeIMEXConfig(flags.podIP); err != nil { return fmt.Errorf("writeIMEXConfig failed: %w", err) } @@ -302,6 +304,11 @@ func IMEXDaemonUpdateLoopWithIPs(ctx context.Context, controller *Controller, cl return fmt.Errorf("writeNodesConfig failed: %w", err) } + if cliqueID == "" { + klog.V(1).Infof("empty cliqueID: do not start IMEX daemon") + break + } + klog.Infof("Got update, (re)start IMEX daemon") if err := pm.Restart(); err != nil { // This might be a permanent problem, and retrying upon next update @@ -331,6 +338,11 @@ func IMEXDaemonUpdateLoopWithDNSNames(ctx context.Context, controller *Controlle return fmt.Errorf("failed to update DNS name => IP mappings: %w", err) } + if dnsNameManager.cliqueID == "" { + klog.V(1).Infof("empty cliqueID: do not start IMEX daemon") + break + } + fresh, err := processManager.EnsureStarted() if err != nil { return fmt.Errorf("failed to ensure IMEX daemon is started: %w", err) @@ -344,7 +356,7 @@ func IMEXDaemonUpdateLoopWithDNSNames(ctx context.Context, controller *Controlle // addresses compared to the old set (then we don't need to force // the daemon to re-resolve & re-connect). if !updated || fresh { - continue + break } // Actively ask the IMEX daemon to re-read its config and to @@ -365,7 +377,7 @@ func IMEXDaemonUpdateLoopWithDNSNames(ctx context.Context, controller *Controlle // It returns an error if any step fails. func check(ctx context.Context, cancel context.CancelFunc, flags *Flags) error { if flags.cliqueID == "" { - fmt.Println("ClusterUUID and CliqueId are NOT set for GPUs on this node.") + fmt.Println("check succeeded (noop, clique ID is empty)") return nil } @@ -405,6 +417,12 @@ func writeIMEXConfig(podIP string) error { return fmt.Errorf("error executing template: %w", err) } + // Ensure the directory exists + dir := filepath.Dir(imexConfigPath) + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("failed to create directory %s: %w", dir, err) + } + if err := os.WriteFile(imexConfigPath, configFile.Bytes(), 0644); err != nil { return fmt.Errorf("error writing config file %v: %w", imexConfigPath, err) } diff --git a/cmd/compute-domain-daemon/podmanager.go b/cmd/compute-domain-daemon/podmanager.go index 8957e9307..18cb74d83 100644 --- a/cmd/compute-domain-daemon/podmanager.go +++ b/cmd/compute-domain-daemon/podmanager.go @@ -161,7 +161,7 @@ func (pm *PodManager) updateNodeStatus(ctx context.Context, status string) error return fmt.Errorf("failed to get ComputeDomain: %w", err) } if cd == nil { - return fmt.Errorf("ComputeDomain not found") + return fmt.Errorf("ComputeDomain '%s/%s' not found", pm.config.computeDomainName, pm.config.computeDomainUUID) } // Create a deep copy to avoid modifying the original diff --git a/cmd/compute-domain-daemon/process.go b/cmd/compute-domain-daemon/process.go index c0a9ba8c1..199c6b777 100644 --- a/cmd/compute-domain-daemon/process.go +++ b/cmd/compute-domain-daemon/process.go @@ -201,8 +201,7 @@ func (m *ProcessManager) Watchdog(ctx context.Context) error { } } -// Detect if process terminated unexpectedly. Caller must ignore bool if err -// isn't nil. +// Detect if process terminated unexpectedly. func (m *ProcessManager) lost() bool { if !m.TryLock() { // Start or stop is in progress; do not inspect state. diff --git a/cmd/compute-domain-kubelet-plugin/computedomain.go b/cmd/compute-domain-kubelet-plugin/computedomain.go index 83f7ab3f4..5ea94db1e 100644 --- a/cmd/compute-domain-kubelet-plugin/computedomain.go +++ b/cmd/compute-domain-kubelet-plugin/computedomain.go @@ -60,7 +60,7 @@ type ComputeDomainManager struct { type ComputeDomainDaemonSettings struct { manager *ComputeDomainManager - domain string + domainID string rootDir string configTmplPath string nodesConfigPath string @@ -128,13 +128,13 @@ func (m *ComputeDomainManager) Stop() error { return nil } -func (m *ComputeDomainManager) NewSettings(domain string) *ComputeDomainDaemonSettings { +func (m *ComputeDomainManager) NewSettings(domainID string) *ComputeDomainDaemonSettings { return &ComputeDomainDaemonSettings{ manager: m, - domain: domain, - rootDir: fmt.Sprintf("%s/%s", m.configFilesRoot, domain), - configTmplPath: fmt.Sprintf("%s/%s/%s", m.configFilesRoot, domain, "config.tmpl.cfg"), - nodesConfigPath: fmt.Sprintf("%s/%s/%s", m.configFilesRoot, domain, "nodes_config.cfg"), + domainID: domainID, + rootDir: fmt.Sprintf("%s/%s", m.configFilesRoot, domainID), + configTmplPath: fmt.Sprintf("%s/%s/%s", m.configFilesRoot, domainID, "config.tmpl.cfg"), + nodesConfigPath: fmt.Sprintf("%s/%s/%s", m.configFilesRoot, domainID, "nodes_config.cfg"), } } @@ -154,17 +154,16 @@ func (m *ComputeDomainManager) GetComputeDomainChannelContainerEdits(devRoot str } } -func (s *ComputeDomainDaemonSettings) GetDomain() string { - return s.domain -} - -func (s *ComputeDomainDaemonSettings) GetCDIContainerEdits(ctx context.Context, devRoot string, info *nvcapDeviceInfo) (*cdiapi.ContainerEdits, error) { - cd, err := s.manager.GetComputeDomain(ctx, s.domain) +// GetCDIContainerEditsCommon() returns the CDI spec edits always required for +// launching the CD Daemon (whether or not it tries to launch an IMEX daemon +// internally). +func (s *ComputeDomainDaemonSettings) GetCDIContainerEditsCommon(ctx context.Context) (*cdiapi.ContainerEdits, error) { + cd, err := s.manager.GetComputeDomain(ctx, s.domainID) if err != nil { - return nil, fmt.Errorf("error getting compute domain: %w", err) + return nil, fmt.Errorf("error getting compute domain %s: %w", s.domainID, err) } if cd == nil { - return nil, fmt.Errorf("compute domain not found: %s", s.domain) + return nil, fmt.Errorf("compute domain not found: %s", s.domainID) } edits := &cdiapi.ContainerEdits{ @@ -182,6 +181,20 @@ func (s *ComputeDomainDaemonSettings) GetCDIContainerEdits(ctx context.Context, Options: []string{"rw", "nosuid", "nodev", "bind"}, }, }, + }, + } + return edits, nil +} + +func (s *ComputeDomainDaemonSettings) GetDomainID() string { + return s.domainID +} + +// GetCDIContainerEditsForImex() returns the CDI spec edits only required for +// launching the CD daemon when it actually wraps an IMEX daemon. +func (s *ComputeDomainDaemonSettings) GetCDIContainerEditsForImex(ctx context.Context, devRoot string, info *nvcapDeviceInfo) *cdiapi.ContainerEdits { + edits := &cdiapi.ContainerEdits{ + ContainerEdits: &cdispec.ContainerEdits{ DeviceNodes: []*cdispec.DeviceNode{ { Path: info.path, @@ -193,8 +206,7 @@ func (s *ComputeDomainDaemonSettings) GetCDIContainerEdits(ctx context.Context, }, }, } - - return edits, nil + return edits } func (s *ComputeDomainDaemonSettings) Prepare(ctx context.Context) error { diff --git a/cmd/compute-domain-kubelet-plugin/device_state.go b/cmd/compute-domain-kubelet-plugin/device_state.go index f21d6484f..d6acf3e40 100644 --- a/cmd/compute-domain-kubelet-plugin/device_state.go +++ b/cmd/compute-domain-kubelet-plugin/device_state.go @@ -462,7 +462,8 @@ func (s *DeviceState) applyComputeDomainChannelConfig(ctx context.Context, confi } for _, info := range s.nvdevlib.nvCapImexChanDevInfos[:chancount] { - configState.containerEdits = configState.containerEdits.Append(s.computeDomainManager.GetComputeDomainChannelContainerEdits(s.cdi.devRoot, info)) + edits := s.computeDomainManager.GetComputeDomainChannelContainerEdits(s.cdi.devRoot, info) + configState.containerEdits = configState.containerEdits.Append(edits) } return &configState, nil @@ -491,27 +492,36 @@ func (s *DeviceState) applyComputeDomainDaemonConfig(ctx context.Context, config ComputeDomain: config.DomainID, } - // Only prepare files to inject to the daemon if IMEX is supported. - if s.computeDomainManager.cliqueID != "" { - // Parse the device node info for the fabic-imex-mgmt nvcap. - nvcapDeviceInfo, err := s.nvdevlib.parseNVCapDeviceInfo(nvidiaCapFabricImexMgmtPath) - if err != nil { - return nil, fmt.Errorf("error parsing nvcap device info for fabic-imex-mgmt: %w", err) - } + // Create new ComputeDomain daemon settings from the ComputeDomainManager. + computeDomainDaemonSettings := s.computeDomainManager.NewSettings(config.DomainID) - // Create new ComputeDomain daemon settings from the ComputeDomainManager. - computeDomainDaemonSettings := s.computeDomainManager.NewSettings(config.DomainID) + // Prepare injecting IMEX daemon config files even if IMEX is not supported. + // This for example creates + // '/var/lib/kubelet/plugins/compute-domain.nvidia.com/domains/' on the + // host which is used as mount source mapped to /etc/nvidia-imex in the CD + // daemon container. + if err := computeDomainDaemonSettings.Prepare(ctx); err != nil { + return nil, fmt.Errorf("error preparing ComputeDomain daemon settings for requests '%v' in claim '%v': %w", requests, claim.UID, err) + } - // Prepare the new ComputeDomain daemon. - if err := computeDomainDaemonSettings.Prepare(ctx); err != nil { - return nil, fmt.Errorf("error preparing ComputeDomain daemon settings for requests '%v' in claim '%v': %w", requests, claim.UID, err) - } + // Always inject CD config details into the CD daemon (regardless of clique + // ID being empty or not). + edits, err := computeDomainDaemonSettings.GetCDIContainerEditsCommon(ctx) + if err != nil { + return nil, fmt.Errorf("error getting common container edits for ComputeDomain daemon '%s': %w", config.DomainID, err) + } + configState.containerEdits = configState.containerEdits.Append(edits) - // Store information about the ComputeDomain daemon in the configState. - edits, err := computeDomainDaemonSettings.GetCDIContainerEdits(ctx, s.cdi.devRoot, nvcapDeviceInfo) + // Only inject dev nodes related to + // /proc/driver/nvidia/capabilities/fabric-imex-mgmt if IMEX is supported + // (if we want to start the IMEX daemon process in the CD daemon pod). + if s.computeDomainManager.cliqueID != "" { + // Parse the device node info for the fabric-imex-mgmt nvcap. + nvcapDeviceInfo, err := s.nvdevlib.parseNVCapDeviceInfo(nvidiaCapFabricImexMgmtPath) if err != nil { - return nil, fmt.Errorf("error getting container edits for ComputeDomain daemon for requests '%v' in claim '%v': %w", requests, claim.UID, err) + return nil, fmt.Errorf("error parsing nvcap device info for fabric-imex-mgmt: %w", err) } + edits := computeDomainDaemonSettings.GetCDIContainerEditsForImex(ctx, s.cdi.devRoot, nvcapDeviceInfo) configState.containerEdits = configState.containerEdits.Append(edits) }