Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/compute-domain-controller/computedomain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/klog/v2"

nvapi "sigs.k8s.io/nvidia-dra-driver-gpu/api/nvidia.com/resource/v1beta1"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics"
nvinformers "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/nvidia.com/informers/externalversions"
nvlisters "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/nvidia.com/listers/resource/v1beta1"
)
Expand Down Expand Up @@ -207,6 +208,8 @@ func (m *ComputeDomainManager) UpdateStatus(ctx context.Context, cd *nvapi.Compu
// Recalculate global status based on current state
cd.Status.Status = m.calculateGlobalStatus(cd)

metrics.ObserveComputeDomainStatus(string(cd.UID), cd.Status.Status)

updatedCD, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomains(cd.Namespace).UpdateStatus(ctx, cd, metav1.UpdateOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -344,6 +347,7 @@ func (m *ComputeDomainManager) onAddOrUpdate(ctx context.Context, obj any) error
return fmt.Errorf("error removing finalizer: %w", err)
}

metrics.ForgetComputeDomain(string(cd.UID))
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions cmd/compute-domain-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ type ManagerConfig struct {
// logVerbosityCDDaemon controls the log verbosity for dynamically launched
// ComputeDomain daemons.
logVerbosityCDDaemon int

// httpEndpoint is the TCP network address where the HTTP server for diagnostics
// (including pprof and metrics) will listen
httpEndpoint string

// metricsPath is the HTTP path for Prometheus metrics
metricsPath string
}

// Controller manages the lifecycle of the DRA driver and its components.
Expand Down Expand Up @@ -84,6 +91,8 @@ func (c *Controller) Run(ctx context.Context) error {
clientsets: c.config.clientsets,
workQueue: workQueue,
logVerbosityCDDaemon: c.config.flags.logVerbosityCDDaemon,
httpEndpoint: c.config.flags.httpEndpoint,
metricsPath: c.config.flags.metricsPath,
}

// TODO: log full, nested cliFlags structure.
Expand Down
21 changes: 2 additions & 19 deletions cmd/compute-domain-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@ import (
"syscall"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/urfave/cli/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/component-base/logs"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"

_ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
Expand All @@ -47,6 +44,7 @@ import (
"sigs.k8s.io/nvidia-dra-driver-gpu/internal/info"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates"
pkgflags "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flags"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics"
)

const (
Expand Down Expand Up @@ -371,24 +369,9 @@ func runWithLeaderElection(ctx context.Context, config *Config, controller *Cont

func SetupHTTPEndpoint(config *Config) error {
if config.flags.metricsPath != "" {
// To collect metrics data from the metric handler itself, we
// let it register itself and then collect from that registry.
reg := prometheus.NewRegistry()
gatherers := prometheus.Gatherers{
// Include Go runtime and process metrics:
// https://github.com/kubernetes/kubernetes/blob/9780d88cb6a4b5b067256ecb4abf56892093ee87/staging/src/k8s.io/component-base/metrics/legacyregistry/registry.go#L46-L49
legacyregistry.DefaultGatherer,
}
gatherers = append(gatherers, reg)

actualPath := path.Join("/", config.flags.metricsPath)
klog.InfoS("Starting metrics", "path", actualPath)
// This is similar to k8s.io/component-base/metrics HandlerWithReset
// except that we gather from multiple sources.
config.mux.Handle(actualPath,
promhttp.InstrumentMetricHandler(
reg,
promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})))
config.mux.Handle(path.Join("/", config.flags.metricsPath), metrics.NewLegacyPrometheusHandler())
}

if config.flags.profilePath != "" {
Expand Down
6 changes: 6 additions & 0 deletions cmd/compute-domain-daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type DaemonInfoManager interface {

// ManagerConfig holds the configuration for the compute domain manager.
type ManagerConfig struct {
httpEndpoint string
metricsPath string
workQueue *workqueue.WorkQueue
clientsets flags.ClientSets
nodeName string
Expand All @@ -52,6 +54,8 @@ type ManagerConfig struct {

// ControllerConfig holds the configuration for the controller.
type ControllerConfig struct {
httpEndpoint string
metricsPath string
clientsets flags.ClientSets
nodeName string
computeDomainUUID string
Expand All @@ -77,6 +81,8 @@ func NewController(config *ControllerConfig) (*Controller, error) {

mc := &ManagerConfig{
workQueue: workQueue,
httpEndpoint: config.httpEndpoint,
metricsPath: config.metricsPath,
clientsets: config.clientsets,
nodeName: config.nodeName,
computeDomainUUID: config.computeDomainUUID,
Expand Down
4 changes: 4 additions & 0 deletions cmd/compute-domain-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type Flags struct {
podName string
podNamespace string
maxNodesPerIMEXDomain int
httpEndpoint string
metricsPath string
klogVerbosity int
}

Expand Down Expand Up @@ -244,6 +246,8 @@ func run(ctx context.Context, cancel context.CancelFunc, flags *Flags) error {
}

config := &ControllerConfig{
httpEndpoint: flags.httpEndpoint,
metricsPath: flags.metricsPath,
clientsets: clientsets,
cliqueID: flags.cliqueID,
computeDomainUUID: flags.computeDomainUUID,
Expand Down
44 changes: 43 additions & 1 deletion cmd/compute-domain-kubelet-plugin/device_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
configapi "sigs.k8s.io/nvidia-dra-driver-gpu/api/nvidia.com/resource/v1beta1"
"sigs.k8s.io/nvidia-dra-driver-gpu/internal/common"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates"
drametrics "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics"
)

type OpaqueDeviceConfig struct {
Expand Down Expand Up @@ -132,6 +133,11 @@ func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) {
for _, c := range checkpoints {
if c == DriverPluginCheckpointFileBasename {
klog.Infof("Found previous checkpoint: %s", c)
cp, err := state.getCheckpoint()
if err != nil {
return nil, fmt.Errorf("unable to get checkpoint: %w", err)
}
syncPreparedDevicesGaugeFromCheckpoint(config.flags.nodeName, cp)
return state, nil
}
}
Expand Down Expand Up @@ -288,7 +294,11 @@ func (s *DeviceState) Unprepare(ctx context.Context, claimRef kubeletplugin.Name
}

func (s *DeviceState) createCheckpoint(cp *Checkpoint) error {
return s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, cp)
if err := s.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFileBasename, cp); err != nil {
return err
}
syncPreparedDevicesGaugeFromCheckpoint(s.config.flags.nodeName, cp)
return nil
}

func (s *DeviceState) getCheckpoint() (*Checkpoint, error) {
Expand Down Expand Up @@ -317,6 +327,7 @@ func (s *DeviceState) updateCheckpoint(mutate func(*Checkpoint)) error {
return fmt.Errorf("unable to create checkpoint: %w", err)
}

syncPreparedDevicesGaugeFromCheckpoint(s.config.flags.nodeName, checkpoint)
return nil
}

Expand Down Expand Up @@ -825,3 +836,34 @@ func (s *DeviceState) validateNoOverlappingPreparedDevices(checkpoint *Checkpoin
}
return nil
}

func syncPreparedDevicesGaugeFromCheckpoint(nodeName string, cp *Checkpoint) {
counts := make(map[string]int)
if cp == nil {
return
}
lv := cp.ToLatestVersion()
if lv != nil && lv.V2 != nil {
for _, pc := range lv.V2.PreparedClaims {
if pc.CheckpointState != ClaimCheckpointStatePrepareCompleted {
continue
}
for _, g := range pc.PreparedDevices {
for _, dev := range g.Devices {
if _, ok := counts[dev.Type()]; !ok {
counts[dev.Type()] = 0
}
counts[dev.Type()]++
}
}
}
}

for _, dt := range []string{ComputeDomainChannelType, ComputeDomainDaemonType, UnknownDeviceType} {
if count, ok := counts[dt]; !ok {
drametrics.SetPreparedDevicesCounts(nodeName, DriverName, dt, 0)
} else {
drametrics.SetPreparedDevicesCounts(nodeName, DriverName, dt, count)
}
}
}
15 changes: 15 additions & 0 deletions cmd/compute-domain-kubelet-plugin/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/klog/v2"

"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flock"
drametrics "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/workqueue"
)

Expand Down Expand Up @@ -241,14 +242,19 @@ func (d *driver) HandleError(ctx context.Context, err error, msg string) {
// also reflect an error. Set the boolean to `true` for any result wrapping a
// non-retryable error.
func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.ResourceClaim) (bool, kubeletplugin.PrepareResult) {
t0 := time.Now()

release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
if err != nil {
drametrics.IncNodePrepareError(DriverName, "lock_acquire")
res := kubeletplugin.PrepareResult{
Err: fmt.Errorf("error acquiring prep/unprep lock: %w", err),
}
return false, res
}
defer release()
doneInFlight := drametrics.TrackInFlight(DriverName, "prepare")
defer doneInFlight()

if claim.Status.Allocation == nil {
res := kubeletplugin.PrepareResult{
Expand All @@ -259,6 +265,7 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res

devs, err := d.state.Prepare(ctx, claim)
if err != nil {
drametrics.IncNodePrepareError(DriverName, "prepare_devices")
res := kubeletplugin.PrepareResult{
Err: fmt.Errorf("error preparing devices for claim '%s': %w", ResourceClaimToString(claim), err),
}
Expand All @@ -270,24 +277,32 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *resourceapi.Res
}

klog.V(1).Infof("Prepared devices for claim '%s': %v", ResourceClaimToString(claim), devs)
drametrics.ObserveRequest(DriverName, "prepare", time.Since(t0))

return true, kubeletplugin.PrepareResult{Devices: devs}
}

// Return 2-tuple: the first value is a boolean indicating to the retry logic
// whether the work is 'done'.
func (d *driver) nodeUnprepareResource(ctx context.Context, claimRef kubeletplugin.NamespacedObject) (bool, error) {
tstart := time.Now()

release, err := d.pulock.Acquire(ctx, flock.WithTimeout(10*time.Second))
if err != nil {
drametrics.IncNodeUnprepareError(DriverName, "lock_acquire")
return false, fmt.Errorf("error acquiring prep/unprep lock: %w", err)
}
defer release()
doneInFlight := drametrics.TrackInFlight(DriverName, "unprepare")
defer doneInFlight()

if err := d.state.Unprepare(ctx, claimRef); err != nil {
drametrics.IncNodeUnprepareError(DriverName, "unprepare_devices")
return isPermanentError(err), fmt.Errorf("error unpreparing devices for claim '%v': %w", claimRef.String(), err)
}

klog.V(1).Infof("Unprepared devices for claim '%v'", claimRef.String())
drametrics.ObserveRequest(DriverName, "unprepare", time.Since(tstart))
return true, nil
}

Expand Down
25 changes: 23 additions & 2 deletions cmd/compute-domain-kubelet-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import (

"github.com/urfave/cli/v2"

"k8s.io/component-base/logs"
"k8s.io/dynamic-resource-allocation/kubeletplugin"
"k8s.io/klog/v2"

"k8s.io/component-base/logs"

"sigs.k8s.io/nvidia-dra-driver-gpu/internal/common"
"sigs.k8s.io/nvidia-dra-driver-gpu/internal/info"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/featuregates"
pkgflags "sigs.k8s.io/nvidia-dra-driver-gpu/pkg/flags"
"sigs.k8s.io/nvidia-dra-driver-gpu/pkg/metrics"
)

const (
Expand All @@ -48,6 +48,8 @@ type Flags struct {
kubeClientConfig pkgflags.KubeClientConfig

nodeName string
httpEndpoint string
metricsPath string
namespace string
cdiRoot string
containerDriverRoot string
Expand Down Expand Up @@ -95,6 +97,19 @@ func newApp() *cli.App {
Destination: &flags.namespace,
EnvVars: []string{"NAMESPACE"},
},
&cli.StringFlag{
Name: "http-endpoint",
Usage: "The TCP network `address` where the metrics HTTP server will listen (example: `:8080`). The default is the empty string, which means the server is disabled.",
Destination: &flags.httpEndpoint,
EnvVars: []string{"HTTP_ENDPOINT"},
},
&cli.StringFlag{
Name: "metrics-path",
Usage: "The HTTP `path` where Prometheus metrics are exposed, disabled if empty.",
Value: "/metrics",
Destination: &flags.metricsPath,
EnvVars: []string{"METRICS_PATH"},
},
&cli.StringFlag{
Name: "cdi-root",
Usage: "Absolute path to the directory where CDI files will be generated.",
Expand Down Expand Up @@ -239,6 +254,12 @@ func RunPlugin(ctx context.Context, config *Config) error {
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()

if config.flags.httpEndpoint != "" {
if err := metrics.RunPrometheusMetricsServer(ctx, config.flags.httpEndpoint, config.flags.metricsPath); err != nil {
return fmt.Errorf("setup metrics endpoint: %w", err)
}
}

// Create and start the driver
driver, err := NewDriver(ctx, config)
if err != nil {
Expand Down
Loading
Loading