From fc2f10e2ef55d3fcbc799da2a27094b02bbae81d Mon Sep 17 00:00:00 2001 From: Shouren Yang Date: Wed, 13 May 2026 16:17:44 +0800 Subject: [PATCH 1/2] feat: add monitor sidecar for dra driver Signed-off-by: Shouren Yang --- AGENTS.md | 22 +- .../hami-dra-driver/templates/daemonset.yaml | 30 ++ chart/hami-dra-driver/values.yaml | 4 + cmd/hami-core-monitor/feedback.go | 137 +++++++ cmd/hami-core-monitor/main.go | 133 +++++++ cmd/hami-core-monitor/mapper.go | 338 ++++++++++++++++++ cmd/hami-core-monitor/metrics.go | 272 ++++++++++++++ deploy/container/Dockerfile | 2 + go.mod | 2 +- pkg/monitor/cache.go | 198 ++++++++++ pkg/monitor/spec_v0.go | 188 ++++++++++ pkg/monitor/spec_v1.go | 195 ++++++++++ pkg/monitor/usage.go | 50 +++ 13 files changed, 1569 insertions(+), 2 deletions(-) create mode 100644 cmd/hami-core-monitor/feedback.go create mode 100644 cmd/hami-core-monitor/main.go create mode 100644 cmd/hami-core-monitor/mapper.go create mode 100644 cmd/hami-core-monitor/metrics.go create mode 100644 pkg/monitor/cache.go create mode 100644 pkg/monitor/spec_v0.go create mode 100644 pkg/monitor/spec_v1.go create mode 100644 pkg/monitor/usage.go diff --git a/AGENTS.md b/AGENTS.md index 09adbe21..8eb9eec3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -150,6 +150,22 @@ This section defines the architectural agents within the project for SDD. * Provides `NewMigSpecTupleFromCanonicalName` for parsing canonical device names (e.g., `gpu-1-mig-2g47gb-14-0`) back into `MigSpecTuple`. * **Constraints:** Used exclusively within the `DynamicMIG` feature gate code paths. +### 9. HAMi Core Monitor (Metrics & QoS Agent) +**Source:** `cmd/hami-core-monitor/` +* **Role:** Exports Prometheus GPU metrics and performs soft-QoS feedback for HAMi-Core virtualized workloads. +* **Responsibilities:** + * Scans `/vgpu/claims//*.cache` and `mmap(2)` reads the shared-memory usage structures written by `libvgpu.so`. 
+ * Auto-detects v0 (`1197897` byte) and v1 (`majorVersion == 1`) cache formats, providing backward compatibility. + * Emits per-claim vGPU metrics with pod-aware labels via an informer-based `ClaimMapper` that watches node-scoped Pods and cluster-wide ResourceClaims. Only maps claims whose `DeviceClassName` is `hami-core-gpu.project-hami.io`. + * Emits host-level GPU metrics (`hami_host_gpu_memory_used_bytes`, `hami_host_gpu_utilization_ratio`) via NVML. + * Runs a periodic soft-QoS feedback loop (`watchAndFeedback`) that inspects claim cache utilization and blocks lower-priority GPU tasks when contention is detected. + * Serves metrics on `:9394/metrics` through a dedicated Prometheus registry (`prometheus.NewRegistry`) with modern metric names only (legacy names removed). +* **Constraints:** + * Must run on the same node as the workloads it monitors (DaemonSet sidecar in the kubelet plugin pod). + * Requires `host-vgpu` and `host-tmp` volume mounts for cache access. + * Gracefully degrades to `"unknown"` pod labels when the Kubernetes API is unreachable. + * No legacy metric format support — targets HAMi ≥ v2.9.0. + --- ## Part 3: Feature Gate Registry @@ -225,6 +241,7 @@ The project produces a single distroless-based container image that bundles all | Path in Image | Source Stage | Purpose | |---|---|---| | `/usr/bin/hami-kubelet-plugin` | `build` | Main Driver Agent binary. | +| `/usr/bin/hami-core-monitor` | `build` | GPU monitor and Prometheus metrics exporter for HAMi-Core. | | `/usr/local/lib/hami/libvgpu.so` | `hami-core-build` | Enforcement library injected into containers. | | `/usr/local/lib/hami/ld.so.preload` | `hami-core-build` | Preload config that activates `libvgpu.so` in containers. | | `/usr/bin/vgpu-init.sh` | `hami-core-build` | Node-level initialization script for vGPU. | @@ -246,11 +263,13 @@ helm install hami-dra-driver ./chart/hami-dra-driver \ ``` Key templates: -- `daemonset.yaml` — Deploys the kubelet plugin DaemonSet. 
+- `daemonset.yaml` — Deploys the kubelet plugin DaemonSet; conditionally injects the `hami-core-monitor` sidecar when `monitor.enabled=true`. - `rbac-kubeletplugin.yaml.yaml` — RBAC including granular DRA status authorization rules. - `deviceclass-hami-gpu.yaml` — The `DeviceClass` for `hami-core-gpu.project-hami.io`. - `validation.yaml` — Helm validation hooks. +The `monitor.enabled` value (default `false`) controls whether the metrics sidecar is rendered. When enabled, the sidecar mounts `host-vgpu` and `host-tmp` and exposes port `9394`. The kubelet plugin itself does not expose metrics directly — the monitor container serves all metrics for scraping. + ### 4. Build Commands The build is orchestrated via `Makefile` (top-level) and `deploy/container/Makefile` (image builds). @@ -291,3 +310,4 @@ make -f deploy/container/Makefile build BUILD_MULTI_ARCH_IMAGES=true PUSH_ON_BUI | `0d0d90a` | feat: Support install with helm chart | Added `chart/hami-dra-driver/` for Helm-based cluster deployment. | | `a2ad09e` | fix: inject failed for hami-gpu | Prepare logic bypasses overlap validation and partial-rollback when `HAMiCoreSupport` is enabled; completed claims are non-idempotent. | | `6841f23` | fix: invalide featuregates | `pkg/flags/` package extracted for reusable CLI flags (`FeatureGateConfig`, `LoggingConfig`, `KubeClientConfig`); `ComputeDomainCliques` default changed to `false`. | +| `HEAD` | feat: add hami-core-monitor | Added `cmd/hami-core-monitor/` as a standalone metrics exporter and soft-QoS agent with `pkg/monitor/` shared-memory cache reader. Helm chart supports `monitor.enabled` toggle. Legacy metrics removed. 
| diff --git a/chart/hami-dra-driver/templates/daemonset.yaml b/chart/hami-dra-driver/templates/daemonset.yaml index 6e455d44..eae6a2eb 100644 --- a/chart/hami-dra-driver/templates/daemonset.yaml +++ b/chart/hami-dra-driver/templates/daemonset.yaml @@ -176,6 +176,36 @@ spec: mountPath: /proc/ mountPropagation: Bidirectional {{- end }} + {{- if .Values.monitor.enabled }} + - name: monitor + image: {{ include "hami-dra-driver.fullimage" . }} + imagePullPolicy: {{ .Values.image.pullPolicy }} + securityContext: + privileged: true + command: ["hami-core-monitor"] + args: + - "--node-name=$(NODE_NAME)" + - "--hook-path={{ .Values.driver.vgpuInitPath }}" + - "--bind-address=:9394" + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + {{- with .Values.monitor.resources }} + resources: + {{- toYaml . | nindent 10 }} + {{- end }} + ports: + - name: metrics + containerPort: 9394 + protocol: TCP + volumeMounts: + - name: host-vgpu + mountPath: {{ .Values.driver.vgpuInitPath | quote }} + - name: host-tmp + mountPath: {{ .Values.driver.hostTmp | quote }} + {{- end }} volumes: - name: plugins-registry hostPath: diff --git a/chart/hami-dra-driver/values.yaml b/chart/hami-dra-driver/values.yaml index 427786b0..39bc16f8 100644 --- a/chart/hami-dra-driver/values.yaml +++ b/chart/hami-dra-driver/values.yaml @@ -78,6 +78,10 @@ featureGates: {} # 0 = errors/warnings/info only; higher numbers increase verbosity. logVerbosity: "4" +monitor: + enabled: false + resources: {} + kubeletPlugin: priorityClassName: "system-node-critical" updateStrategy: diff --git a/cmd/hami-core-monitor/feedback.go b/cmd/hami-core-monitor/feedback.go new file mode 100644 index 00000000..b07cae1e --- /dev/null +++ b/cmd/hami-core-monitor/feedback.go @@ -0,0 +1,137 @@ +/* + * Copyright 2025 The HAMi Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/Project-HAMi/k8s-dra-driver/pkg/monitor" + + "k8s.io/klog/v2" +) + +// utilizationPerDevice tracks how many processes with recent kernels are on a +// given physical GPU for a given priority class. +type utilizationPerDevice struct { + count uint64 +} + +// watchAndFeedback runs a ticker-driven loop that periodically rescans the +// claim cache directory and applies the soft-QoS feedback rules. +func watchAndFeedback(ctx context.Context, lister *monitor.ClaimLister, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := lister.Update(); err != nil { + klog.V(3).ErrorS(err, "Failed to update claim lister") + } + observe(lister) + } + } +} + +// observe evaluates every active claim, decrements the recent-kernel counters, +// and mutates utilizationSwitch / recentKernel based on GPU-level contention. +func observe(lister *monitor.ClaimLister) { + claims := lister.ListClaims() + if len(claims) == 0 { + return + } + + // Aggregate active processes per (short-UUID, priority). 
+ utSwitchOn := make(map[string]utilizationPerDevice) + + for _, c := range claims { + if c.Info == nil { + continue + } + recentKernel := c.Info.GetRecentKernel() + if recentKernel > 0 { + recentKernel-- + for i := 0; i < c.Info.DeviceNum(); i++ { + if !c.Info.IsValidUUID(i) { + continue + } + uuid := strings.Split(c.Info.DeviceUUID(i), "-")[0] + key := fmt.Sprintf("%s_%d", uuid, c.Info.GetPriority()) + utSwitchOn[key] = utilizationPerDevice{count: utSwitchOn[key].count + 1} + } + } + c.Info.SetRecentKernel(recentKernel) + } + + // Second pass: set blocking / priority flags. + for _, c := range claims { + if c.Info == nil { + continue + } + if checkBlocking(utSwitchOn, c.Info.GetPriority(), c.Info) { + c.Info.SetRecentKernel(-1) + } else if c.Info.GetRecentKernel() < 0 { + c.Info.SetRecentKernel(0) + } + if checkPriority(utSwitchOn, c.Info.GetPriority(), c.Info) { + c.Info.SetUtilizationSwitch(1) + } else { + c.Info.SetUtilizationSwitch(0) + } + } +} + +// checkBlocking returns true when another process with the same or higher +// priority has a recent kernel on any of the claim's devices. +func checkBlocking(utSwitchOn map[string]utilizationPerDevice, priority int, info monitor.UsageInfo) bool { + for i := 0; i < info.DeviceNum(); i++ { + if !info.IsValidUUID(i) { + continue + } + uuid := strings.Split(info.DeviceUUID(i), "-")[0] + for p := 0; p <= priority; p++ { + key := fmt.Sprintf("%s_%d", uuid, p) + if val, ok := utSwitchOn[key]; ok && val.count > 1 { + return true + } + } + } + return false +} + +// checkPriority returns true when any process with the same or higher priority +// is active on any of the claim's devices. 
+func checkPriority(utSwitchOn map[string]utilizationPerDevice, priority int, info monitor.UsageInfo) bool { + for i := 0; i < info.DeviceNum(); i++ { + if !info.IsValidUUID(i) { + continue + } + uuid := strings.Split(info.DeviceUUID(i), "-")[0] + for p := 0; p <= priority; p++ { + key := fmt.Sprintf("%s_%d", uuid, p) + if val, ok := utSwitchOn[key]; ok && val.count > 0 { + return true + } + } + } + return false +} diff --git a/cmd/hami-core-monitor/main.go b/cmd/hami-core-monitor/main.go new file mode 100644 index 00000000..bcec0b27 --- /dev/null +++ b/cmd/hami-core-monitor/main.go @@ -0,0 +1,133 @@ +/* + * Copyright 2025 The HAMi Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "context" + "fmt" + "net/http" + "os" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/urfave/cli/v2" + + "github.com/Project-HAMi/k8s-dra-driver/pkg/common" + pkgflags "github.com/Project-HAMi/k8s-dra-driver/pkg/flags" + "github.com/Project-HAMi/k8s-dra-driver/pkg/monitor" + "github.com/Project-HAMi/k8s-dra-driver/pkg/version" + + "k8s.io/klog/v2" +) + +type Flags struct { + kubeClientConfig pkgflags.KubeClientConfig + nodeName string + hookPath string + bindAddr string + feedbackInterval time.Duration +} + +func main() { + flags := &Flags{} + + app := &cli.App{ + Name: "hami-core-monitor", + Usage: "HAMi-Core GPU monitor and metrics exporter", + Version: version.Version(), + Flags: append(flags.kubeClientConfig.Flags(), + &cli.StringFlag{ + Name: "node-name", + Usage: "Name of the node this monitor is running on", + Destination: &flags.nodeName, + EnvVars: []string{"NODE_NAME"}, + }, + &cli.StringFlag{ + Name: "hook-path", + Usage: "Host path where vGPU hooks and claim caches are mounted", + Value: "/usr/local/vgpu", + Destination: &flags.hookPath, + }, + &cli.StringFlag{ + Name: "bind-address", + Usage: "The address the metric endpoint binds to", + Value: ":9394", + Destination: &flags.bindAddr, + }, + &cli.DurationFlag{ + Name: "feedback-interval", + Usage: "Interval between soft-QoS feedback evaluations", + Value: 5 * time.Second, + Destination: &flags.feedbackInterval, + }, + ), + Action: func(c *cli.Context) error { + return run(c.Context, flags) + }, + } + + if err := app.Run(os.Args); err != nil { + klog.Fatalf("Failed to run monitor: %v", err) + } +} + +func run(ctx context.Context, flags *Flags) error { + common.StartDebugSignalHandlers() + + if flags.nodeName == "" { + return fmt.Errorf("--node-name or NODE_NAME must be set") + } + if flags.feedbackInterval <= 0 { + return fmt.Errorf("feedback-interval must be positive") + } + + 
claimLister := monitor.NewClaimLister(flags.hookPath) + + var mapper *ClaimMapper + clientsets, err := flags.kubeClientConfig.NewClientSets() + if err != nil { + klog.ErrorS(err, "Failed to build Kubernetes clients; claim-to-pod mapping disabled") + } else { + mapper = NewClaimMapper(clientsets.Core, flags.nodeName) + go mapper.Start(ctx) + } + + reg := prometheus.NewRegistry() + collector := newCollector(claimLister, mapper) + reg.MustRegister(collector) + + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + + srv := &http.Server{Addr: flags.bindAddr, Handler: mux} + go func() { + klog.InfoS("Starting metrics server", "addr", flags.bindAddr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + klog.ErrorS(err, "Metrics server failed") + } + }() + + go watchAndFeedback(ctx, claimLister, flags.feedbackInterval) + + <-ctx.Done() + klog.InfoS("Shutting down monitor") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return srv.Shutdown(shutdownCtx) +} diff --git a/cmd/hami-core-monitor/mapper.go b/cmd/hami-core-monitor/mapper.go new file mode 100644 index 00000000..37ddc1e3 --- /dev/null +++ b/cmd/hami-core-monitor/mapper.go @@ -0,0 +1,338 @@ +/* + * Copyright 2025 The HAMi Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "context" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + resourcev1 "k8s.io/api/resource/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + corev1informer "k8s.io/client-go/informers/core/v1" + resourcev1informer "k8s.io/client-go/informers/resource/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +const ( + resyncPeriod = 30 * time.Minute + hamiDeviceClassName = "hami-core-gpu.project-hami.io" +) + +// PodRef holds pod metadata for a ResourceClaim. +type PodRef struct { + Namespace string + PodName string + Container string // empty if none, "_multiple" if >1 +} + +// ClaimMapper maintains an eventually-consistent mapping from ResourceClaim UID +// to the pod (and container) consuming it, driven by shared informers. +type ClaimMapper struct { + nodeName string + podInformer corev1informer.PodInformer + claimInformer resourcev1informer.ResourceClaimInformer + + mu sync.RWMutex + mapping map[string]PodRef // claimUID -> pod metadata +} + +// NewClaimMapper creates a mapper and wires shared informers. +func NewClaimMapper(client kubernetes.Interface, nodeName string) *ClaimMapper { + // Pod informer scoped to this node via field selector. + podFactory := informers.NewSharedInformerFactoryWithOptions( + client, + resyncPeriod, + informers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = "spec.nodeName=" + nodeName + }), + ) + + // Claim informer watches the whole cluster. + claimFactory := informers.NewSharedInformerFactory(client, resyncPeriod) + + return &ClaimMapper{ + nodeName: nodeName, + podInformer: podFactory.Core().V1().Pods(), + claimInformer: claimFactory.Resource().V1().ResourceClaims(), + mapping: make(map[string]PodRef), + } +} + +// Start runs both informers and blocks until ctx is cancelled. 
+func (m *ClaimMapper) Start(ctx context.Context) { + // Debounced full rebuild channel – fed by the periodic ticker only. + rebuildCh := make(chan struct{}, 1) + + // Wire event handlers for targeted partial updates. + _, _ = m.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { m.upsertPod(obj.(*corev1.Pod)) }, + UpdateFunc: func(_, newObj any) { m.upsertPod(newObj.(*corev1.Pod)) }, + DeleteFunc: m.removePod, + }) + _, _ = m.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { m.upsertClaim(obj.(*resourcev1.ResourceClaim)) }, + UpdateFunc: func(_, newObj any) { m.upsertClaim(newObj.(*resourcev1.ResourceClaim)) }, + DeleteFunc: m.removeClaim, + }) + + stopPod := make(chan struct{}) + stopClaim := make(chan struct{}) + go m.podInformer.Informer().Run(stopPod) + go m.claimInformer.Informer().Run(stopClaim) + defer close(stopPod) + defer close(stopClaim) + + if !cache.WaitForCacheSync(ctx.Done(), m.podInformer.Informer().HasSynced, m.claimInformer.Informer().HasSynced) { + klog.V(2).InfoS("ClaimMapper cache sync cancelled") + return + } + + // Initial full rebuild before any events arrive. + m.fullRebuild() + + // Periodic safety-net full rebuild. + ticker := time.NewTicker(resyncPeriod) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + select { + case rebuildCh <- struct{}{}: + default: + } + case <-rebuildCh: + m.fullRebuild() + } + } +} + +// Lookup returns pod metadata for a given claim UID. +func (m *ClaimMapper) Lookup(claimUID string) (PodRef, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + ref, ok := m.mapping[claimUID] + return ref, ok +} + +// --------------------------------------------------------------------------- +// Partial (targeted) updates driven by informer events. 
+// --------------------------------------------------------------------------- + +// upsertPod adds or updates the mapping entries for every HAMi claim +// referenced by the given pod. Called on Pod Add/Update. +func (m *ClaimMapper) upsertPod(pod *corev1.Pod) { + m.mu.Lock() + defer m.mu.Unlock() + + for _, status := range pod.Status.ResourceClaimStatuses { + if status.ResourceClaimName == nil || *status.ResourceClaimName == "" { + continue + } + claim, err := m.claimInformer.Lister().ResourceClaims(pod.Namespace).Get(*status.ResourceClaimName) + if err != nil { + // Claim not in cache yet — will be fixed when the claim event arrives. + continue + } + if !isHamiClaim(claim) { + continue + } + m.mapping[string(claim.UID)] = m.makePodRef(pod, status.Name, *status.ResourceClaimName) + } +} + +// removePod deletes all mapping entries that belong to the given pod. +// Called on Pod Delete. +func (m *ClaimMapper) removePod(obj any) { + var pod *corev1.Pod + switch t := obj.(type) { + case *corev1.Pod: + pod = t + case cache.DeletedFinalStateUnknown: + pod = t.Obj.(*corev1.Pod) + default: + return + } + + m.mu.Lock() + defer m.mu.Unlock() + + for uid, ref := range m.mapping { + if ref.Namespace == pod.Namespace && ref.PodName == pod.Name { + delete(m.mapping, uid) + } + } +} + +// upsertClaim resolves the given claim back to a pod on this node and inserts +// the mapping entry. Called on ResourceClaim Add/Update. +func (m *ClaimMapper) upsertClaim(claim *resourcev1.ResourceClaim) { + if !isHamiClaim(claim) { + return + } + // Find the pod on this node that references this claim. 
+ pods, err := m.podInformer.Lister().List(labels.Everything()) + if err != nil { + klog.V(3).ErrorS(err, "ClaimMapper: failed to list pods from cache") + return + } + for _, pod := range pods { + for _, status := range pod.Status.ResourceClaimStatuses { + if status.ResourceClaimName == nil || *status.ResourceClaimName == "" { + continue + } + if pod.Namespace == claim.Namespace && *status.ResourceClaimName == claim.Name { + m.mu.Lock() + m.mapping[string(claim.UID)] = m.makePodRef(pod, status.Name, claim.Name) + m.mu.Unlock() + return // DRA: a claim is bound to at most one pod. + } + } + } +} + +// removeClaim deletes the mapping entry for the given claim. +// Called on ResourceClaim Delete. +func (m *ClaimMapper) removeClaim(obj any) { + var claim *resourcev1.ResourceClaim + switch t := obj.(type) { + case *resourcev1.ResourceClaim: + claim = t + case cache.DeletedFinalStateUnknown: + claim = t.Obj.(*resourcev1.ResourceClaim) + default: + return + } + + m.mu.Lock() + delete(m.mapping, string(claim.UID)) + m.mu.Unlock() +} + +// --------------------------------------------------------------------------- +// Full rebuild (safety net, run once at startup and then periodically). 
+// --------------------------------------------------------------------------- + +func (m *ClaimMapper) fullRebuild() { + pods, err := m.podInformer.Lister().List(labels.Everything()) + if err != nil { + klog.V(3).ErrorS(err, "ClaimMapper: full rebuild failed listing pods") + return + } + + newMapping := make(map[string]PodRef, len(pods)) + for _, pod := range pods { + for _, status := range pod.Status.ResourceClaimStatuses { + if status.ResourceClaimName == nil || *status.ResourceClaimName == "" { + continue + } + claim, err := m.claimInformer.Lister().ResourceClaims(pod.Namespace).Get(*status.ResourceClaimName) + if err != nil { + continue + } + if !isHamiClaim(claim) { + continue + } + newMapping[string(claim.UID)] = m.makePodRef(pod, status.Name, *status.ResourceClaimName) + } + } + + m.mu.Lock() + m.mapping = newMapping + m.mu.Unlock() + + klog.V(4).InfoS("ClaimMapper full rebuild", "pods", len(pods), "claims", len(newMapping)) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func (m *ClaimMapper) makePodRef(pod *corev1.Pod, localClaimName, claimName string) PodRef { + containers := containersUsingClaim(pod, localClaimName) + containerName := "" + switch len(containers) { + case 1: + containerName = containers[0] + case 0: + containerName = "" + default: + containerName = "_multiple" + } + return PodRef{ + Namespace: pod.Namespace, + PodName: pod.Name, + Container: containerName, + } +} + +// isHamiClaim returns true if the claim is for the HAMi device class. 
+func isHamiClaim(claim *resourcev1.ResourceClaim) bool { + for _, req := range claim.Spec.Devices.Requests { + if req.Exactly != nil && req.Exactly.DeviceClassName == hamiDeviceClassName { + return true + } + for _, sub := range req.FirstAvailable { + if sub.DeviceClassName == hamiDeviceClassName { + return true + } + } + } + return false +} + +func containersUsingClaim(pod *corev1.Pod, localClaimName string) []string { + var names []string + for _, c := range pod.Spec.Containers { + for _, rc := range c.Resources.Claims { + if rc.Name == localClaimName { + names = append(names, c.Name) + break + } + } + } + for _, c := range pod.Spec.InitContainers { + for _, rc := range c.Resources.Claims { + if rc.Name == localClaimName { + names = append(names, c.Name) + break + } + } + } + return names +} + +var unknownPodRef = PodRef{Namespace: "unknown", PodName: "unknown", Container: "unknown"} + +func resolve(claimUID string, mapper *ClaimMapper) []string { + if mapper == nil { + return []string{unknownPodRef.Namespace, unknownPodRef.PodName, unknownPodRef.Container, claimUID} + } + ref, ok := mapper.Lookup(claimUID) + if !ok { + return []string{unknownPodRef.Namespace, unknownPodRef.PodName, unknownPodRef.Container, claimUID} + } + return []string{ref.Namespace, ref.PodName, ref.Container, claimUID} +} diff --git a/cmd/hami-core-monitor/metrics.go b/cmd/hami-core-monitor/metrics.go new file mode 100644 index 00000000..bf4e8f78 --- /dev/null +++ b/cmd/hami-core-monitor/metrics.go @@ -0,0 +1,272 @@ +/* + * Copyright 2025 The HAMi Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "fmt" + "time" + "unicode/utf8" + + "github.com/NVIDIA/go-nvml/pkg/nvml" + "github.com/prometheus/client_golang/prometheus" + + "github.com/Project-HAMi/k8s-dra-driver/pkg/monitor" + + "k8s.io/klog/v2" +) + +// --------------------------------------------------------------------------- +// Metric descriptors. +// Claim-level metrics carry: namespace, pod, container, claim_uid, +// vdevice_index, device_uuid. +// --------------------------------------------------------------------------- +var ( + hostGPUMemoryDesc = prometheus.NewDesc( + "hami_host_gpu_memory_used_bytes", + "GPU device memory usage in bytes", + []string{"device_index", "device_uuid", "device_type"}, nil, + ) + + hostGPUUtilizationDesc = prometheus.NewDesc( + "hami_host_gpu_utilization_ratio", + "GPU core utilization ratio (0-100)", + []string{"device_index", "device_uuid", "device_type"}, nil, + ) + + claimMemoryUsedDesc = prometheus.NewDesc( + "hami_vgpu_memory_used_bytes", + "vGPU device memory usage in bytes", + []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, + ) + + claimMemoryLimitDesc = prometheus.NewDesc( + "hami_vgpu_memory_limit_bytes", + "vGPU device memory limit in bytes", + []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, + ) + + claimDeviceMemoryDesc = prometheus.NewDesc( + "hami_container_device_memory_bytes", + "Container device memory usage breakdown in bytes", + []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", 
"device_uuid", "context_size", "module_size", "buffer_size", "offset"}, nil, + ) + + claimDeviceUtilizationDesc = prometheus.NewDesc( + "hami_container_device_utilization_ratio", + "Container device SM utilization ratio", + []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, + ) + + claimDeviceMemoryContextDesc = prometheus.NewDesc( + "hami_vgpu_memory_context_bytes", + "Container device memory context size in bytes", + []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, + ) + + claimDeviceMemoryModuleDesc = prometheus.NewDesc( + "hami_vgpu_memory_module_bytes", + "Container device memory module size in bytes", + []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, + ) + + claimDeviceMemoryBufferDesc = prometheus.NewDesc( + "hami_vgpu_memory_buffer_bytes", + "Container device memory buffer size in bytes", + []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, + ) + + claimLastKernelDesc = prometheus.NewDesc( + "hami_container_last_kernel_elapsed_seconds", + "Seconds since last kernel execution in container", + []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, + ) +) + +// --------------------------------------------------------------------------- +// Collector +// --------------------------------------------------------------------------- + +type collector struct { + claimLister *monitor.ClaimLister + mapper *ClaimMapper +} + +func newCollector(claimLister *monitor.ClaimLister, mapper *ClaimMapper) *collector { + return &collector{ + claimLister: claimLister, + mapper: mapper, + } +} + +// Describe implements prometheus.Collector. 
+func (c collector) Describe(ch chan<- *prometheus.Desc) { + ch <- hostGPUMemoryDesc + ch <- hostGPUUtilizationDesc + ch <- claimMemoryUsedDesc + ch <- claimMemoryLimitDesc + ch <- claimDeviceMemoryDesc + ch <- claimDeviceUtilizationDesc + ch <- claimDeviceMemoryContextDesc + ch <- claimDeviceMemoryModuleDesc + ch <- claimDeviceMemoryBufferDesc + ch <- claimLastKernelDesc +} + +// Collect implements prometheus.Collector. +func (c collector) Collect(ch chan<- prometheus.Metric) { + klog.V(4).InfoS("Starting metrics collection") + + if err := c.collectHostGPU(ch); err != nil { + klog.ErrorS(err, "Failed to collect host GPU metrics") + } + if err := c.collectClaims(ch); err != nil { + klog.ErrorS(err, "Failed to collect claim metrics") + } + + klog.V(4).InfoS("Finished metrics collection") +} + +// --------------------------------------------------------------------------- +// Host GPU metrics (via NVML). +// --------------------------------------------------------------------------- + +func (c collector) collectHostGPU(ch chan<- prometheus.Metric) error { + if ret := nvml.Init(); ret != nvml.SUCCESS { + return fmt.Errorf("nvml.Init: %s", nvml.ErrorString(ret)) + } + defer nvml.Shutdown() + + count, ret := nvml.DeviceGetCount() + if ret != nvml.SUCCESS { + return fmt.Errorf("nvml.DeviceGetCount: %s", nvml.ErrorString(ret)) + } + + for i := 0; i < count; i++ { + dev, ret := nvml.DeviceGetHandleByIndex(i) + if ret != nvml.SUCCESS { + klog.V(3).ErrorS(nil, "nvml.DeviceGetHandleByIndex", "index", i, "error", nvml.ErrorString(ret)) + continue + } + if err := c.collectHostDevice(ch, dev, i); err != nil { + klog.V(3).ErrorS(err, "Failed to collect metrics for host GPU", "index", i) + } + } + return nil +} + +func (c collector) collectHostDevice(ch chan<- prometheus.Metric, dev nvml.Device, index int) error { + uuid, ret := dev.GetUUID() + if ret != nvml.SUCCESS { + return fmt.Errorf("nvml.GetUUID: %s", nvml.ErrorString(ret)) + } + + name, ret := dev.GetName() + if ret != 
nvml.SUCCESS { + return fmt.Errorf("nvml.GetName: %s", nvml.ErrorString(ret)) + } + deviceType := "NVIDIA-" + name + idxStr := fmt.Sprint(index) + + // Memory. + mem, ret := dev.GetMemoryInfo() + if ret == nvml.ERROR_NOT_SUPPORTED { + klog.V(3).InfoS("Memory metrics not supported for device, skipping", "index", index) + } else if ret != nvml.SUCCESS { + return fmt.Errorf("nvml.GetMemoryInfo: %s", nvml.ErrorString(ret)) + } else { + ch <- prometheus.MustNewConstMetric(hostGPUMemoryDesc, prometheus.GaugeValue, float64(mem.Used), idxStr, uuid, deviceType) + } + + // Utilization. + util, ret := dev.GetUtilizationRates() + if ret != nvml.SUCCESS { + return fmt.Errorf("nvml.GetUtilizationRates: %s", nvml.ErrorString(ret)) + } + ch <- prometheus.MustNewConstMetric(hostGPUUtilizationDesc, prometheus.GaugeValue, float64(util.Gpu), idxStr, uuid, deviceType) + + return nil +} + +// --------------------------------------------------------------------------- +// Claim-level metrics (via mmaped shared memory caches). 
+// --------------------------------------------------------------------------- + +func (c collector) collectClaims(ch chan<- prometheus.Metric) error { + claims := c.claimLister.ListClaims() + nowSec := time.Now().Unix() + + for _, claim := range claims { + if claim.Info == nil { + continue + } + if err := c.collectClaim(ch, claim, nowSec); err != nil { + klog.V(3).ErrorS(err, "Failed to collect metrics for claim", "claimUID", claim.ClaimUID) + } + } + return nil +} + +func (c collector) collectClaim(ch chan<- prometheus.Metric, claim *monitor.ClaimUsage, nowSec int64) error { + info := claim.Info + baseLabels := resolve(claim.ClaimUID, c.mapper) + + for i := 0; i < info.DeviceNum(); i++ { + uuid := info.DeviceUUID(i) + if len(uuid) < 40 { + klog.V(5).InfoS("Skipping device with invalid UUID length", "claim", claim.ClaimUID, "index", i, "len", len(uuid)) + continue + } + uuid = uuid[:40] + if !utf8.ValidString(uuid) { + klog.V(5).InfoS("Skipping device with invalid UTF-8 UUID; cache may not be initialised yet", "claim", claim.ClaimUID, "index", i) + continue + } + + labels := append(baseLabels, fmt.Sprint(i), uuid) + + // Memory used and limit. + memTotal := info.DeviceMemoryTotal(i) + memLimit := info.DeviceMemoryLimit(i) + ch <- prometheus.MustNewConstMetric(claimMemoryUsedDesc, prometheus.GaugeValue, float64(memTotal), labels...) + ch <- prometheus.MustNewConstMetric(claimMemoryLimitDesc, prometheus.GaugeValue, float64(memLimit), labels...) + + // Breakdown. + memCtx := info.DeviceMemoryContextSize(i) + memMod := info.DeviceMemoryModuleSize(i) + memBuf := info.DeviceMemoryBufferSize(i) + memOffset := memTotal - memCtx - memMod - memBuf + breakdownLabels := append(labels, fmt.Sprint(memCtx), fmt.Sprint(memMod), fmt.Sprint(memBuf), fmt.Sprint(memOffset)) + ch <- prometheus.MustNewConstMetric(claimDeviceMemoryDesc, prometheus.GaugeValue, float64(memTotal), breakdownLabels...) + + // Context / module / buffer sub-metrics. 
+ ch <- prometheus.MustNewConstMetric(claimDeviceMemoryContextDesc, prometheus.GaugeValue, float64(memCtx), labels...) + ch <- prometheus.MustNewConstMetric(claimDeviceMemoryModuleDesc, prometheus.GaugeValue, float64(memMod), labels...) + ch <- prometheus.MustNewConstMetric(claimDeviceMemoryBufferDesc, prometheus.GaugeValue, float64(memBuf), labels...) + + // SM utilization. + smUtil := info.DeviceSmUtil(i) + ch <- prometheus.MustNewConstMetric(claimDeviceUtilizationDesc, prometheus.GaugeValue, float64(smUtil), labels...) + + // Last kernel time. + lastKernelTime := info.LastKernelTime() + if lastKernelTime > 0 { + lastSec := max(nowSec-lastKernelTime, 0) + ch <- prometheus.MustNewConstMetric(claimLastKernelDesc, prometheus.GaugeValue, float64(lastSec), labels...) + } + } + return nil +} diff --git a/deploy/container/Dockerfile b/deploy/container/Dockerfile index a565a2a7..e074ae8c 100644 --- a/deploy/container/Dockerfile +++ b/deploy/container/Dockerfile @@ -142,6 +142,7 @@ COPY --from=build /build/scripts/unbind_from_driver.sh /usr/bin/unbind_fro COPY /hack/kubelet-plugin-prestart.sh /usr/bin/kubelet-plugin-prestart.sh COPY --from=build /artifacts/hami-kubelet-plugin /usr/bin/hami-kubelet-plugin +COPY --from=build /artifacts/hami-core-monitor /usr/bin/hami-core-monitor COPY --from=hami-core-build /k8s-dra-driver/lib/local /usr/local COPY --from=hami-core-build /k8s-dra-driver/lib/nvidia/vgpu-init.sh /usr/bin/vgpu-init.sh @@ -152,6 +153,7 @@ USER root:root # Smoke-test executables (provide early build feedback). 
RUN ["/usr/bin/hami-kubelet-plugin", "--version"] +RUN ["/usr/bin/hami-core-monitor", "--version"] RUN ["/bin/bash", "--version"] ENTRYPOINT ["/bin/bash"] diff --git a/go.mod b/go.mod index 9493d1fc..299dae8a 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/NVIDIA/k8s-dra-driver-gpu v0.0.0-20260304152636-db70fc24dd3f github.com/NVIDIA/nvidia-container-toolkit v1.18.2 github.com/google/uuid v1.6.0 + github.com/prometheus/client_golang v1.23.2 github.com/sirupsen/logrus v1.9.4 github.com/spf13/pflag v1.0.10 github.com/stretchr/testify v1.11.1 @@ -58,7 +59,6 @@ require ( github.com/opencontainers/runtime-tools v0.9.1-0.20251114084447-edf4cb3d2116 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect diff --git a/pkg/monitor/cache.go b/pkg/monitor/cache.go new file mode 100644 index 00000000..abaa06a3 --- /dev/null +++ b/pkg/monitor/cache.go @@ -0,0 +1,198 @@ +/* + * Copyright 2025 The HAMi Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monitor + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "syscall" + "unsafe" + + "k8s.io/klog/v2" +) + +// ClaimUsage wraps a single mmaped cache file for a ResourceClaim. 
+type ClaimUsage struct { + ClaimUID string + CacheFile string + data []byte + Info UsageInfo +} + +// ClaimLister scans the host vGPU claims directory, discovers .cache +// files created by libvgpu.so, and mmap(2)s them for live access. +type ClaimLister struct { + hookPath string + claims map[string]*ClaimUsage // key: / + mutex sync.Mutex +} + +// NewClaimLister creates a new ClaimLister rooted at hookPath. +// The effective scan path is /vgpu/claims/. +func NewClaimLister(hookPath string) *ClaimLister { + return &ClaimLister{ + hookPath: hookPath, + claims: make(map[string]*ClaimUsage), + } +} + +// ListClaims returns a shallow copy of the currently known claims. +func (l *ClaimLister) ListClaims() map[string]*ClaimUsage { + l.mutex.Lock() + defer l.mutex.Unlock() + + out := make(map[string]*ClaimUsage, len(l.claims)) + for k, v := range l.claims { + out[k] = v + } + return out +} + +// Update rescans the claims directory, mmaping new cache files and +// munmapping ones that have disappeared. +func (l *ClaimLister) Update() error { + l.mutex.Lock() + defer l.mutex.Unlock() + + claimsPath := filepath.Join(l.hookPath, "vgpu", "claims") + entries, err := os.ReadDir(claimsPath) + if err != nil { + if os.IsNotExist(err) { + // No claims directory yet; clear everything. 
+ l.cleanupAll() + return nil + } + return err + } + + seen := make(map[string]struct{}) + for _, entry := range entries { + if !entry.IsDir() { + continue + } + claimUID := entry.Name() + claimDir := filepath.Join(claimsPath, claimUID) + cacheFile, err := findCacheFile(claimDir) + if err != nil { + klog.V(4).InfoS("No cache file in claim dir", "dir", claimDir, "err", err) + continue + } + if cacheFile == "" { + continue + } + key := claimUID + "/" + filepath.Base(cacheFile) + seen[key] = struct{}{} + + if _, ok := l.claims[key]; ok { + continue + } + + usage, err := loadCache(claimUID, cacheFile) + if err != nil { + klog.ErrorS(err, "Failed to load cache", "file", cacheFile) + continue + } + l.claims[key] = usage + klog.V(3).InfoS("Loaded claim cache", "claim", claimUID, "file", cacheFile) + } + + // Remove disappeared claims. + for key, usage := range l.claims { + if _, ok := seen[key]; !ok { + _ = syscall.Munmap(usage.data) + delete(l.claims, key) + klog.V(3).InfoS("Removed claim cache", "key", key) + } + } + + return nil +} + +func (l *ClaimLister) cleanupAll() { + for key, usage := range l.claims { + _ = syscall.Munmap(usage.data) + delete(l.claims, key) + klog.V(3).InfoS("Cleaned up claim cache", "key", key) + } +} + +func findCacheFile(dir string) (string, error) { + files, err := os.ReadDir(dir) + if err != nil { + return "", err + } + for _, f := range files { + if f.IsDir() { + continue + } + if strings.HasSuffix(f.Name(), ".cache") { + return filepath.Join(dir, f.Name()), nil + } + } + return "", nil +} + +func loadCache(claimUID, cacheFile string) (*ClaimUsage, error) { + info, err := os.Stat(cacheFile) + if err != nil { + return nil, fmt.Errorf("stat cache file: %w", err) + } + minSize := int64(unsafe.Sizeof(headerT{})) + if info.Size() < minSize { + return nil, fmt.Errorf("cache file size %d too small (need >= %d)", info.Size(), minSize) + } + + f, err := os.OpenFile(cacheFile, os.O_RDWR, 0666) + if err != nil { + return nil, fmt.Errorf("open cache 
file: %w", err) + } + defer func() { _ = f.Close() }() + + data, err := syscall.Mmap(int(f.Fd()), 0, int(info.Size()), syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED) + if err != nil { + return nil, fmt.Errorf("mmap cache file: %w", err) + } + + head := (*headerT)(unsafe.Pointer(&data[0])) + if head.initializedFlag != SharedRegionMagicFlag { + _ = syscall.Munmap(data) + return nil, fmt.Errorf("cache file magic flag not matched: got %d, want %d", head.initializedFlag, SharedRegionMagicFlag) + } + + var usageInfo UsageInfo + switch { + case info.Size() == 1197897: + klog.V(4).InfoS("Detected v0 cache file", "file", cacheFile) + usageInfo = castV0Spec(data) + case head.majorVersion == 1: + klog.V(4).InfoS("Detected v1 cache file", "file", cacheFile, "minorVersion", head.minorVersion) + usageInfo = castV1Spec(data) + default: + _ = syscall.Munmap(data) + return nil, fmt.Errorf("unknown cache version %d.%d size %d", head.majorVersion, head.minorVersion, info.Size()) + } + + return &ClaimUsage{ + ClaimUID: claimUID, + CacheFile: cacheFile, + data: data, + Info: usageInfo, + }, nil +} diff --git a/pkg/monitor/spec_v0.go b/pkg/monitor/spec_v0.go new file mode 100644 index 00000000..7bde351a --- /dev/null +++ b/pkg/monitor/spec_v0.go @@ -0,0 +1,188 @@ +/* + * Copyright 2024-2025 The HAMi Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monitor + +import "unsafe" + +const v0MaxDevices = 16 + +type v0DeviceMemory struct { + contextSize uint64 + moduleSize uint64 + bufferSize uint64 + offset uint64 + total uint64 +} + +type v0DeviceUtilization struct { + decUtil uint64 + encUtil uint64 + smUtil uint64 +} + +type v0ShrregProcSlotT struct { + pid int32 + hostpid int32 + used [16]v0DeviceMemory + monitorused [16]uint64 + deviceUtil [16]v0DeviceUtilization + status int32 +} + +type v0UUID struct { + uuid [96]byte +} + +type v0SemT struct { + sem [32]byte +} + +type v0SharedRegionT struct { + initializedFlag int32 + smInitFlag int32 + ownerPid uint32 + sem v0SemT + num uint64 + uuids [16]v0UUID + + limit [16]uint64 + smLimit [16]uint64 + procs [1024]v0ShrregProcSlotT + + procnum int32 + utilizationSwitch int32 + recentKernel int32 + priority int32 +} + +type v0Spec struct { + sr *v0SharedRegionT +} + +func (s v0Spec) DeviceMax() int { + return v0MaxDevices +} + +func (s v0Spec) DeviceNum() int { + return int(s.sr.num) +} + +func (s v0Spec) DeviceMemoryContextSize(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs { + v += p.used[idx].contextSize + } + return v +} + +func (s v0Spec) DeviceMemoryModuleSize(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs { + v += p.used[idx].moduleSize + } + return v +} + +func (s v0Spec) DeviceMemoryBufferSize(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs { + v += p.used[idx].bufferSize + } + return v +} + +func (s v0Spec) DeviceMemoryOffset(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs { + v += p.used[idx].offset + } + return v +} + +func (s v0Spec) DeviceMemoryTotal(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs { + v += p.used[idx].total + } + return v +} + +func (s v0Spec) DeviceSmUtil(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs { + v += p.deviceUtil[idx].smUtil + } + return v +} + +func (s v0Spec) SetDeviceSmLimit(l uint64) { + var idx uint64 + for idx 
< s.sr.num { + s.sr.smLimit[idx] = l + idx++ + } +} + +func (s v0Spec) IsValidUUID(idx int) bool { + return s.sr.uuids[idx].uuid[0] != 0 +} + +func (s v0Spec) DeviceUUID(idx int) string { + return string(s.sr.uuids[idx].uuid[:]) +} + +func (s v0Spec) DeviceMemoryLimit(idx int) uint64 { + return s.sr.limit[idx] +} + +func (s v0Spec) SetDeviceMemoryLimit(l uint64) { + var idx uint64 + for idx < s.sr.num { + s.sr.limit[idx] = l + idx++ + } +} + +func (s v0Spec) LastKernelTime() int64 { + return 0 +} + +func castV0Spec(data []byte) UsageInfo { + return v0Spec{ + sr: (*v0SharedRegionT)(unsafe.Pointer(&data[0])), + } +} + +func (s v0Spec) GetPriority() int { + return int(s.sr.priority) +} + +func (s v0Spec) GetRecentKernel() int32 { + return s.sr.recentKernel +} + +func (s v0Spec) SetRecentKernel(v int32) { + s.sr.recentKernel = v +} + +func (s v0Spec) GetUtilizationSwitch() int32 { + return s.sr.utilizationSwitch +} + +func (s v0Spec) SetUtilizationSwitch(v int32) { + s.sr.utilizationSwitch = v +} diff --git a/pkg/monitor/spec_v1.go b/pkg/monitor/spec_v1.go new file mode 100644 index 00000000..8897fe0f --- /dev/null +++ b/pkg/monitor/spec_v1.go @@ -0,0 +1,195 @@ +/* + * Copyright 2024-2025 The HAMi Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package monitor + +import "unsafe" + +const v1MaxDevices = 16 + +type v1DeviceMemory struct { + contextSize uint64 + moduleSize uint64 + bufferSize uint64 + offset uint64 + total uint64 + unused [3]uint64 +} + +type v1DeviceUtilization struct { + decUtil uint64 + encUtil uint64 + smUtil uint64 + unused [3]uint64 +} + +type v1ShrregProcSlotT struct { + pid int32 + hostpid int32 + used [16]v1DeviceMemory + monitorused [16]uint64 + deviceUtil [16]v1DeviceUtilization + status int32 + unused [3]uint64 +} + +type v1UUID struct { + uuid [96]byte +} + +type v1SemT struct { + sem [32]byte +} + +type v1SharedRegionT struct { + initializedFlag int32 + majorVersion int32 + minorVersion int32 + smInitFlag int32 + ownerPid uint32 + sem v1SemT + num uint64 + uuids [16]v1UUID + + limit [16]uint64 + smLimit [16]uint64 + procs [1024]v1ShrregProcSlotT + + procnum int32 + utilizationSwitch int32 + recentKernel int32 + priority int32 + lastKernelTime int64 + unused [4]uint64 +} + +type v1Spec struct { + sr *v1SharedRegionT +} + +func (s v1Spec) DeviceMax() int { + return v1MaxDevices +} + +func (s v1Spec) DeviceNum() int { + return int(s.sr.num) +} + +func (s v1Spec) DeviceMemoryContextSize(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs[:int(s.sr.procnum)] { + v += p.used[idx].contextSize + } + return v +} + +func (s v1Spec) DeviceMemoryModuleSize(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs[:int(s.sr.procnum)] { + v += p.used[idx].moduleSize + } + return v +} + +func (s v1Spec) DeviceMemoryBufferSize(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs[:int(s.sr.procnum)] { + v += p.used[idx].bufferSize + } + return v +} + +func (s v1Spec) DeviceMemoryOffset(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs[:int(s.sr.procnum)] { + v += p.used[idx].offset + } + return v +} + +func (s v1Spec) DeviceMemoryTotal(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs[:int(s.sr.procnum)] { + v += p.used[idx].total 
+ } + return v +} + +func (s v1Spec) DeviceSmUtil(idx int) uint64 { + var v uint64 + for _, p := range s.sr.procs[:int(s.sr.procnum)] { + v += p.deviceUtil[idx].smUtil + } + return v +} + +func (s v1Spec) SetDeviceSmLimit(l uint64) { + var idx uint64 + for idx < s.sr.num { + s.sr.smLimit[idx] = l + idx++ + } +} + +func (s v1Spec) IsValidUUID(idx int) bool { + return s.sr.uuids[idx].uuid[0] != 0 +} + +func (s v1Spec) DeviceUUID(idx int) string { + return string(s.sr.uuids[idx].uuid[:]) +} + +func (s v1Spec) DeviceMemoryLimit(idx int) uint64 { + return s.sr.limit[idx] +} + +func (s v1Spec) SetDeviceMemoryLimit(l uint64) { + var idx uint64 + for idx < s.sr.num { + s.sr.limit[idx] = l + idx++ + } +} + +func (s v1Spec) LastKernelTime() int64 { + return s.sr.lastKernelTime +} + +func castV1Spec(data []byte) UsageInfo { + return v1Spec{ + sr: (*v1SharedRegionT)(unsafe.Pointer(&data[0])), + } +} + +func (s v1Spec) GetPriority() int { + return int(s.sr.priority) +} + +func (s v1Spec) GetRecentKernel() int32 { + return s.sr.recentKernel +} + +func (s v1Spec) SetRecentKernel(v int32) { + s.sr.recentKernel = v +} + +func (s v1Spec) GetUtilizationSwitch() int32 { + return s.sr.utilizationSwitch +} + +func (s v1Spec) SetUtilizationSwitch(v int32) { + s.sr.utilizationSwitch = v +} diff --git a/pkg/monitor/usage.go b/pkg/monitor/usage.go new file mode 100644 index 00000000..bf69e9ba --- /dev/null +++ b/pkg/monitor/usage.go @@ -0,0 +1,50 @@ +/* + * Copyright 2025 The HAMi Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package monitor + +// UsageInfo abstracts over v0/v1 shared-memory cache formats produced by +// libvgpu.so. All methods operate directly on the mmaped shared_region_t. +type UsageInfo interface { + DeviceMax() int + DeviceNum() int + DeviceMemoryContextSize(idx int) uint64 + DeviceMemoryModuleSize(idx int) uint64 + DeviceMemoryBufferSize(idx int) uint64 + DeviceMemoryOffset(idx int) uint64 + DeviceMemoryTotal(idx int) uint64 + DeviceSmUtil(idx int) uint64 + SetDeviceSmLimit(l uint64) + IsValidUUID(idx int) bool + DeviceUUID(idx int) string + DeviceMemoryLimit(idx int) uint64 + SetDeviceMemoryLimit(l uint64) + LastKernelTime() int64 + GetPriority() int + GetRecentKernel() int32 + SetRecentKernel(v int32) + GetUtilizationSwitch() int32 + SetUtilizationSwitch(v int32) +} + +// SharedRegionMagicFlag is the magic value stored in initializedFlag. +const SharedRegionMagicFlag = 19920718 + +type headerT struct { + initializedFlag int32 + majorVersion int32 + minorVersion int32 +} From 03eecfd8abccacc072ad9d1f03d2ac8940ed3275 Mon Sep 17 00:00:00 2001 From: Shouren Yang Date: Fri, 15 May 2026 23:21:18 +0800 Subject: [PATCH 2/2] feat: integrate with vGPUMonitor from HAMi image Signed-off-by: Shouren Yang --- AGENTS.md | 24 +- .../hami-dra-driver/templates/daemonset.yaml | 12 +- chart/hami-dra-driver/values.yaml | 1 + cmd/hami-core-monitor/feedback.go | 137 ------- cmd/hami-core-monitor/main.go | 133 ------- cmd/hami-core-monitor/mapper.go | 338 ------------------ cmd/hami-core-monitor/metrics.go | 272 -------------- cmd/hami-kubelet-plugin/device_state.go | 5 +- cmd/hami-kubelet-plugin/driver.go | 4 + cmd/hami-kubelet-plugin/hami_core.go | 170 +++++++-- cmd/hami-kubelet-plugin/main.go | 8 + deploy/container/Dockerfile | 9 +- deploy/container/Makefile | 1 + pkg/flags/featuregates_test.go | 3 +- pkg/monitor/cache.go | 198 ---------- pkg/monitor/spec_v0.go | 188 
---------- pkg/monitor/spec_v1.go | 195 ---------- pkg/monitor/usage.go | 50 --- versions.mk | 3 + 19 files changed, 197 insertions(+), 1554 deletions(-) delete mode 100644 cmd/hami-core-monitor/feedback.go delete mode 100644 cmd/hami-core-monitor/main.go delete mode 100644 cmd/hami-core-monitor/mapper.go delete mode 100644 cmd/hami-core-monitor/metrics.go delete mode 100644 pkg/monitor/cache.go delete mode 100644 pkg/monitor/spec_v0.go delete mode 100644 pkg/monitor/spec_v1.go delete mode 100644 pkg/monitor/usage.go diff --git a/AGENTS.md b/AGENTS.md index 8eb9eec3..ad780361 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -151,20 +151,20 @@ This section defines the architectural agents within the project for SDD. * **Constraints:** Used exclusively within the `DynamicMIG` feature gate code paths. ### 9. HAMi Core Monitor (Metrics & QoS Agent) -**Source:** `cmd/hami-core-monitor/` +**Source:** upstream `vGPUmonitor` binary (from `projecthami/hami:${HAMI_VGPUMONITOR_IMAGE}`) * **Role:** Exports Prometheus GPU metrics and performs soft-QoS feedback for HAMi-Core virtualized workloads. * **Responsibilities:** - * Scans `/vgpu/claims//*.cache` and `mmap(2)` reads the shared-memory usage structures written by `libvgpu.so`. + * Scans `/vgpu/containers/_/` for `.cache` files created by `libvgpu.so`. * Auto-detects v0 (`1197897` byte) and v1 (`majorVersion == 1`) cache formats, providing backward compatibility. - * Emits per-claim vGPU metrics with pod-aware labels via an informer-based `ClaimMapper` that watches node-scoped Pods and cluster-wide ResourceClaims. Only maps claims whose `DeviceClassName` is `hami-core-gpu.project-hami.io`. + * Emits per-container vGPU metrics with pod-aware labels. * Emits host-level GPU metrics (`hami_host_gpu_memory_used_bytes`, `hami_host_gpu_utilization_ratio`) via NVML. - * Runs a periodic soft-QoS feedback loop (`watchAndFeedback`) that inspects claim cache utilization and blocks lower-priority GPU tasks when contention is detected. 
- * Serves metrics on `:9394/metrics` through a dedicated Prometheus registry (`prometheus.NewRegistry`) with modern metric names only (legacy names removed). -* **Constraints:** - * Must run on the same node as the workloads it monitors (DaemonSet sidecar in the kubelet plugin pod). + * Applies soft-QoS feedback (`recentKernel`/`utilizationSwitch`) by reading/writing the mmaped shared-region. + * Serves metrics on `:9394/metrics`. +* **Constraints (DRA Mode):** + * Runs as a DaemonSet sidecar in the kubelet plugin pod. + * Activated by `DRA_MODE=true`; in this mode MIG metrics collection and stale-cache self-cleanup are disabled (the DRA driver owns lifecycle cleanup). + * Requires `HOOK_PATH` and `NODE_NAME` environment variables. * Requires `host-vgpu` and `host-tmp` volume mounts for cache access. - * Gracefully degradates to `"unknown"` pod labels when Kubernetes API is unreachable. - * No legacy metric format support — targets HAMi ≥ v2.9.0. --- @@ -241,7 +241,7 @@ The project produces a single distroless-based container image that bundles all | Path in Image | Source Stage | Purpose | |---|---|---| | `/usr/bin/hami-kubelet-plugin` | `build` | Main Driver Agent binary. | -| `/usr/bin/hami-core-monitor` | `build` | GPU monitor and Prometheus metrics exporter for HAMi-Core. | +| `/usr/bin/vGPUmonitor` | upstream HAMi image (`projecthami/hami:*`) | GPU monitor and Prometheus metrics exporter for HAMi-Core. | | `/usr/local/lib/hami/libvgpu.so` | `hami-core-build` | Enforcement library injected into containers. | | `/usr/local/lib/hami/ld.so.preload` | `hami-core-build` | Preload config that activates `libvgpu.so` in containers. | | `/usr/bin/vgpu-init.sh` | `hami-core-build` | Node-level initialization script for vGPU. | @@ -263,7 +263,7 @@ helm install hami-dra-driver ./chart/hami-dra-driver \ ``` Key templates: -- `daemonset.yaml` — Deploys the kubelet plugin DaemonSet; conditionally injects the `hami-core-monitor` sidecar when `monitor.enabled=true`. 
+- `daemonset.yaml` — Deploys the kubelet plugin DaemonSet; conditionally injects the `vGPUmonitor` sidecar when `monitor.enabled=true`. - `rbac-kubeletplugin.yaml.yaml` — RBAC including granular DRA status authorization rules. - `deviceclass-hami-gpu.yaml` — The `DeviceClass` for `hami-core-gpu.project-hami.io`. - `validation.yaml` — Helm validation hooks. @@ -310,4 +310,4 @@ make -f deploy/container/Makefile build BUILD_MULTI_ARCH_IMAGES=true PUSH_ON_BUI | `0d0d90a` | feat: Support install with helm chart | Added `chart/hami-dra-driver/` for Helm-based cluster deployment. | | `a2ad09e` | fix: inject failed for hami-gpu | Prepare logic bypasses overlap validation and partial-rollback when `HAMiCoreSupport` is enabled; completed claims are non-idempotent. | | `6841f23` | fix: invalide featuregates | `pkg/flags/` package extracted for reusable CLI flags (`FeatureGateConfig`, `LoggingConfig`, `KubeClientConfig`); `ComputeDomainCliques` default changed to `false`. | -| `HEAD` | feat: add hami-core-monitor | Added `cmd/hami-core-monitor/` as a standalone metrics exporter and soft-QoS agent with `pkg/monitor/` shared-memory cache reader. Helm chart supports `monitor.enabled` toggle. Legacy metrics removed. | +| `HEAD` | feat: add vGPUmonitor DRA support | Replaced `cmd/hami-core-monitor/` with upstream `vGPUmonitor`. DRA driver creates `_/.cache` layout. `HAMI_VGPUMONITOR_IMAGE` build-arg is configurable. | diff --git a/chart/hami-dra-driver/templates/daemonset.yaml b/chart/hami-dra-driver/templates/daemonset.yaml index eae6a2eb..90c2904c 100644 --- a/chart/hami-dra-driver/templates/daemonset.yaml +++ b/chart/hami-dra-driver/templates/daemonset.yaml @@ -128,6 +128,8 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: HOOK_PATH + value: {{ .Values.driver.hostHookPath | quote }} - name: IMAGE_NAME value: {{ include "hami-dra-driver.fullimage" . 
}} {{- if .Values.nvidiaCDIHookPath }} @@ -182,16 +184,16 @@ spec: imagePullPolicy: {{ .Values.image.pullPolicy }} securityContext: privileged: true - command: ["hami-core-monitor"] - args: - - "--node-name=$(NODE_NAME)" - - "--hook-path={{ .Values.driver.vgpuInitPath }}" - - "--bind-address=:9394" + command: ["vGPUmonitor"] env: - name: NODE_NAME valueFrom: fieldRef: fieldPath: spec.nodeName + - name: HOOK_PATH + value: {{ .Values.driver.vgpuInitPath | quote }} + - name: DRA_MODE + value: "true" {{- with .Values.monitor.resources }} resources: {{- toYaml . | nindent 10 }} diff --git a/chart/hami-dra-driver/values.yaml b/chart/hami-dra-driver/values.yaml index 39bc16f8..59222c06 100644 --- a/chart/hami-dra-driver/values.yaml +++ b/chart/hami-dra-driver/values.yaml @@ -65,6 +65,7 @@ driver: cdiRoot: /var/run/cdi vgpuInitPath: /usr/local/vgpu hostTmp: /tmp + hostHookPath: /usr/local # Feature gates forwarded to the hami-kubelet-plugin binary as the # FEATURE_GATES environment variable. diff --git a/cmd/hami-core-monitor/feedback.go b/cmd/hami-core-monitor/feedback.go deleted file mode 100644 index b07cae1e..00000000 --- a/cmd/hami-core-monitor/feedback.go +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright 2025 The HAMi Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package main - -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/Project-HAMi/k8s-dra-driver/pkg/monitor" - - "k8s.io/klog/v2" -) - -// utilizationPerDevice tracks how many processes with recent kernels are on a -// given physical GPU for a given priority class. -type utilizationPerDevice struct { - count uint64 -} - -// watchAndFeedback runs a ticker-driven loop that periodically rescans the -// claim cache directory and applies the soft-QoS feedback rules. -func watchAndFeedback(ctx context.Context, lister *monitor.ClaimLister, interval time.Duration) { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if err := lister.Update(); err != nil { - klog.V(3).ErrorS(err, "Failed to update claim lister") - } - observe(lister) - } - } -} - -// observe evaluates every active claim, decrements the recent-kernel counters, -// and mutates utilizationSwitch / recentKernel based on GPU-level contention. -func observe(lister *monitor.ClaimLister) { - claims := lister.ListClaims() - if len(claims) == 0 { - return - } - - // Aggregate active processes per (short-UUID, priority). - utSwitchOn := make(map[string]utilizationPerDevice) - - for _, c := range claims { - if c.Info == nil { - continue - } - recentKernel := c.Info.GetRecentKernel() - if recentKernel > 0 { - recentKernel-- - for i := 0; i < c.Info.DeviceNum(); i++ { - if !c.Info.IsValidUUID(i) { - continue - } - uuid := strings.Split(c.Info.DeviceUUID(i), "-")[0] - key := fmt.Sprintf("%s_%d", uuid, c.Info.GetPriority()) - utSwitchOn[key] = utilizationPerDevice{count: utSwitchOn[key].count + 1} - } - } - c.Info.SetRecentKernel(recentKernel) - } - - // Second pass: set blocking / priority flags. 
- for _, c := range claims { - if c.Info == nil { - continue - } - if checkBlocking(utSwitchOn, c.Info.GetPriority(), c.Info) { - c.Info.SetRecentKernel(-1) - } else if c.Info.GetRecentKernel() < 0 { - c.Info.SetRecentKernel(0) - } - if checkPriority(utSwitchOn, c.Info.GetPriority(), c.Info) { - c.Info.SetUtilizationSwitch(1) - } else { - c.Info.SetUtilizationSwitch(0) - } - } -} - -// checkBlocking returns true when another process with the same or higher -// priority has a recent kernel on any of the claim's devices. -func checkBlocking(utSwitchOn map[string]utilizationPerDevice, priority int, info monitor.UsageInfo) bool { - for i := 0; i < info.DeviceNum(); i++ { - if !info.IsValidUUID(i) { - continue - } - uuid := strings.Split(info.DeviceUUID(i), "-")[0] - for p := 0; p <= priority; p++ { - key := fmt.Sprintf("%s_%d", uuid, p) - if val, ok := utSwitchOn[key]; ok && val.count > 1 { - return true - } - } - } - return false -} - -// checkPriority returns true when any process with the same or higher priority -// is active on any of the claim's devices. -func checkPriority(utSwitchOn map[string]utilizationPerDevice, priority int, info monitor.UsageInfo) bool { - for i := 0; i < info.DeviceNum(); i++ { - if !info.IsValidUUID(i) { - continue - } - uuid := strings.Split(info.DeviceUUID(i), "-")[0] - for p := 0; p <= priority; p++ { - key := fmt.Sprintf("%s_%d", uuid, p) - if val, ok := utSwitchOn[key]; ok && val.count > 0 { - return true - } - } - } - return false -} diff --git a/cmd/hami-core-monitor/main.go b/cmd/hami-core-monitor/main.go deleted file mode 100644 index bcec0b27..00000000 --- a/cmd/hami-core-monitor/main.go +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2025 The HAMi Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "context" - "fmt" - "net/http" - "os" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/urfave/cli/v2" - - "github.com/Project-HAMi/k8s-dra-driver/pkg/common" - pkgflags "github.com/Project-HAMi/k8s-dra-driver/pkg/flags" - "github.com/Project-HAMi/k8s-dra-driver/pkg/monitor" - "github.com/Project-HAMi/k8s-dra-driver/pkg/version" - - "k8s.io/klog/v2" -) - -type Flags struct { - kubeClientConfig pkgflags.KubeClientConfig - nodeName string - hookPath string - bindAddr string - feedbackInterval time.Duration -} - -func main() { - flags := &Flags{} - - app := &cli.App{ - Name: "hami-core-monitor", - Usage: "HAMi-Core GPU monitor and metrics exporter", - Version: version.Version(), - Flags: append(flags.kubeClientConfig.Flags(), - &cli.StringFlag{ - Name: "node-name", - Usage: "Name of the node this monitor is running on", - Destination: &flags.nodeName, - EnvVars: []string{"NODE_NAME"}, - }, - &cli.StringFlag{ - Name: "hook-path", - Usage: "Host path where vGPU hooks and claim caches are mounted", - Value: "/usr/local/vgpu", - Destination: &flags.hookPath, - }, - &cli.StringFlag{ - Name: "bind-address", - Usage: "The address the metric endpoint binds to", - Value: ":9394", - Destination: &flags.bindAddr, - }, - &cli.DurationFlag{ - Name: "feedback-interval", - Usage: "Interval between soft-QoS feedback evaluations", - Value: 5 * time.Second, - Destination: &flags.feedbackInterval, - }, - ), - Action: func(c *cli.Context) error { - 
return run(c.Context, flags) - }, - } - - if err := app.Run(os.Args); err != nil { - klog.Fatalf("Failed to run monitor: %v", err) - } -} - -func run(ctx context.Context, flags *Flags) error { - common.StartDebugSignalHandlers() - - if flags.nodeName == "" { - return fmt.Errorf("--node-name or NODE_NAME must be set") - } - if flags.feedbackInterval <= 0 { - return fmt.Errorf("feedback-interval must be positive") - } - - claimLister := monitor.NewClaimLister(flags.hookPath) - - var mapper *ClaimMapper - clientsets, err := flags.kubeClientConfig.NewClientSets() - if err != nil { - klog.ErrorS(err, "Failed to build Kubernetes clients; claim-to-pod mapping disabled") - } else { - mapper = NewClaimMapper(clientsets.Core, flags.nodeName) - go mapper.Start(ctx) - } - - reg := prometheus.NewRegistry() - collector := newCollector(claimLister, mapper) - reg.MustRegister(collector) - - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) - - srv := &http.Server{Addr: flags.bindAddr, Handler: mux} - go func() { - klog.InfoS("Starting metrics server", "addr", flags.bindAddr) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - klog.ErrorS(err, "Metrics server failed") - } - }() - - go watchAndFeedback(ctx, claimLister, flags.feedbackInterval) - - <-ctx.Done() - klog.InfoS("Shutting down monitor") - - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - return srv.Shutdown(shutdownCtx) -} diff --git a/cmd/hami-core-monitor/mapper.go b/cmd/hami-core-monitor/mapper.go deleted file mode 100644 index 37ddc1e3..00000000 --- a/cmd/hami-core-monitor/mapper.go +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Copyright 2025 The HAMi Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "context" - "sync" - "time" - - corev1 "k8s.io/api/core/v1" - resourcev1 "k8s.io/api/resource/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/informers" - corev1informer "k8s.io/client-go/informers/core/v1" - resourcev1informer "k8s.io/client-go/informers/resource/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" -) - -const ( - resyncPeriod = 30 * time.Minute - hamiDeviceClassName = "hami-core-gpu.project-hami.io" -) - -// PodRef holds pod metadata for a ResourceClaim. -type PodRef struct { - Namespace string - PodName string - Container string // empty if none, "_multiple" if >1 -} - -// ClaimMapper maintains an eventually-consistent mapping from ResourceClaim UID -// to the pod (and container) consuming it, driven by shared informers. -type ClaimMapper struct { - nodeName string - podInformer corev1informer.PodInformer - claimInformer resourcev1informer.ResourceClaimInformer - - mu sync.RWMutex - mapping map[string]PodRef // claimUID -> pod metadata -} - -// NewClaimMapper creates a mapper and wires shared informers. -func NewClaimMapper(client kubernetes.Interface, nodeName string) *ClaimMapper { - // Pod informer scoped to this node via field selector. - podFactory := informers.NewSharedInformerFactoryWithOptions( - client, - resyncPeriod, - informers.WithTweakListOptions(func(lo *metav1.ListOptions) { - lo.FieldSelector = "spec.nodeName=" + nodeName - }), - ) - - // Claim informer watches the whole cluster. 
- claimFactory := informers.NewSharedInformerFactory(client, resyncPeriod) - - return &ClaimMapper{ - nodeName: nodeName, - podInformer: podFactory.Core().V1().Pods(), - claimInformer: claimFactory.Resource().V1().ResourceClaims(), - mapping: make(map[string]PodRef), - } -} - -// Start runs both informers and blocks until ctx is cancelled. -func (m *ClaimMapper) Start(ctx context.Context) { - // Debounced full rebuild channel – fed by the periodic ticker only. - rebuildCh := make(chan struct{}, 1) - - // Wire event handlers for targeted partial updates. - _, _ = m.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { m.upsertPod(obj.(*corev1.Pod)) }, - UpdateFunc: func(_, newObj any) { m.upsertPod(newObj.(*corev1.Pod)) }, - DeleteFunc: m.removePod, - }) - _, _ = m.claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { m.upsertClaim(obj.(*resourcev1.ResourceClaim)) }, - UpdateFunc: func(_, newObj any) { m.upsertClaim(newObj.(*resourcev1.ResourceClaim)) }, - DeleteFunc: m.removeClaim, - }) - - stopPod := make(chan struct{}) - stopClaim := make(chan struct{}) - go m.podInformer.Informer().Run(stopPod) - go m.claimInformer.Informer().Run(stopClaim) - defer close(stopPod) - defer close(stopClaim) - - if !cache.WaitForCacheSync(ctx.Done(), m.podInformer.Informer().HasSynced, m.claimInformer.Informer().HasSynced) { - klog.V(2).InfoS("ClaimMapper cache sync cancelled") - return - } - - // Initial full rebuild before any events arrive. - m.fullRebuild() - - // Periodic safety-net full rebuild. - ticker := time.NewTicker(resyncPeriod) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - select { - case rebuildCh <- struct{}{}: - default: - } - case <-rebuildCh: - m.fullRebuild() - } - } -} - -// Lookup returns pod metadata for a given claim UID. 
-func (m *ClaimMapper) Lookup(claimUID string) (PodRef, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - ref, ok := m.mapping[claimUID] - return ref, ok -} - -// --------------------------------------------------------------------------- -// Partial (targeted) updates driven by informer events. -// --------------------------------------------------------------------------- - -// upsertPod adds or updates the mapping entries for every HAMi claim -// referenced by the given pod. Called on Pod Add/Update. -func (m *ClaimMapper) upsertPod(pod *corev1.Pod) { - m.mu.Lock() - defer m.mu.Unlock() - - for _, status := range pod.Status.ResourceClaimStatuses { - if status.ResourceClaimName == nil || *status.ResourceClaimName == "" { - continue - } - claim, err := m.claimInformer.Lister().ResourceClaims(pod.Namespace).Get(*status.ResourceClaimName) - if err != nil { - // Claim not in cache yet — will be fixed when the claim event arrives. - continue - } - if !isHamiClaim(claim) { - continue - } - m.mapping[string(claim.UID)] = m.makePodRef(pod, status.Name, *status.ResourceClaimName) - } -} - -// removePod deletes all mapping entries that belong to the given pod. -// Called on Pod Delete. -func (m *ClaimMapper) removePod(obj any) { - var pod *corev1.Pod - switch t := obj.(type) { - case *corev1.Pod: - pod = t - case cache.DeletedFinalStateUnknown: - pod = t.Obj.(*corev1.Pod) - default: - return - } - - m.mu.Lock() - defer m.mu.Unlock() - - for uid, ref := range m.mapping { - if ref.Namespace == pod.Namespace && ref.PodName == pod.Name { - delete(m.mapping, uid) - } - } -} - -// upsertClaim resolves the given claim back to a pod on this node and inserts -// the mapping entry. Called on ResourceClaim Add/Update. -func (m *ClaimMapper) upsertClaim(claim *resourcev1.ResourceClaim) { - if !isHamiClaim(claim) { - return - } - // Find the pod on this node that references this claim. 
- pods, err := m.podInformer.Lister().List(labels.Everything()) - if err != nil { - klog.V(3).ErrorS(err, "ClaimMapper: failed to list pods from cache") - return - } - for _, pod := range pods { - for _, status := range pod.Status.ResourceClaimStatuses { - if status.ResourceClaimName == nil || *status.ResourceClaimName == "" { - continue - } - if pod.Namespace == claim.Namespace && *status.ResourceClaimName == claim.Name { - m.mu.Lock() - m.mapping[string(claim.UID)] = m.makePodRef(pod, status.Name, claim.Name) - m.mu.Unlock() - return // DRA: a claim is bound to at most one pod. - } - } - } -} - -// removeClaim deletes the mapping entry for the given claim. -// Called on ResourceClaim Delete. -func (m *ClaimMapper) removeClaim(obj any) { - var claim *resourcev1.ResourceClaim - switch t := obj.(type) { - case *resourcev1.ResourceClaim: - claim = t - case cache.DeletedFinalStateUnknown: - claim = t.Obj.(*resourcev1.ResourceClaim) - default: - return - } - - m.mu.Lock() - delete(m.mapping, string(claim.UID)) - m.mu.Unlock() -} - -// --------------------------------------------------------------------------- -// Full rebuild (safety net, run once at startup and then periodically). 
-// --------------------------------------------------------------------------- - -func (m *ClaimMapper) fullRebuild() { - pods, err := m.podInformer.Lister().List(labels.Everything()) - if err != nil { - klog.V(3).ErrorS(err, "ClaimMapper: full rebuild failed listing pods") - return - } - - newMapping := make(map[string]PodRef, len(pods)) - for _, pod := range pods { - for _, status := range pod.Status.ResourceClaimStatuses { - if status.ResourceClaimName == nil || *status.ResourceClaimName == "" { - continue - } - claim, err := m.claimInformer.Lister().ResourceClaims(pod.Namespace).Get(*status.ResourceClaimName) - if err != nil { - continue - } - if !isHamiClaim(claim) { - continue - } - newMapping[string(claim.UID)] = m.makePodRef(pod, status.Name, *status.ResourceClaimName) - } - } - - m.mu.Lock() - m.mapping = newMapping - m.mu.Unlock() - - klog.V(4).InfoS("ClaimMapper full rebuild", "pods", len(pods), "claims", len(newMapping)) -} - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -func (m *ClaimMapper) makePodRef(pod *corev1.Pod, localClaimName, claimName string) PodRef { - containers := containersUsingClaim(pod, localClaimName) - containerName := "" - switch len(containers) { - case 1: - containerName = containers[0] - case 0: - containerName = "" - default: - containerName = "_multiple" - } - return PodRef{ - Namespace: pod.Namespace, - PodName: pod.Name, - Container: containerName, - } -} - -// isHamiClaim returns true if the claim is for the HAMi device class. 
-func isHamiClaim(claim *resourcev1.ResourceClaim) bool { - for _, req := range claim.Spec.Devices.Requests { - if req.Exactly != nil && req.Exactly.DeviceClassName == hamiDeviceClassName { - return true - } - for _, sub := range req.FirstAvailable { - if sub.DeviceClassName == hamiDeviceClassName { - return true - } - } - } - return false -} - -func containersUsingClaim(pod *corev1.Pod, localClaimName string) []string { - var names []string - for _, c := range pod.Spec.Containers { - for _, rc := range c.Resources.Claims { - if rc.Name == localClaimName { - names = append(names, c.Name) - break - } - } - } - for _, c := range pod.Spec.InitContainers { - for _, rc := range c.Resources.Claims { - if rc.Name == localClaimName { - names = append(names, c.Name) - break - } - } - } - return names -} - -var unknownPodRef = PodRef{Namespace: "unknown", PodName: "unknown", Container: "unknown"} - -func resolve(claimUID string, mapper *ClaimMapper) []string { - if mapper == nil { - return []string{unknownPodRef.Namespace, unknownPodRef.PodName, unknownPodRef.Container, claimUID} - } - ref, ok := mapper.Lookup(claimUID) - if !ok { - return []string{unknownPodRef.Namespace, unknownPodRef.PodName, unknownPodRef.Container, claimUID} - } - return []string{ref.Namespace, ref.PodName, ref.Container, claimUID} -} diff --git a/cmd/hami-core-monitor/metrics.go b/cmd/hami-core-monitor/metrics.go deleted file mode 100644 index bf4e8f78..00000000 --- a/cmd/hami-core-monitor/metrics.go +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Copyright 2025 The HAMi Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "fmt" - "time" - "unicode/utf8" - - "github.com/NVIDIA/go-nvml/pkg/nvml" - "github.com/prometheus/client_golang/prometheus" - - "github.com/Project-HAMi/k8s-dra-driver/pkg/monitor" - - "k8s.io/klog/v2" -) - -// --------------------------------------------------------------------------- -// Metric descriptors. -// Claim-level metrics carry: namespace, pod, container, claim_uid, -// vdevice_index, device_uuid. -// --------------------------------------------------------------------------- -var ( - hostGPUMemoryDesc = prometheus.NewDesc( - "hami_host_gpu_memory_used_bytes", - "GPU device memory usage in bytes", - []string{"device_index", "device_uuid", "device_type"}, nil, - ) - - hostGPUUtilizationDesc = prometheus.NewDesc( - "hami_host_gpu_utilization_ratio", - "GPU core utilization ratio (0-100)", - []string{"device_index", "device_uuid", "device_type"}, nil, - ) - - claimMemoryUsedDesc = prometheus.NewDesc( - "hami_vgpu_memory_used_bytes", - "vGPU device memory usage in bytes", - []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, - ) - - claimMemoryLimitDesc = prometheus.NewDesc( - "hami_vgpu_memory_limit_bytes", - "vGPU device memory limit in bytes", - []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, - ) - - claimDeviceMemoryDesc = prometheus.NewDesc( - "hami_container_device_memory_bytes", - "Container device memory usage breakdown in bytes", - []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", 
"device_uuid", "context_size", "module_size", "buffer_size", "offset"}, nil, - ) - - claimDeviceUtilizationDesc = prometheus.NewDesc( - "hami_container_device_utilization_ratio", - "Container device SM utilization ratio", - []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, - ) - - claimDeviceMemoryContextDesc = prometheus.NewDesc( - "hami_vgpu_memory_context_bytes", - "Container device memory context size in bytes", - []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, - ) - - claimDeviceMemoryModuleDesc = prometheus.NewDesc( - "hami_vgpu_memory_module_bytes", - "Container device memory module size in bytes", - []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, - ) - - claimDeviceMemoryBufferDesc = prometheus.NewDesc( - "hami_vgpu_memory_buffer_bytes", - "Container device memory buffer size in bytes", - []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, - ) - - claimLastKernelDesc = prometheus.NewDesc( - "hami_container_last_kernel_elapsed_seconds", - "Seconds since last kernel execution in container", - []string{"namespace", "pod", "container", "claim_uid", "vdevice_index", "device_uuid"}, nil, - ) -) - -// --------------------------------------------------------------------------- -// Collector -// --------------------------------------------------------------------------- - -type collector struct { - claimLister *monitor.ClaimLister - mapper *ClaimMapper -} - -func newCollector(claimLister *monitor.ClaimLister, mapper *ClaimMapper) *collector { - return &collector{ - claimLister: claimLister, - mapper: mapper, - } -} - -// Describe implements prometheus.Collector. 
-func (c collector) Describe(ch chan<- *prometheus.Desc) { - ch <- hostGPUMemoryDesc - ch <- hostGPUUtilizationDesc - ch <- claimMemoryUsedDesc - ch <- claimMemoryLimitDesc - ch <- claimDeviceMemoryDesc - ch <- claimDeviceUtilizationDesc - ch <- claimDeviceMemoryContextDesc - ch <- claimDeviceMemoryModuleDesc - ch <- claimDeviceMemoryBufferDesc - ch <- claimLastKernelDesc -} - -// Collect implements prometheus.Collector. -func (c collector) Collect(ch chan<- prometheus.Metric) { - klog.V(4).InfoS("Starting metrics collection") - - if err := c.collectHostGPU(ch); err != nil { - klog.ErrorS(err, "Failed to collect host GPU metrics") - } - if err := c.collectClaims(ch); err != nil { - klog.ErrorS(err, "Failed to collect claim metrics") - } - - klog.V(4).InfoS("Finished metrics collection") -} - -// --------------------------------------------------------------------------- -// Host GPU metrics (via NVML). -// --------------------------------------------------------------------------- - -func (c collector) collectHostGPU(ch chan<- prometheus.Metric) error { - if ret := nvml.Init(); ret != nvml.SUCCESS { - return fmt.Errorf("nvml.Init: %s", nvml.ErrorString(ret)) - } - defer nvml.Shutdown() - - count, ret := nvml.DeviceGetCount() - if ret != nvml.SUCCESS { - return fmt.Errorf("nvml.DeviceGetCount: %s", nvml.ErrorString(ret)) - } - - for i := 0; i < count; i++ { - dev, ret := nvml.DeviceGetHandleByIndex(i) - if ret != nvml.SUCCESS { - klog.V(3).ErrorS(nil, "nvml.DeviceGetHandleByIndex", "index", i, "error", nvml.ErrorString(ret)) - continue - } - if err := c.collectHostDevice(ch, dev, i); err != nil { - klog.V(3).ErrorS(err, "Failed to collect metrics for host GPU", "index", i) - } - } - return nil -} - -func (c collector) collectHostDevice(ch chan<- prometheus.Metric, dev nvml.Device, index int) error { - uuid, ret := dev.GetUUID() - if ret != nvml.SUCCESS { - return fmt.Errorf("nvml.GetUUID: %s", nvml.ErrorString(ret)) - } - - name, ret := dev.GetName() - if ret != 
nvml.SUCCESS { - return fmt.Errorf("nvml.GetName: %s", nvml.ErrorString(ret)) - } - deviceType := "NVIDIA-" + name - idxStr := fmt.Sprint(index) - - // Memory. - mem, ret := dev.GetMemoryInfo() - if ret == nvml.ERROR_NOT_SUPPORTED { - klog.V(3).InfoS("Memory metrics not supported for device, skipping", "index", index) - } else if ret != nvml.SUCCESS { - return fmt.Errorf("nvml.GetMemoryInfo: %s", nvml.ErrorString(ret)) - } else { - ch <- prometheus.MustNewConstMetric(hostGPUMemoryDesc, prometheus.GaugeValue, float64(mem.Used), idxStr, uuid, deviceType) - } - - // Utilization. - util, ret := dev.GetUtilizationRates() - if ret != nvml.SUCCESS { - return fmt.Errorf("nvml.GetUtilizationRates: %s", nvml.ErrorString(ret)) - } - ch <- prometheus.MustNewConstMetric(hostGPUUtilizationDesc, prometheus.GaugeValue, float64(util.Gpu), idxStr, uuid, deviceType) - - return nil -} - -// --------------------------------------------------------------------------- -// Claim-level metrics (via mmaped shared memory caches). 
-// --------------------------------------------------------------------------- - -func (c collector) collectClaims(ch chan<- prometheus.Metric) error { - claims := c.claimLister.ListClaims() - nowSec := time.Now().Unix() - - for _, claim := range claims { - if claim.Info == nil { - continue - } - if err := c.collectClaim(ch, claim, nowSec); err != nil { - klog.V(3).ErrorS(err, "Failed to collect metrics for claim", "claimUID", claim.ClaimUID) - } - } - return nil -} - -func (c collector) collectClaim(ch chan<- prometheus.Metric, claim *monitor.ClaimUsage, nowSec int64) error { - info := claim.Info - baseLabels := resolve(claim.ClaimUID, c.mapper) - - for i := 0; i < info.DeviceNum(); i++ { - uuid := info.DeviceUUID(i) - if len(uuid) < 40 { - klog.V(5).InfoS("Skipping device with invalid UUID length", "claim", claim.ClaimUID, "index", i, "len", len(uuid)) - continue - } - uuid = uuid[:40] - if !utf8.ValidString(uuid) { - klog.V(5).InfoS("Skipping device with invalid UTF-8 UUID; cache may not be initialised yet", "claim", claim.ClaimUID, "index", i) - continue - } - - labels := append(baseLabels, fmt.Sprint(i), uuid) - - // Memory used and limit. - memTotal := info.DeviceMemoryTotal(i) - memLimit := info.DeviceMemoryLimit(i) - ch <- prometheus.MustNewConstMetric(claimMemoryUsedDesc, prometheus.GaugeValue, float64(memTotal), labels...) - ch <- prometheus.MustNewConstMetric(claimMemoryLimitDesc, prometheus.GaugeValue, float64(memLimit), labels...) - - // Breakdown. - memCtx := info.DeviceMemoryContextSize(i) - memMod := info.DeviceMemoryModuleSize(i) - memBuf := info.DeviceMemoryBufferSize(i) - memOffset := memTotal - memCtx - memMod - memBuf - breakdownLabels := append(labels, fmt.Sprint(memCtx), fmt.Sprint(memMod), fmt.Sprint(memBuf), fmt.Sprint(memOffset)) - ch <- prometheus.MustNewConstMetric(claimDeviceMemoryDesc, prometheus.GaugeValue, float64(memTotal), breakdownLabels...) - - // Context / module / buffer sub-metrics. 
- ch <- prometheus.MustNewConstMetric(claimDeviceMemoryContextDesc, prometheus.GaugeValue, float64(memCtx), labels...) - ch <- prometheus.MustNewConstMetric(claimDeviceMemoryModuleDesc, prometheus.GaugeValue, float64(memMod), labels...) - ch <- prometheus.MustNewConstMetric(claimDeviceMemoryBufferDesc, prometheus.GaugeValue, float64(memBuf), labels...) - - // SM utilization. - smUtil := info.DeviceSmUtil(i) - ch <- prometheus.MustNewConstMetric(claimDeviceUtilizationDesc, prometheus.GaugeValue, float64(smUtil), labels...) - - // Last kernel time. - lastKernelTime := info.LastKernelTime() - if lastKernelTime > 0 { - lastSec := max(nowSec-lastKernelTime, 0) - ch <- prometheus.MustNewConstMetric(claimLastKernelDesc, prometheus.GaugeValue, float64(lastSec), labels...) - } - } - return nil -} diff --git a/cmd/hami-kubelet-plugin/device_state.go b/cmd/hami-kubelet-plugin/device_state.go index fddaa885..20f96c64 100644 --- a/cmd/hami-kubelet-plugin/device_state.go +++ b/cmd/hami-kubelet-plugin/device_state.go @@ -125,7 +125,10 @@ func NewDeviceState(ctx context.Context, config *Config) (*DeviceState, error) { var hamiCoreManager *HAMiCoreManager if featuregates.Enabled(featuregates.HAMiCoreSupport) { - hamiCoreManager = NewHAMiCoreManager(nvdevlib) + hamiCoreManager = NewHAMiCoreManager(nvdevlib, config.flags.hostHookPath, config.clientsets.Core, config.flags.nodeName) + if !hamiCoreManager.WaitForPodCacheSync(ctx) { + klog.Warningf("HAMiCoreManager Pod cache sync was cancelled or timed out; claim-to-pod resolution may be unavailable initially") + } } var tsManager *TimeSlicingManager diff --git a/cmd/hami-kubelet-plugin/driver.go b/cmd/hami-kubelet-plugin/driver.go index a1cdecdf..d230a7c6 100644 --- a/cmd/hami-kubelet-plugin/driver.go +++ b/cmd/hami-kubelet-plugin/driver.go @@ -288,6 +288,10 @@ func (d *driver) Shutdown() error { d.wg.Wait() + if d.state.hamiCoreManager != nil { + d.state.hamiCoreManager.Stop() + } + if err := d.state.checkpointCleanupManager.Stop(); 
err != nil { return fmt.Errorf("error stopping CheckpointCleanupManager: %w", err) } diff --git a/cmd/hami-kubelet-plugin/hami_core.go b/cmd/hami-kubelet-plugin/hami_core.go index d2b9fae9..5c3eea1c 100644 --- a/cmd/hami-kubelet-plugin/hami_core.go +++ b/cmd/hami-kubelet-plugin/hami_core.go @@ -17,17 +17,25 @@ limitations under the License. package main import ( + "context" "fmt" "maps" "os" + "path/filepath" "slices" "strconv" + "time" "github.com/Masterminds/semver" - "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/dynamic-resource-allocation/kubeletplugin" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -172,13 +180,47 @@ func (g *PreparedDeviceGroup) HAMIGpuUUIDs() []string { type HAMiCoreManager struct { hostHookPath string nvdevlib *deviceLib + nodeName string + + podInformerFactory informers.SharedInformerFactory + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + stopCh chan struct{} } -func NewHAMiCoreManager(deviceLib *deviceLib) *HAMiCoreManager { - return &HAMiCoreManager{ +func NewHAMiCoreManager(deviceLib *deviceLib, hostHookPath string, clientset kubernetes.Interface, nodeName string) *HAMiCoreManager { + m := &HAMiCoreManager{ nvdevlib: deviceLib, - hostHookPath: "/usr/local", + hostHookPath: hostHookPath, + nodeName: nodeName, + stopCh: make(chan struct{}), + } + if clientset != nil { + m.podInformerFactory = informers.NewSharedInformerFactoryWithOptions( + clientset, + 30*time.Minute, + informers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = "spec.nodeName=" + nodeName + }), + ) + podInformer := m.podInformerFactory.Core().V1().Pods() + m.podLister = podInformer.Lister() + m.podListerSynced = podInformer.Informer().HasSynced + 
m.podInformerFactory.Start(m.stopCh) + } + return m +} + +// WaitForPodCacheSync blocks until the local Pod cache has synced for the first time. +func (m *HAMiCoreManager) WaitForPodCacheSync(ctx context.Context) bool { + if m.podListerSynced == nil { + return true } + return cache.WaitForCacheSync(ctx.Done(), m.podListerSynced) +} + +func (m *HAMiCoreManager) Stop() { + close(m.stopCh) } func (m *HAMiCoreManager) getConsumableCapacityMap(claim *resourceapi.ResourceClaim) map[string]map[resourceapi.QualifiedName]resource.Quantity { @@ -193,26 +235,91 @@ func (m *HAMiCoreManager) getConsumableCapacityMap(claim *resourceapi.ResourceCl return resMap } -func (m *HAMiCoreManager) GetCDIContainerEdits(claim *resourceapi.ResourceClaim, devs AllocatableDevices) *cdiapi.ContainerEdits { - cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/claims/%s", m.hostHookPath, claim.UID) - // TODO: We should check the status of claim, becasue there may be two pod share the claim - var err error - err = os.RemoveAll(cacheFileHostDirectory) - if err != nil { - klog.Warningf("Failed to remove host directory for cachefile %s: %s", cacheFileHostDirectory, err) +// resolveClaimToPod searches the local Pod informer cache for the Pod that +// reserved the given claim. HAMi DRA guarantees a 1:1 claim-to-container +// binding, so it also returns the exact container name. +func (m *HAMiCoreManager) resolveClaimToPod(claim *resourceapi.ResourceClaim) (*corev1.Pod, string, error) { + if m.podLister == nil { + return nil, "", fmt.Errorf("pod lister not initialized") + } + if len(claim.Status.ReservedFor) == 0 { + return nil, "", fmt.Errorf("claim %s has no ReservedFor entries", claim.UID) } - err = os.MkdirAll(cacheFileHostDirectory, 0777) + + // Find the Pod that reserved this claim. 
+ consumer := claim.Status.ReservedFor[0] + if consumer.Resource != "pods" { + return nil, "", fmt.Errorf("claim %s reservedFor[0] is not a Pod", claim.UID) + } + + pod, err := m.podLister.Pods(claim.Namespace).Get(consumer.Name) if err != nil { - klog.Warningf("Failed to create host directory for cachefile %s: %s", cacheFileHostDirectory, err) + return nil, "", fmt.Errorf("pod %s/%s not found in local cache: %w", claim.Namespace, consumer.Name, err) + } + + // HAMi DRA design guarantees one claim per container, but we defensively + // iterate over all containers and init containers. + var containerName string + for _, c := range pod.Spec.Containers { + for _, rc := range c.Resources.Claims { + if rc.Name == claim.Name { + containerName = c.Name + break + } + } + if containerName != "" { + break + } + } + if containerName == "" { + for _, c := range pod.Spec.InitContainers { + for _, rc := range c.Resources.Claims { + if rc.Name == claim.Name { + containerName = c.Name + break + } + } + if containerName != "" { + break + } + } } - err = os.Chmod(cacheFileHostDirectory, 0777) + if containerName == "" { + return nil, "", fmt.Errorf("no container in pod %s/%s references claim %s", claim.Namespace, pod.Name, claim.Name) + } + + return pod, containerName, nil +} + +func (m *HAMiCoreManager) GetCDIContainerEdits(claim *resourceapi.ResourceClaim, devs AllocatableDevices) *cdiapi.ContainerEdits { + pod, containerName, err := m.resolveClaimToPod(claim) if err != nil { - klog.Warningf("Failed to change mod of host directory for cachefile %s: %s", cacheFileHostDirectory, err) + klog.Warningf("HAMiCoreManager: cannot resolve claim %s to pod/container: %v", claim.UID, err) + // Fallback to claim-scoped directory so that Prepare does not hard-fail. + // Metrics will be incomplete, but the workload can still run. 
+ pod = &corev1.Pod{} + pod.UID = claim.UID + containerName = "unknown" + } + + podUID := string(pod.UID) + cacheFileHostDirectory := filepath.Join(m.hostHookPath, "vgpu", "containers", podUID+"_"+containerName) + cacheFilePath := filepath.Join(cacheFileHostDirectory, string(claim.UID)+".cache") + + // Clean up and recreate the directory for this pod+container. + if err := os.RemoveAll(cacheFileHostDirectory); err != nil { + klog.Warningf("Failed to remove host directory for cachefile %s: %v", cacheFileHostDirectory, err) + } + if err := os.MkdirAll(cacheFileHostDirectory, 0777); err != nil { + klog.Warningf("Failed to create host directory for cachefile %s: %v", cacheFileHostDirectory, err) + } + if err := os.Chmod(cacheFileHostDirectory, 0777); err != nil { + klog.Warningf("Failed to chmod host directory for cachefile %s: %v", cacheFileHostDirectory, err) } hamiEnvs := []string{} // TOOD: Get SM Limit from Claim's Annotation - hamiEnvs = append(hamiEnvs, fmt.Sprintf("CUDA_DEVICE_MEMORY_SHARED_CACHE=%s", fmt.Sprintf("%s/%v.cache", cacheFileHostDirectory, uuid.New().String()))) + hamiEnvs = append(hamiEnvs, fmt.Sprintf("CUDA_DEVICE_MEMORY_SHARED_CACHE=%s", cacheFilePath)) devCapMap := m.getConsumableCapacityMap(claim) idx := 0 @@ -255,14 +362,14 @@ func (m *HAMiCoreManager) GetCDIContainerEdits(claim *resourceapi.ResourceClaim, Options: []string{"rw", "nosuid", "nodev", "bind"}, }, { - ContainerPath: m.hostHookPath + "/vgpu/libvgpu.so", - HostPath: m.hostHookPath + "/vgpu/libvgpu.so", + ContainerPath: filepath.Join(m.hostHookPath, "vgpu", "libvgpu.so"), + HostPath: filepath.Join(m.hostHookPath, "vgpu", "libvgpu.so"), Options: []string{"ro", "nosuid", "nodev", "bind"}, }, // TODO: Check CUDA_DISABLE_CONTROL env before mount ld.so.preload { ContainerPath: "/etc/ld.so.preload", - HostPath: m.hostHookPath + "/vgpu/ld.so.preload", + HostPath: filepath.Join(m.hostHookPath, "vgpu", "ld.so.preload"), Options: []string{"ro", "nosuid", "nodev", "bind"}, }, { @@ -276,8 
+383,29 @@ func (m *HAMiCoreManager) GetCDIContainerEdits(claim *resourceapi.ResourceClaim, } func (m *HAMiCoreManager) Unprepare(claimUID string, pl PreparedDeviceList) error { - path := fmt.Sprintf("%s/vgpu/claims/%s", m.hostHookPath, claimUID) - _ = os.RemoveAll(path) + containersPath := filepath.Join(m.hostHookPath, "vgpu", "containers") + entries, err := os.ReadDir(containersPath) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed to list containers path %s: %w", containersPath, err) + } + for _, entry := range entries { + if !entry.IsDir() { + continue + } + claimCache := filepath.Join(containersPath, entry.Name(), claimUID+".cache") + if _, err := os.Stat(claimCache); err == nil { + dirToRemove := filepath.Join(containersPath, entry.Name()) + if err := os.RemoveAll(dirToRemove); err != nil { + return fmt.Errorf("failed to remove container cache directory %s: %w", dirToRemove, err) + } + klog.V(4).Infof("Unprepare: removed HAMi-Core cache directory %s for claim %s", dirToRemove, claimUID) + return nil + } + } + klog.V(4).Infof("Unprepare: no HAMi-Core cache directory found for claim %s", claimUID) return nil } diff --git a/cmd/hami-kubelet-plugin/main.go b/cmd/hami-kubelet-plugin/main.go index 9ac65a03..ab05a8b6 100644 --- a/cmd/hami-kubelet-plugin/main.go +++ b/cmd/hami-kubelet-plugin/main.go @@ -53,6 +53,7 @@ type Flags struct { cdiRoot string containerDriverRoot string hostDriverRoot string + hostHookPath string nvidiaCDIHookPath string imageName string kubeletRegistrarDirectoryPath string @@ -120,6 +121,13 @@ func newApp() *cli.App { Destination: &flags.containerDriverRoot, EnvVars: []string{"DRIVER_ROOT_CTR_PATH"}, }, + &cli.StringFlag{ + Name: "host-hook-path", + Value: "/usr/local", + Usage: "the host path where vGPU hooks and claim caches are rooted (the container must have this path mounted)", + Destination: &flags.hostHookPath, + EnvVars: []string{"HOOK_PATH"}, + }, &cli.StringFlag{ Name: "nvidia-cdi-hook-path", 
Usage: "Absolute path to the nvidia-cdi-hook executable in the host file system. Used in the generated CDI specification.", diff --git a/deploy/container/Dockerfile b/deploy/container/Dockerfile index e074ae8c..200aabe1 100644 --- a/deploy/container/Dockerfile +++ b/deploy/container/Dockerfile @@ -16,6 +16,9 @@ # limitations under the License. ARG TOOLKIT_CONTAINER_IMAGE=unknown +ARG HAMI_VGPUMONITOR_IMAGE=projecthami/hami:v2.9.0 + +FROM ${HAMI_VGPUMONITOR_IMAGE} AS vgpumonitor FROM --platform=$BUILDPLATFORM nvidia/cuda:12.3.2-devel-ubuntu20.04 AS hami-core-build @@ -141,8 +144,8 @@ COPY --from=build /build/scripts/bind_to_driver.sh /usr/bin/bind_to_dr COPY --from=build /build/scripts/unbind_from_driver.sh /usr/bin/unbind_from_driver.sh COPY /hack/kubelet-plugin-prestart.sh /usr/bin/kubelet-plugin-prestart.sh -COPY --from=build /artifacts/hami-kubelet-plugin /usr/bin/hami-kubelet-plugin -COPY --from=build /artifacts/hami-core-monitor /usr/bin/hami-core-monitor +COPY --from=build /artifacts/hami-kubelet-plugin /usr/bin/hami-kubelet-plugin +COPY --from=vgpumonitor /k8s-vgpu/bin/vGPUmonitor /usr/bin/vGPUmonitor COPY --from=hami-core-build /k8s-dra-driver/lib/local /usr/local COPY --from=hami-core-build /k8s-dra-driver/lib/nvidia/vgpu-init.sh /usr/bin/vgpu-init.sh @@ -153,7 +156,7 @@ USER root:root # Smoke-test executables (provide early build feedback). 
RUN ["/usr/bin/hami-kubelet-plugin", "--version"] -RUN ["/usr/bin/hami-core-monitor", "--version"] +RUN ["/usr/bin/vGPUmonitor", "--version"] RUN ["/bin/bash", "--version"] ENTRYPOINT ["/bin/bash"] diff --git a/deploy/container/Makefile b/deploy/container/Makefile index 8cb484a9..a1609ed7 100644 --- a/deploy/container/Makefile +++ b/deploy/container/Makefile @@ -66,6 +66,7 @@ build: --build-arg BASH_STATIC_GIT_REF="$(BASH_STATIC_GIT_REF)" \ --build-arg TOOLKIT_CONTAINER_IMAGE="$(TOOLKIT_CONTAINER_IMAGE)" \ --build-arg HAMI_CORE_BUILD_IMAGE="$(HAMI_CORE_BUILD_IMAGE)" \ + --build-arg HAMI_VGPUMONITOR_IMAGE="$(HAMI_VGPUMONITOR_IMAGE)" \ --build-arg VERSION="$(VERSION)" \ --build-arg GIT_COMMIT="$(GIT_COMMIT)" \ --progress=plain \ diff --git a/pkg/flags/featuregates_test.go b/pkg/flags/featuregates_test.go index c55f04ef..c95f7290 100644 --- a/pkg/flags/featuregates_test.go +++ b/pkg/flags/featuregates_test.go @@ -24,7 +24,8 @@ import ( "github.com/urfave/cli/v2" "k8s.io/component-base/featuregate" - "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/featuregates" + // "github.com/NVIDIA/k8s-dra-driver-gpu/pkg/featuregates" + "github.com/Project-HAMi/k8s-dra-driver/pkg/featuregates" ) // Test feature constants for different scenarios. diff --git a/pkg/monitor/cache.go b/pkg/monitor/cache.go deleted file mode 100644 index abaa06a3..00000000 --- a/pkg/monitor/cache.go +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright 2025 The HAMi Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monitor - -import ( - "fmt" - "os" - "path/filepath" - "strings" - "sync" - "syscall" - "unsafe" - - "k8s.io/klog/v2" -) - -// ClaimUsage wraps a single mmaped cache file for a ResourceClaim. -type ClaimUsage struct { - ClaimUID string - CacheFile string - data []byte - Info UsageInfo -} - -// ClaimLister scans the host vGPU claims directory, discovers .cache -// files created by libvgpu.so, and mmap(2)s them for live access. -type ClaimLister struct { - hookPath string - claims map[string]*ClaimUsage // key: / - mutex sync.Mutex -} - -// NewClaimLister creates a new ClaimLister rooted at hookPath. -// The effective scan path is /vgpu/claims/. -func NewClaimLister(hookPath string) *ClaimLister { - return &ClaimLister{ - hookPath: hookPath, - claims: make(map[string]*ClaimUsage), - } -} - -// ListClaims returns a shallow copy of the currently known claims. -func (l *ClaimLister) ListClaims() map[string]*ClaimUsage { - l.mutex.Lock() - defer l.mutex.Unlock() - - out := make(map[string]*ClaimUsage, len(l.claims)) - for k, v := range l.claims { - out[k] = v - } - return out -} - -// Update rescans the claims directory, mmaping new cache files and -// munmapping ones that have disappeared. -func (l *ClaimLister) Update() error { - l.mutex.Lock() - defer l.mutex.Unlock() - - claimsPath := filepath.Join(l.hookPath, "vgpu", "claims") - entries, err := os.ReadDir(claimsPath) - if err != nil { - if os.IsNotExist(err) { - // No claims directory yet; clear everything. 
- l.cleanupAll() - return nil - } - return err - } - - seen := make(map[string]struct{}) - for _, entry := range entries { - if !entry.IsDir() { - continue - } - claimUID := entry.Name() - claimDir := filepath.Join(claimsPath, claimUID) - cacheFile, err := findCacheFile(claimDir) - if err != nil { - klog.V(4).InfoS("No cache file in claim dir", "dir", claimDir, "err", err) - continue - } - if cacheFile == "" { - continue - } - key := claimUID + "/" + filepath.Base(cacheFile) - seen[key] = struct{}{} - - if _, ok := l.claims[key]; ok { - continue - } - - usage, err := loadCache(claimUID, cacheFile) - if err != nil { - klog.ErrorS(err, "Failed to load cache", "file", cacheFile) - continue - } - l.claims[key] = usage - klog.V(3).InfoS("Loaded claim cache", "claim", claimUID, "file", cacheFile) - } - - // Remove disappeared claims. - for key, usage := range l.claims { - if _, ok := seen[key]; !ok { - _ = syscall.Munmap(usage.data) - delete(l.claims, key) - klog.V(3).InfoS("Removed claim cache", "key", key) - } - } - - return nil -} - -func (l *ClaimLister) cleanupAll() { - for key, usage := range l.claims { - _ = syscall.Munmap(usage.data) - delete(l.claims, key) - klog.V(3).InfoS("Cleaned up claim cache", "key", key) - } -} - -func findCacheFile(dir string) (string, error) { - files, err := os.ReadDir(dir) - if err != nil { - return "", err - } - for _, f := range files { - if f.IsDir() { - continue - } - if strings.HasSuffix(f.Name(), ".cache") { - return filepath.Join(dir, f.Name()), nil - } - } - return "", nil -} - -func loadCache(claimUID, cacheFile string) (*ClaimUsage, error) { - info, err := os.Stat(cacheFile) - if err != nil { - return nil, fmt.Errorf("stat cache file: %w", err) - } - minSize := int64(unsafe.Sizeof(headerT{})) - if info.Size() < minSize { - return nil, fmt.Errorf("cache file size %d too small (need >= %d)", info.Size(), minSize) - } - - f, err := os.OpenFile(cacheFile, os.O_RDWR, 0666) - if err != nil { - return nil, fmt.Errorf("open cache 
file: %w", err) - } - defer func() { _ = f.Close() }() - - data, err := syscall.Mmap(int(f.Fd()), 0, int(info.Size()), syscall.PROT_WRITE|syscall.PROT_READ, syscall.MAP_SHARED) - if err != nil { - return nil, fmt.Errorf("mmap cache file: %w", err) - } - - head := (*headerT)(unsafe.Pointer(&data[0])) - if head.initializedFlag != SharedRegionMagicFlag { - _ = syscall.Munmap(data) - return nil, fmt.Errorf("cache file magic flag not matched: got %d, want %d", head.initializedFlag, SharedRegionMagicFlag) - } - - var usageInfo UsageInfo - switch { - case info.Size() == 1197897: - klog.V(4).InfoS("Detected v0 cache file", "file", cacheFile) - usageInfo = castV0Spec(data) - case head.majorVersion == 1: - klog.V(4).InfoS("Detected v1 cache file", "file", cacheFile, "minorVersion", head.minorVersion) - usageInfo = castV1Spec(data) - default: - _ = syscall.Munmap(data) - return nil, fmt.Errorf("unknown cache version %d.%d size %d", head.majorVersion, head.minorVersion, info.Size()) - } - - return &ClaimUsage{ - ClaimUID: claimUID, - CacheFile: cacheFile, - data: data, - Info: usageInfo, - }, nil -} diff --git a/pkg/monitor/spec_v0.go b/pkg/monitor/spec_v0.go deleted file mode 100644 index 7bde351a..00000000 --- a/pkg/monitor/spec_v0.go +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright 2024-2025 The HAMi Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package monitor - -import "unsafe" - -const v0MaxDevices = 16 - -type v0DeviceMemory struct { - contextSize uint64 - moduleSize uint64 - bufferSize uint64 - offset uint64 - total uint64 -} - -type v0DeviceUtilization struct { - decUtil uint64 - encUtil uint64 - smUtil uint64 -} - -type v0ShrregProcSlotT struct { - pid int32 - hostpid int32 - used [16]v0DeviceMemory - monitorused [16]uint64 - deviceUtil [16]v0DeviceUtilization - status int32 -} - -type v0UUID struct { - uuid [96]byte -} - -type v0SemT struct { - sem [32]byte -} - -type v0SharedRegionT struct { - initializedFlag int32 - smInitFlag int32 - ownerPid uint32 - sem v0SemT - num uint64 - uuids [16]v0UUID - - limit [16]uint64 - smLimit [16]uint64 - procs [1024]v0ShrregProcSlotT - - procnum int32 - utilizationSwitch int32 - recentKernel int32 - priority int32 -} - -type v0Spec struct { - sr *v0SharedRegionT -} - -func (s v0Spec) DeviceMax() int { - return v0MaxDevices -} - -func (s v0Spec) DeviceNum() int { - return int(s.sr.num) -} - -func (s v0Spec) DeviceMemoryContextSize(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs { - v += p.used[idx].contextSize - } - return v -} - -func (s v0Spec) DeviceMemoryModuleSize(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs { - v += p.used[idx].moduleSize - } - return v -} - -func (s v0Spec) DeviceMemoryBufferSize(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs { - v += p.used[idx].bufferSize - } - return v -} - -func (s v0Spec) DeviceMemoryOffset(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs { - v += p.used[idx].offset - } - return v -} - -func (s v0Spec) DeviceMemoryTotal(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs { - v += p.used[idx].total - } - return v -} - -func (s v0Spec) DeviceSmUtil(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs { - v += p.deviceUtil[idx].smUtil - } - return v -} - -func (s v0Spec) SetDeviceSmLimit(l uint64) { - var idx uint64 - for idx 
< s.sr.num { - s.sr.smLimit[idx] = l - idx++ - } -} - -func (s v0Spec) IsValidUUID(idx int) bool { - return s.sr.uuids[idx].uuid[0] != 0 -} - -func (s v0Spec) DeviceUUID(idx int) string { - return string(s.sr.uuids[idx].uuid[:]) -} - -func (s v0Spec) DeviceMemoryLimit(idx int) uint64 { - return s.sr.limit[idx] -} - -func (s v0Spec) SetDeviceMemoryLimit(l uint64) { - var idx uint64 - for idx < s.sr.num { - s.sr.limit[idx] = l - idx++ - } -} - -func (s v0Spec) LastKernelTime() int64 { - return 0 -} - -func castV0Spec(data []byte) UsageInfo { - return v0Spec{ - sr: (*v0SharedRegionT)(unsafe.Pointer(&data[0])), - } -} - -func (s v0Spec) GetPriority() int { - return int(s.sr.priority) -} - -func (s v0Spec) GetRecentKernel() int32 { - return s.sr.recentKernel -} - -func (s v0Spec) SetRecentKernel(v int32) { - s.sr.recentKernel = v -} - -func (s v0Spec) GetUtilizationSwitch() int32 { - return s.sr.utilizationSwitch -} - -func (s v0Spec) SetUtilizationSwitch(v int32) { - s.sr.utilizationSwitch = v -} diff --git a/pkg/monitor/spec_v1.go b/pkg/monitor/spec_v1.go deleted file mode 100644 index 8897fe0f..00000000 --- a/pkg/monitor/spec_v1.go +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Copyright 2024-2025 The HAMi Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package monitor - -import "unsafe" - -const v1MaxDevices = 16 - -type v1DeviceMemory struct { - contextSize uint64 - moduleSize uint64 - bufferSize uint64 - offset uint64 - total uint64 - unused [3]uint64 -} - -type v1DeviceUtilization struct { - decUtil uint64 - encUtil uint64 - smUtil uint64 - unused [3]uint64 -} - -type v1ShrregProcSlotT struct { - pid int32 - hostpid int32 - used [16]v1DeviceMemory - monitorused [16]uint64 - deviceUtil [16]v1DeviceUtilization - status int32 - unused [3]uint64 -} - -type v1UUID struct { - uuid [96]byte -} - -type v1SemT struct { - sem [32]byte -} - -type v1SharedRegionT struct { - initializedFlag int32 - majorVersion int32 - minorVersion int32 - smInitFlag int32 - ownerPid uint32 - sem v1SemT - num uint64 - uuids [16]v1UUID - - limit [16]uint64 - smLimit [16]uint64 - procs [1024]v1ShrregProcSlotT - - procnum int32 - utilizationSwitch int32 - recentKernel int32 - priority int32 - lastKernelTime int64 - unused [4]uint64 -} - -type v1Spec struct { - sr *v1SharedRegionT -} - -func (s v1Spec) DeviceMax() int { - return v1MaxDevices -} - -func (s v1Spec) DeviceNum() int { - return int(s.sr.num) -} - -func (s v1Spec) DeviceMemoryContextSize(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs[:int(s.sr.procnum)] { - v += p.used[idx].contextSize - } - return v -} - -func (s v1Spec) DeviceMemoryModuleSize(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs[:int(s.sr.procnum)] { - v += p.used[idx].moduleSize - } - return v -} - -func (s v1Spec) DeviceMemoryBufferSize(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs[:int(s.sr.procnum)] { - v += p.used[idx].bufferSize - } - return v -} - -func (s v1Spec) DeviceMemoryOffset(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs[:int(s.sr.procnum)] { - v += p.used[idx].offset - } - return v -} - -func (s v1Spec) DeviceMemoryTotal(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs[:int(s.sr.procnum)] { - v += p.used[idx].total 
- } - return v -} - -func (s v1Spec) DeviceSmUtil(idx int) uint64 { - var v uint64 - for _, p := range s.sr.procs[:int(s.sr.procnum)] { - v += p.deviceUtil[idx].smUtil - } - return v -} - -func (s v1Spec) SetDeviceSmLimit(l uint64) { - var idx uint64 - for idx < s.sr.num { - s.sr.smLimit[idx] = l - idx++ - } -} - -func (s v1Spec) IsValidUUID(idx int) bool { - return s.sr.uuids[idx].uuid[0] != 0 -} - -func (s v1Spec) DeviceUUID(idx int) string { - return string(s.sr.uuids[idx].uuid[:]) -} - -func (s v1Spec) DeviceMemoryLimit(idx int) uint64 { - return s.sr.limit[idx] -} - -func (s v1Spec) SetDeviceMemoryLimit(l uint64) { - var idx uint64 - for idx < s.sr.num { - s.sr.limit[idx] = l - idx++ - } -} - -func (s v1Spec) LastKernelTime() int64 { - return s.sr.lastKernelTime -} - -func castV1Spec(data []byte) UsageInfo { - return v1Spec{ - sr: (*v1SharedRegionT)(unsafe.Pointer(&data[0])), - } -} - -func (s v1Spec) GetPriority() int { - return int(s.sr.priority) -} - -func (s v1Spec) GetRecentKernel() int32 { - return s.sr.recentKernel -} - -func (s v1Spec) SetRecentKernel(v int32) { - s.sr.recentKernel = v -} - -func (s v1Spec) GetUtilizationSwitch() int32 { - return s.sr.utilizationSwitch -} - -func (s v1Spec) SetUtilizationSwitch(v int32) { - s.sr.utilizationSwitch = v -} diff --git a/pkg/monitor/usage.go b/pkg/monitor/usage.go deleted file mode 100644 index bf69e9ba..00000000 --- a/pkg/monitor/usage.go +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2025 The HAMi Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monitor - -// UsageInfo abstracts over v0/v1 shared-memory cache formats produced by -// libvgpu.so. All methods operate directly on the mmaped shared_region_t. -type UsageInfo interface { - DeviceMax() int - DeviceNum() int - DeviceMemoryContextSize(idx int) uint64 - DeviceMemoryModuleSize(idx int) uint64 - DeviceMemoryBufferSize(idx int) uint64 - DeviceMemoryOffset(idx int) uint64 - DeviceMemoryTotal(idx int) uint64 - DeviceSmUtil(idx int) uint64 - SetDeviceSmLimit(l uint64) - IsValidUUID(idx int) bool - DeviceUUID(idx int) string - DeviceMemoryLimit(idx int) uint64 - SetDeviceMemoryLimit(l uint64) - LastKernelTime() int64 - GetPriority() int - GetRecentKernel() int32 - SetRecentKernel(v int32) - GetUtilizationSwitch() int32 - SetUtilizationSwitch(v int32) -} - -// SharedRegionMagicFlag is the magic value stored in initializedFlag. -const SharedRegionMagicFlag = 19920718 - -type headerT struct { - initializedFlag int32 - majorVersion int32 - minorVersion int32 -} diff --git a/versions.mk b/versions.mk index 289f2c4a..36287c17 100644 --- a/versions.mk +++ b/versions.mk @@ -30,6 +30,9 @@ vVERSION := v$(VERSION:v%=%) # The image to build hami-core lib HAMI_CORE_BUILD_IMAGE=nvidia/cuda:12.3.2-devel-ubuntu20.04 +# The upstream HAMi image to extract vGPUmonitor from +HAMI_VGPUMONITOR_IMAGE ?= projecthami/hami:v2.9.0 + GOLANG_VERSION := $(shell ./hack/golang-version.sh) TOOLKIT_CONTAINER_IMAGE := $(shell ./hack/toolkit-container-image.sh) BASH_STATIC_GIT_REF := 021f5f29f665c92ca16a369d9f27e288c3aed0c6