From 0dcfe578eb5075399fdc202201bb56a4ae232243 Mon Sep 17 00:00:00 2001 From: Sheng Lin Date: Wed, 18 Mar 2026 17:30:37 -0400 Subject: [PATCH] Add DRA request Prometheus metrics Signed-off-by: Sheng Lin --- .../computedomain.go | 4 + cmd/compute-domain-controller/controller.go | 9 ++ cmd/compute-domain-controller/main.go | 21 +-- cmd/compute-domain-daemon/controller.go | 6 + cmd/compute-domain-daemon/main.go | 4 + .../device_state.go | 44 +++++- cmd/compute-domain-kubelet-plugin/driver.go | 15 ++ cmd/compute-domain-kubelet-plugin/main.go | 25 +++- cmd/gpu-kubelet-plugin/device_state.go | 44 +++++- cmd/gpu-kubelet-plugin/driver.go | 17 ++- cmd/gpu-kubelet-plugin/main.go | 22 +++ .../templates/controller.yaml | 9 ++ .../templates/kubeletplugin.yaml | 14 ++ .../helm/nvidia-dra-driver-gpu/values.yaml | 11 ++ pkg/metrics/computedomain_cluster.go | 95 +++++++++++++ pkg/metrics/dra_requests.go | 131 ++++++++++++++++++ pkg/metrics/prometheus_httpserver.go | 87 ++++++++++++ 17 files changed, 534 insertions(+), 24 deletions(-) create mode 100644 pkg/metrics/computedomain_cluster.go create mode 100644 pkg/metrics/dra_requests.go create mode 100644 pkg/metrics/prometheus_httpserver.go diff --git a/cmd/compute-domain-controller/computedomain.go b/cmd/compute-domain-controller/computedomain.go index 4ccce9fde..62f0eca56 100644 --- a/cmd/compute-domain-controller/computedomain.go +++ b/cmd/compute-domain-controller/computedomain.go @@ -28,6 +28,7 @@ import ( "k8s.io/klog/v2" nvapi "sigs.k8s.io/nvidia-dra-driver-gpu/api/nvidia.com/resource/v1beta1" + "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics" nvinformers "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/nvidia.com/informers/externalversions" nvlisters "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/nvidia.com/listers/resource/v1beta1" ) @@ -207,6 +208,8 @@ func (m *ComputeDomainManager) UpdateStatus(ctx context.Context, cd *nvapi.Compu // Recalculate global status based on current state cd.Status.Status = m.calculateGlobalStatus(cd) + 
metrics.ObserveComputeDomainStatus(string(cd.UID), cd.Status.Status) + updatedCD, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(cd.Namespace).UpdateStatus(ctx, cd, metav1.UpdateOptions{}) if err != nil { return nil, err @@ -344,6 +347,7 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error return fmt.Errorf("error removing finalizer: %w", err) } + metrics.ForgetComputeDomain(string(cd.UID)) return nil } diff --git a/cmd/compute-domain-controller/controller.go b/cmd/compute-domain-controller/controller.go index 91f47a8d7..f1294dc0d 100644 --- a/cmd/compute-domain-controller/controller.go +++ b/cmd/compute-domain-controller/controller.go @@ -56,6 +56,13 @@ type ManagerConfig struct { // logVerbosityCDDaemon controls the log verbosity for dynamically launched // ComputeDomain daemons. logVerbosityCDDaemon int + + // httpEndpoint is the TCP network address where the HTTP server for diagnostics + // (including pprof and metrics) will listen + httpEndpoint string + + // metricsPath is the HTTP path for Prometheus metrics + metricsPath string } // Controller manages the lifecycle of the DRA driver and its components. @@ -84,6 +91,8 @@ func (c *Controller) Run(ctx context.Context) error { clientsets: c.config.clientsets, workQueue: workQueue, logVerbosityCDDaemon: c.config.flags.logVerbosityCDDaemon, + httpEndpoint: c.config.flags.httpEndpoint, + metricsPath: c.config.flags.metricsPath, } // TODO: log full, nested cliFlags structure. 
diff --git a/cmd/compute-domain-controller/main.go b/cmd/compute-domain-controller/main.go index 328af05c0..61e13a703 100644 --- a/cmd/compute-domain-controller/main.go +++ b/cmd/compute-domain-controller/main.go @@ -28,15 +28,12 @@ import ( "syscall" "github.com/google/uuid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/urfave/cli/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/component-base/logs" - "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" _ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration @@ -47,6 +44,7 @@ import ( "sigs.k8s.io/nvidia-dra-driver-gpu/internal/info" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates" pkgflags "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flags" + "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics" ) const ( @@ -371,24 +369,9 @@ func runWithLeaderElection(ctx context.Context, config *Config, controller *Cont func SetupHTTPEndpoint(config *Config) error { if config.flags.metricsPath != "" { - // To collect metrics data from the metric handler itself, we - // let it register itself and then collect from that registry. - reg := prometheus.NewRegistry() - gatherers := prometheus.Gatherers{ - // Include Go runtime and process metrics: - // https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go#L46-L49 - legacyregistry.DefaultGatherer, - } - gatherers = append(gatherers, reg) - actualPath := path.Join("/", config.flags.metricsPath) klog.InfoS("Starting metrics", "path", actualPath) - // This is similar to k8s.io/component-base/metrics HandlerWithReset - // except that we gather from multiple sources. 
- config.mux.Handle(actualPath, - promhttp.InstrumentMetricHandler( - reg, - promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{}))) + config.mux.Handle(path.Join("/", config.flags.metricsPath), metrics.NewLegacyPrometheusHandler()) } if config.flags.profilePath != "" { diff --git a/cmd/compute-domain-daemon/controller.go b/cmd/compute-domain-daemon/controller.go index b35aa9017..7fd86949f 100644 --- a/cmd/compute-domain-daemon/controller.go +++ b/cmd/compute-domain-daemon/controller.go @@ -36,6 +36,8 @@ type DaemonInfoManager interface { // ManagerConfig holds the configuration for the compute domain manager. type ManagerConfig struct { + httpEndpoint string + metricsPath string workQueue *workqueue.WorkQueue clientsets flags.ClientSets nodeName string @@ -52,6 +54,8 @@ type ManagerConfig struct { // ControllerConfig holds the configuration for the controller. type ControllerConfig struct { + httpEndpoint string + metricsPath string clientsets flags.ClientSets nodeName string computeDomainUUID string @@ -77,6 +81,8 @@ func NewController(config *ControllerConfig) (*Controller, error) { mc := &ManagerConfig{ workQueue: workQueue, + httpEndpoint: config.httpEndpoint, + metricsPath: config.metricsPath, clientsets: config.clientsets, nodeName: config.nodeName, computeDomainUUID: config.computeDomainUUID, diff --git a/cmd/compute-domain-daemon/main.go b/cmd/compute-domain-daemon/main.go index a2299bb93..b4e4df15a 100644 --- a/cmd/compute-domain-daemon/main.go +++ b/cmd/compute-domain-daemon/main.go @@ -61,6 +61,8 @@ type Flags struct { podName string podNamespace string maxNodesPerIMEXDomain int + httpEndpoint string + metricsPath string klogVerbosity int } @@ -244,6 +246,8 @@ func run(ctx context.Context, cancel context.CancelFunc, flags *Flags) error { } config := &ControllerConfig{ + httpEndpoint: flags.httpEndpoint, + metricsPath: flags.metricsPath, clientsets: clientsets, cliqueID: flags.cliqueID, computeDomainUUID: flags.computeDomainUUID, diff --git 
a/cmd/compute-domain-kubelet-plugin/device_state.go b/cmd/compute-domain-kubelet-plugin/device_state.go index b4c337e4e..8a3d5258e 100644 --- a/cmd/compute-domain-kubelet-plugin/device_state.go +++ b/cmd/compute-domain-kubelet-plugin/device_state.go @@ -34,6 +34,7 @@ import ( configapi "sigs.k8s.io/nvidia-dra-driver-gpu/api/nvidia.com/resource/v1beta1" "sigs.k8s.io/nvidia-dra-driver-gpu/internal/common" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates" + drametrics "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics" ) type OpaqueDeviceConfig struct { @@ -132,6 +133,11 @@ func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) { for _, c := range checkpoints { if c == DriverPluginCheckpointFileBasename { klog.Infof("Found previous checkpoint: %s", c) + cp, err := state.getCheckpoint() + if err != nil { + return nil, fmt.Errorf("unable to get checkpoint: %w", err) + } + syncPreparedDevicesGaugeFromCheckpoint(config.flags.nodeName, cp) return state, nil } } @@ -288,7 +294,11 @@ func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.Name } func (s *DeviceState) createCheckpoint(cp *Checkpoint) error { - return s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, cp) + if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, cp); err != nil { + return err + } + syncPreparedDevicesGaugeFromCheckpoint(s.config.flags.nodeName, cp) + return nil } func (s *DeviceState) getCheckpoint() (*Checkpoint, error) { @@ -317,6 +327,7 @@ func (s *DeviceState) updateCheckpoint(mutate func(*Checkpoint)) error { return fmt.Errorf("unable to create checkpoint: %w", err) } + syncPreparedDevicesGaugeFromCheckpoint(s.config.flags.nodeName, checkpoint) return nil } @@ -825,3 +836,34 @@ func (s *DeviceState) validateNoOverlappingPreparedDevices(checkpoint *Checkpoin } return nil } + +func syncPreparedDevicesGaugeFromCheckpoint(nodeName string, cp *Checkpoint) { + counts := make(map[string]int) + if cp == 
nil { + return + } + lv := cp.ToLatestVersion() + if lv != nil && lv.V2 != nil { + for _, pc := range lv.V2.PreparedClaims { + if pc.CheckpointState != ClaimCheckpointStatePrepareCompleted { + continue + } + for _, g := range pc.PreparedDevices { + for _, dev := range g.Devices { + if _, ok := counts[dev.Type()]; !ok { + counts[dev.Type()] = 0 + } + counts[dev.Type()]++ + } + } + } + } + + for _, dt := range []string{ComputeDomainChannelType, ComputeDomainDaemonType, UnknownDeviceType} { + if count, ok := counts[dt]; !ok { + drametrics.SetPreparedDevicesCounts(nodeName, DriverName, dt, 0) + } else { + drametrics.SetPreparedDevicesCounts(nodeName, DriverName, dt, count) + } + } +} diff --git a/cmd/compute-domain-kubelet-plugin/driver.go b/cmd/compute-domain-kubelet-plugin/driver.go index f02120bbb..bd0480791 100644 --- a/cmd/compute-domain-kubelet-plugin/driver.go +++ b/cmd/compute-domain-kubelet-plugin/driver.go @@ -33,6 +33,7 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flock" + drametrics "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/workqueue" ) @@ -241,14 +242,19 @@ func (d *driver) HandleError(ctx context.Context, err error, msg string) { // also reflect an error. Set the boolean to `true` for any result wrapping a // non-retryable error. 
func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) (bool, kubeletplugin.PrepareResult) { + t0 := time.Now() + release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second)) if err != nil { + drametrics.IncNodePrepareError(DriverName, "lock_acquire") res := kubeletplugin.PrepareResult{ Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err), } return false, res } defer release() + doneInFlight := drametrics.TrackInFlight(DriverName, "prepare") + defer doneInFlight() if claim.Status.Allocation == nil { res := kubeletplugin.PrepareResult{ @@ -259,6 +265,7 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res devs, err := d.state.Prepare(ctx, claim) if err != nil { + drametrics.IncNodePrepareError(DriverName, "prepare_devices") res := kubeletplugin.PrepareResult{ Err: fmt.Errorf("error preparing devices for claim '%s': %w", ResourceClaimToString(claim), err), } @@ -270,6 +277,7 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res } klog.V(1).Infof("Prepared devices for claim '%s': %v", ResourceClaimToString(claim), devs) + drametrics.ObserveRequest(DriverName, "prepare", time.Since(t0)) return true, kubeletplugin.PrepareResult{Devices: devs} } @@ -277,17 +285,24 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res // Return 2-tuple: the first value is a boolean indicating to the retry logic // whether the work is 'done'. 
func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error) { + tstart := time.Now() + release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second)) if err != nil { + drametrics.IncNodeUnprepareError(DriverName, "lock_acquire") return false, fmt.Errorf("error acquiring prep/unprep lock: %w", err) } defer release() + doneInFlight := drametrics.TrackInFlight(DriverName, "unprepare") + defer doneInFlight() if err := d.state.Unprepare(ctx, claimRef); err != nil { + drametrics.IncNodeUnprepareError(DriverName, "unprepare_devices") return isPermanentError(err), fmt.Errorf("error unpreparing devices for claim '%v': %w", claimRef.String(), err) } klog.V(1).Infof("Unprepared devices for claim '%v'", claimRef.String()) + drametrics.ObserveRequest(DriverName, "unprepare", time.Since(tstart)) return true, nil } diff --git a/cmd/compute-domain-kubelet-plugin/main.go b/cmd/compute-domain-kubelet-plugin/main.go index a8398e604..5e60bc4e9 100644 --- a/cmd/compute-domain-kubelet-plugin/main.go +++ b/cmd/compute-domain-kubelet-plugin/main.go @@ -27,15 +27,15 @@ import ( "github.com/urfave/cli/v2" + "k8s.io/component-base/logs" "k8s.io/dynamic-resource-allocation/kubeletplugin" "k8s.io/klog/v2" - "k8s.io/component-base/logs" - "sigs.k8s.io/nvidia-dra-driver-gpu/internal/common" "sigs.k8s.io/nvidia-dra-driver-gpu/internal/info" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates" pkgflags "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flags" + "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics" ) const ( @@ -48,6 +48,8 @@ type Flags struct { kubeClientConfig pkgflags.KubeClientConfig nodeName string + httpEndpoint string + metricsPath string namespace string cdiRoot string containerDriverRoot string @@ -95,6 +97,19 @@ func newApp() *cli.App { Destination: &flags.namespace, EnvVars: []string{"NAMESPACE"}, }, + &cli.StringFlag{ + Name: "http-endpoint", + Usage: "The TCP network `address` where the metrics HTTP server will listen 
(example: `:8080`). The default is the empty string, which means the server is disabled.", + Destination: &flags.httpEndpoint, + EnvVars: []string{"HTTP_ENDPOINT"}, + }, + &cli.StringFlag{ + Name: "metrics-path", + Usage: "The HTTP `path` where Prometheus metrics are exposed, disabled if empty.", + Value: "/metrics", + Destination: &flags.metricsPath, + EnvVars: []string{"METRICS_PATH"}, + }, &cli.StringFlag{ Name: "cdi-root", Usage: "Absolute path to the directory where CDI files will be generated.", @@ -239,6 +254,12 @@ func RunPlugin(ctx context.Context, config *Config) error { ctx, cancel := signal.NotifyContext(ctx, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() + if config.flags.httpEndpoint != "" { + if err := metrics.RunPrometheusMetricsServer(ctx, config.flags.httpEndpoint, config.flags.metricsPath); err != nil { + return fmt.Errorf("setup metrics endpoint: %w", err) + } + } + // Create and start the driver driver, err := NewDriver(ctx, config) if err != nil { diff --git a/cmd/gpu-kubelet-plugin/device_state.go b/cmd/gpu-kubelet-plugin/device_state.go index 4e7a13a07..a306ee269 100644 --- a/cmd/gpu-kubelet-plugin/device_state.go +++ b/cmd/gpu-kubelet-plugin/device_state.go @@ -37,6 +37,7 @@ import ( configapi "sigs.k8s.io/nvidia-dra-driver-gpu/api/nvidia.com/resource/v1beta1" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flock" + drametrics "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics" ) type OpaqueDeviceConfig struct { @@ -166,6 +167,11 @@ func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) { for _, c := range checkpoints { if c == DriverPluginCheckpointFileBasename { + cp, err := state.getCheckpoint(ctx) + if err != nil { + return nil, fmt.Errorf("unable to get checkpoint: %w", err) + } + syncPreparedDevicesGaugeFromCheckpoint(config.flags.nodeName, cp) return state, nil } } @@ -525,7 +531,11 @@ func (s *DeviceState) createCheckpoint(ctx 
context.Context, cp *Checkpoint) erro klog.V(7).Info("acquired cplock (createCheckpoint)") err = s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, cp) klog.V(7).Info("create cp: done") - return err + if err != nil { + return err + } + syncPreparedDevicesGaugeFromCheckpoint(s.config.flags.nodeName, cp) + return nil } func (s *DeviceState) getCheckpoint(ctx context.Context) (*Checkpoint, error) { @@ -578,6 +588,7 @@ func (s *DeviceState) updateCheckpoint(ctx context.Context, mutate func(*Checkpo return fmt.Errorf("unable to create checkpoint: %w", err) } klog.V(6).Infof("t_checkpoint_update_total %.3f s", time.Since(tucp0).Seconds()) + syncPreparedDevicesGaugeFromCheckpoint(s.config.flags.nodeName, cp) return nil } @@ -1182,3 +1193,34 @@ func (s *DeviceState) deleteMigDevIfExistsAndNotUsedByCompletedClaim(ms *MigSpec return nil } + +func syncPreparedDevicesGaugeFromCheckpoint(nodeName string, cp *Checkpoint) { + counts := make(map[string]int) // map of device type to count of devices of that type + if cp == nil { + return + } + lv := cp.ToLatestVersion() + if lv != nil && lv.V2 != nil { + for _, pc := range lv.V2.PreparedClaims { + if pc.CheckpointState != ClaimCheckpointStatePrepareCompleted { + continue + } + for _, g := range pc.PreparedDevices { + for _, dev := range g.Devices { + if _, ok := counts[dev.Type()]; !ok { + counts[dev.Type()] = 0 + } + counts[dev.Type()]++ + } + } + } + } + + for _, dt := range []string{GpuDeviceType, PreparedMigDeviceType, VfioDeviceType, UnknownDeviceType} { + if count, ok := counts[dt]; !ok { + drametrics.SetPreparedDevicesCounts(nodeName, DriverName, dt, 0) + } else { + drametrics.SetPreparedDevicesCounts(nodeName, DriverName, dt, count) + } + } +} diff --git a/cmd/gpu-kubelet-plugin/driver.go b/cmd/gpu-kubelet-plugin/driver.go index 8283de28e..400e46473 100644 --- a/cmd/gpu-kubelet-plugin/driver.go +++ b/cmd/gpu-kubelet-plugin/driver.go @@ -36,6 +36,7 @@ import ( 
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flock" + drametrics "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics" ) // DriverPrepUprepFlockPath is the path to a lock file used to make sure @@ -332,19 +333,23 @@ func (d *driver) HandleError(ctx context.Context, err error, msg string) { } func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) kubeletplugin.PrepareResult { + t0 := time.Now() // Instead of a global prepare/unprepare (PU) lock, we could rely on // fine-grained checkpoint locking, which was proven to work correctly in // case of DynamicMIG mode. However, out of caution, retain this global PU // lock for now in all modes (re-evaluate the performance impact at a later // time). - t0 := time.Now() + release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second)) if err != nil { + drametrics.IncNodePrepareError(DriverName, "lock_acquire") return kubeletplugin.PrepareResult{ Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err), } } defer release() + doneInFlight := drametrics.TrackInFlight(DriverName, "prepare") + defer doneInFlight() klog.V(6).Infof("t_prep_lock_acq %.3f s", time.Since(t0).Seconds()) cs := ResourceClaimToString(claim) @@ -353,6 +358,7 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res klog.V(6).Infof("t_prep %.3f s (claim %s)", time.Since(tprep0).Seconds(), cs) if err != nil { + drametrics.IncNodePrepareError(DriverName, "prepare_devices") return kubeletplugin.PrepareResult{ Err: fmt.Errorf("error preparing devices for claim %s: %w", cs, err), } @@ -361,6 +367,7 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res if featuregates.Enabled(featuregates.PassthroughSupport) { // Re-advertise updated resourceslice after preparing devices. 
if err = d.publishResources(ctx, d.state.config); err != nil { + drametrics.IncNodePrepareError(DriverName, "publish_resources") return kubeletplugin.PrepareResult{ Err: fmt.Errorf("error preparing devices for claim %v: %w", claim.UID, err), } @@ -368,16 +375,21 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res } klog.Infof("Returning newly prepared devices for claim '%s': %v", cs, devs) + drametrics.ObserveRequest(DriverName, "prepare", time.Since(t0)) return kubeletplugin.PrepareResult{Devices: devs} } func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) error { t0 := time.Now() + release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second)) if err != nil { + drametrics.IncNodeUnprepareError(DriverName, "lock_acquire") return fmt.Errorf("error acquiring prep/unprep lock: %w", err) } defer release() + doneInFlight := drametrics.TrackInFlight(DriverName, "unprepare") + defer doneInFlight() klog.V(6).Infof("t_unprep_lock_acq %.3f s", time.Since(t0).Seconds()) cs := claimRef.String() @@ -386,16 +398,19 @@ func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplug klog.V(6).Infof("t_unprep %.3f s (claim %s)", time.Since(tunprep0).Seconds(), cs) if err != nil { + drametrics.IncNodeUnprepareError(DriverName, "unprepare_devices") return fmt.Errorf("error unpreparing devices for claim %v: %w", claimRef.String(), err) } if featuregates.Enabled(featuregates.PassthroughSupport) { // Re-advertise updated resourceslice after unpreparing devices. 
if err = d.publishResources(ctx, d.state.config); err != nil { + drametrics.IncNodeUnprepareError(DriverName, "publish_resources") return fmt.Errorf("error publishing resources: %w", err) } } + drametrics.ObserveRequest(DriverName, "unprepare", time.Since(t0)) return nil } diff --git a/cmd/gpu-kubelet-plugin/main.go b/cmd/gpu-kubelet-plugin/main.go index 92653a5d0..48569168a 100644 --- a/cmd/gpu-kubelet-plugin/main.go +++ b/cmd/gpu-kubelet-plugin/main.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/nvidia-dra-driver-gpu/internal/info" "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates" pkgflags "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flags" + "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics" ) const ( @@ -47,6 +48,8 @@ type Flags struct { nodeName string namespace string + httpEndpoint string + metricsPath string cdiRoot string containerDriverRoot string hostDriverRoot string @@ -159,6 +162,19 @@ func newApp() *cli.App { Destination: &flags.additionalXidsToIgnore, EnvVars: []string{"ADDITIONAL_XIDS_TO_IGNORE"}, }, + &cli.StringFlag{ + Name: "http-endpoint", + Usage: "The TCP network `address` where the metrics HTTP server will listen (example: `:8080`). The default is the empty string, which means the server is disabled.", + Destination: &flags.httpEndpoint, + EnvVars: []string{"HTTP_ENDPOINT"}, + }, + &cli.StringFlag{ + Name: "metrics-path", + Usage: "The HTTP `path` where Prometheus metrics are exposed, disabled if empty.", + Value: "/metrics", + Destination: &flags.metricsPath, + EnvVars: []string{"METRICS_PATH"}, + }, } cliFlags = append(cliFlags, flags.kubeClientConfig.Flags()...) cliFlags = append(cliFlags, featureGateConfig.Flags()...) 
@@ -253,6 +269,12 @@ func RunPlugin(ctx context.Context, config *Config) error { ctx, cancel := signal.NotifyContext(ctx, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) defer cancel() + if config.flags.httpEndpoint != "" { + if err := metrics.RunPrometheusMetricsServer(ctx, config.flags.httpEndpoint, config.flags.metricsPath); err != nil { + return fmt.Errorf("setup metrics endpoint: %w", err) + } + } + // Create and start the driver driver, err := NewDriver(ctx, config) if err != nil { diff --git a/deployments/helm/nvidia-dra-driver-gpu/templates/controller.yaml b/deployments/helm/nvidia-dra-driver-gpu/templates/controller.yaml index 3b1a19302..1594394a6 100644 --- a/deployments/helm/nvidia-dra-driver-gpu/templates/controller.yaml +++ b/deployments/helm/nvidia-dra-driver-gpu/templates/controller.yaml @@ -56,6 +56,15 @@ spec: resources: {{- toYaml .Values.controller.containers.computeDomain.resources | nindent 10 }} env: + {{- if .Values.controller.metrics.enabled }} + # HTTP_ENDPOINT and METRICS_PATH are used to configure the metrics HTTP server. + - name: HTTP_ENDPOINT + value: "{{ .Values.controller.metrics.httpEndpoint }}" + - name: METRICS_PATH + value: "{{ .Values.controller.metrics.metricsPath }}" + - name: PPROF_PATH + value: "{{ .Values.controller.metrics.profilePath }}" + {{- end }} # LOG_VERBOSITY is the source of truth for this program's klog # configuration. Currently injected via CLI argument (see above) because # klog's verbosity for now cannot be sanely set from an env var. 
diff --git a/deployments/helm/nvidia-dra-driver-gpu/templates/kubeletplugin.yaml b/deployments/helm/nvidia-dra-driver-gpu/templates/kubeletplugin.yaml index 023631a18..dd53780ff 100644 --- a/deployments/helm/nvidia-dra-driver-gpu/templates/kubeletplugin.yaml +++ b/deployments/helm/nvidia-dra-driver-gpu/templates/kubeletplugin.yaml @@ -124,6 +124,13 @@ spec: timeoutSeconds: 10 {{- end }} env: + {{- if .Values.kubeletPlugin.metrics.enabled }} + # HTTP_ENDPOINT and METRICS_PATH are used to configure the metrics HTTP server. + - name: HTTP_ENDPOINT + value: "{{ .Values.kubeletPlugin.metrics.computeDomainHttpEndpoint }}" + - name: METRICS_PATH + value: "{{ .Values.kubeletPlugin.metrics.metricsPath }}" + {{- end }} # LOG_VERBOSITY is the source of truth for this program's klog # configuration. Currently injected via CLI argument (see above) because # klog's verbosity for now cannot be sanely set from an environment @@ -225,6 +232,13 @@ spec: timeoutSeconds: 10 {{- end }} env: + {{- if .Values.kubeletPlugin.metrics.enabled }} + # HTTP_ENDPOINT and METRICS_PATH are used to configure the metrics HTTP server. + - name: HTTP_ENDPOINT + value: "{{ .Values.kubeletPlugin.metrics.gpuHttpEndpoint }}" + - name: METRICS_PATH + value: "{{ .Values.kubeletPlugin.metrics.metricsPath }}" + {{- end }} # LOG_VERBOSITY is the source of truth for this program's klog # configuration. 
Currently injected via CLI argument (see above) because # klog's verbosity for now cannot be sanely set from an environment diff --git a/deployments/helm/nvidia-dra-driver-gpu/values.yaml b/deployments/helm/nvidia-dra-driver-gpu/values.yaml index 964d0876c..c6082f81e 100644 --- a/deployments/helm/nvidia-dra-driver-gpu/values.yaml +++ b/deployments/helm/nvidia-dra-driver-gpu/values.yaml @@ -73,6 +73,7 @@ resources: computeDomains: enabled: true + # Feature gates configuration following Kubernetes patterns # Configure feature gates as key-value pairs (feature_name: true/false) # Examples: @@ -191,6 +192,11 @@ controller: leaseDuration: "15s" renewDeadline: "10s" retryPeriod: "2s" + metrics: + enabled: true + httpEndpoint: ":8080" + metricsPath: "/metrics" + profilePath: "" priorityClassName: "system-node-critical" podAnnotations: {} podSecurityContext: {} @@ -228,6 +234,11 @@ controller: enabled: false kubeletPlugin: + metrics: + enabled: true + gpuHttpEndpoint: ":8080" + computeDomainHttpEndpoint: ":8081" + metricsPath: "/metrics" priorityClassName: "system-node-critical" updateStrategy: type: RollingUpdate diff --git a/pkg/metrics/computedomain_cluster.go b/pkg/metrics/computedomain_cluster.go new file mode 100644 index 000000000..930fe9e50 --- /dev/null +++ b/pkg/metrics/computedomain_cluster.go @@ -0,0 +1,95 @@ +/* +Copyright The Kubernetes Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package metrics + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/component-base/metrics/legacyregistry" +) + +var ( + computeDomainClusterMetricsOnce sync.Once + computeDomains *prometheus.GaugeVec + + computeDomainLastStatus map[string]string // ComputeDomain UID -> last published status label +) + +func initComputeDomainClusterMetrics() { + computeDomains = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "nvidia_dra", + Name: "compute_domain_info", + Help: "Current number of ComputeDomain custom resources in the cluster as seen by compute-domain-controller, partitioned by compute domain status.", + }, + []string{"status"}, + ) +} + +// registerComputeDomainClusterMetrics registers cluster-wide ComputeDomain gauges on the +// Kubernetes legacy metrics registry (included in the controller /metrics endpoint). +func registerComputeDomainClusterMetrics() { + computeDomainClusterMetricsOnce.Do(func() { + initComputeDomainClusterMetrics() + legacyregistry.RawMustRegister(computeDomains) + }) +} + +// ObserveComputeDomainStatus updates gauges for a single ComputeDomain when its global +// status label changes. It is safe to call repeatedly with the same uid and statusLabel (no-op). +// uid must be non-empty. 
+func ObserveComputeDomainStatus(uid, statusLabel string) { + if uid == "" { + return + } + registerComputeDomainClusterMetrics() + + if computeDomainLastStatus == nil { + computeDomainLastStatus = make(map[string]string) + } + + if prev, ok := computeDomainLastStatus[uid]; ok { // previous observation for this UID + if prev == statusLabel { + return + } + computeDomains.WithLabelValues(prev).Add(-1) + computeDomains.WithLabelValues(statusLabel).Add(1) + computeDomainLastStatus[uid] = statusLabel + } else { // first observation for this UID + computeDomains.WithLabelValues(statusLabel).Add(1) + computeDomainLastStatus[uid] = statusLabel + } +} + +// ForgetComputeDomain removes a ComputeDomain UID from metrics (e.g. on informer Delete). +func ForgetComputeDomain(uid string) { + if uid == "" { + return + } + registerComputeDomainClusterMetrics() + + if computeDomainLastStatus == nil { + return + } + prev, ok := computeDomainLastStatus[uid] + if !ok { + return + } + computeDomains.WithLabelValues(prev).Add(-1) + delete(computeDomainLastStatus, uid) +} diff --git a/pkg/metrics/dra_requests.go b/pkg/metrics/dra_requests.go new file mode 100644 index 000000000..e0e7db834 --- /dev/null +++ b/pkg/metrics/dra_requests.go @@ -0,0 +1,131 @@ +/* +Copyright The Kubernetes Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package metrics
+
+import (
+	"sync"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+var (
+	// registerOnce guards the one-time registration performed by Register.
+	registerOnce sync.Once
+	// 9 exponential buckets starting at 50ms, doubling each step (0.05s .. 12.8s).
+	requestDurationSecondsBuckets = prometheus.ExponentialBuckets(0.05, 2, 9)
+	draRequestsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "nvidia_dra",
+			Name:      "requests_total",
+			Help:      "Total number of DRA prepare and unprepare requests.",
+		},
+		[]string{"driver", "operation"},
+	)
+
+	draRequestDurationSeconds = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "nvidia_dra",
+			Name:      "request_duration_seconds",
+			Help:      "Duration of DRA prepare and unprepare requests.",
+			Buckets:   requestDurationSecondsBuckets,
+		},
+		[]string{"driver", "operation"},
+	)
+
+	draRequestsInFlight = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "nvidia_dra",
+			Name:      "requests_inflight",
+			Help:      "Number of in-flight DRA prepare and unprepare requests.",
+		},
+		[]string{"driver", "operation"},
+	)
+
+	draPreparedDevices = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "nvidia_dra",
+			Name:      "prepared_devices",
+			Help:      "Current number of prepared devices by device type.",
+		},
+		[]string{"node", "driver", "device_type"},
+	)
+
+	// node_prepare_errors_total is scoped to kubelet node prepare failures (GPU DRA plugin).
+	nodePrepareErrorsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "nvidia_dra",
+			Name:      "node_prepare_errors_total",
+			Help:      "Total number of failures during DRA node prepare for the GPU kubelet plugin.",
+		},
+		[]string{"driver", "error_type"},
+	)
+
+	nodeUnprepareErrorsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "nvidia_dra",
+			Name:      "node_unprepare_errors_total",
+			Help:      "Total number of failures during DRA node unprepare for the GPU kubelet plugin.",
+		},
+		[]string{"driver", "error_type"},
+	)
+)
+
+// Register registers all DRA request metrics with the component-base legacy
+// registry exactly once; it is safe (and cheap) to call from every helper.
+func Register() {
+	registerOnce.Do(func() {
+		legacyregistry.RawMustRegister(
+			draRequestsTotal,
+			draRequestDurationSeconds,
+			draRequestsInFlight,
+			draPreparedDevices,
+			nodePrepareErrorsTotal,
+			nodeUnprepareErrorsTotal,
+		)
+	})
+}
+
+// TrackInFlight increments the in-flight gauge for (driver, operation) and
+// returns a function that the caller must invoke (typically via defer) when
+// the request finishes, to decrement the gauge again.
+func TrackInFlight(driver, operation string) func() {
+	Register()
+	draRequestsInFlight.WithLabelValues(driver, operation).Inc()
+	return func() {
+		draRequestsInFlight.WithLabelValues(driver, operation).Dec()
+	}
+}
+
+// ObserveRequest records one completed request for (driver, operation): it
+// increments requests_total and observes duration d in request_duration_seconds.
+func ObserveRequest(driver, operation string, d time.Duration) {
+	Register()
+	draRequestsTotal.WithLabelValues(driver, operation).Inc()
+	draRequestDurationSeconds.WithLabelValues(driver, operation).Observe(d.Seconds())
+}
+
+// IncNodePrepareError increments node_prepare_errors_total (GPU kubelet plugin).
+func IncNodePrepareError(driver, errorType string) {
+	Register()
+	nodePrepareErrorsTotal.WithLabelValues(driver, errorType).Inc()
+}
+
+// IncNodeUnprepareError increments node_unprepare_errors_total (GPU kubelet plugin).
+func IncNodeUnprepareError(driver, errorType string) {
+	Register()
+	nodeUnprepareErrorsTotal.WithLabelValues(driver, errorType).Inc()
+}
+
+// SetPreparedDevicesCounts sets the prepared_devices gauge for a single
+// (node, driver, device_type) series to count (despite the plural name, one
+// series per call). It does NOT reset stale device types: callers applying a
+// snapshot must explicitly set absent types back to 0 themselves.
+func SetPreparedDevicesCounts(nodeName, driver, deviceType string, count int) {
+	Register()
+	draPreparedDevices.WithLabelValues(nodeName, driver, deviceType).Set(float64(count))
+}
diff --git a/pkg/metrics/prometheus_httpserver.go b/pkg/metrics/prometheus_httpserver.go
new file mode 100644
index 000000000..3f7265ac4
--- /dev/null
+++ b/pkg/metrics/prometheus_httpserver.go
@@ -0,0 +1,87 @@
+/*
+Copyright The Kubernetes Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"net/http"
+	"path"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"k8s.io/component-base/metrics/legacyregistry"
+	"k8s.io/klog/v2"
+)
+
+// NewLegacyPrometheusHandler returns an HTTP handler that gathers metrics from
+// k8s.io/component-base/metrics/legacyregistry together with instrumentation
+// for the scrape itself (same pattern as Kubernetes component-base metrics).
+func NewLegacyPrometheusHandler() http.Handler {
+	// reg holds only the scrape-instrumentation metrics created by
+	// InstrumentMetricHandler below; application metrics are pulled in through
+	// the legacy registry's gatherer.
+	reg := prometheus.NewRegistry()
+	gatherers := prometheus.Gatherers{
+		// Go runtime and process metrics, etc.:
+		// https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go
+		legacyregistry.DefaultGatherer,
+	}
+	gatherers = append(gatherers, reg)
+	return promhttp.InstrumentMetricHandler(
+		reg,
+		promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{}))
+}
+
+// RunPrometheusMetricsServer listens on endpoint and serves Prometheus
+// metrics until ctx is canceled. An empty metricsPath disables metrics
+// entirely (returns nil without listening). An error is returned only for
+// setup problems (missing endpoint, listen failure); serving runs in
+// background goroutines, and a serve failure after startup terminates the
+// whole process via klog.FlushAndExit.
+func RunPrometheusMetricsServer(ctx context.Context, endpoint, metricsPath string) error {
+	if metricsPath == "" {
+		return nil
+	}
+	if endpoint == "" {
+		return fmt.Errorf("metrics endpoint is required when metrics path is set")
+	}
+
+	mux := http.NewServeMux()
+	// Normalize to an absolute path so both "metrics" and "/metrics" work.
+	actualPath := path.Join("/", metricsPath)
+	mux.Handle(actualPath, NewLegacyPrometheusHandler())
+
+	listener, err := net.Listen("tcp", endpoint)
+	if err != nil {
+		return fmt.Errorf("listen on metrics endpoint: %w", err)
+	}
+
+	// NOTE(review): no ReadTimeout/ReadHeaderTimeout is set on the server, so
+	// slow clients can hold connections open indefinitely — consider adding
+	// timeouts if this endpoint is reachable beyond the pod network.
+	server := &http.Server{Handler: mux}
+	go func() {
+		// On context cancellation, give in-flight scrapes up to 30s to drain;
+		// WithoutCancel keeps the shutdown context alive past ctx's cancellation.
+		<-ctx.Done()
+		shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
+		defer cancel()
+		_ = server.Shutdown(shutdownCtx)
+	}()
+
+	go func() {
+		klog.InfoS("Starting metrics HTTP server", "endpoint", endpoint, "path", actualPath)
+		err := server.Serve(listener)
+		if err != nil && !errors.Is(err, http.ErrServerClosed) {
+			klog.ErrorS(err, "metrics HTTP server failed")
+			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+		}
+	}()
+
+	return nil
+}