diff --git a/pkg/validator/checks/conformance/ai_service_metrics_check.go b/pkg/validator/checks/conformance/ai_service_metrics_check.go index 44bb14b9..712e5a58 100644 --- a/pkg/validator/checks/conformance/ai_service_metrics_check.go +++ b/pkg/validator/checks/conformance/ai_service_metrics_check.go @@ -83,12 +83,34 @@ func checkAIServiceMetricsWithURL(ctx *checks.ValidationContext, promBaseURL str return errors.New(errors.ErrCodeInternal, "discovery REST client is not available") } result := restClient.Get().AbsPath(rawURL).Do(ctx.Context) + var statusCode int + result.StatusCode(&statusCode) if cmErr := result.Error(); cmErr != nil { - recordArtifact(ctx, "Custom Metrics API", fmt.Sprintf("Status: unavailable\nError: %v", cmErr)) + recordArtifact(ctx, "Custom Metrics API", + fmt.Sprintf("Endpoint: %s\nHTTP Status: %d\nStatus: unavailable\nError: %v", + rawURL, statusCode, cmErr)) return errors.Wrap(errors.ErrCodeNotFound, "custom metrics API not available", cmErr) } - recordArtifact(ctx, "Custom Metrics API", "Status: available\nEndpoint: /apis/custom.metrics.k8s.io/v1beta1") + + groupVersion := "unknown" + resourceCount := 0 + discoveryBody, rawErr := result.Raw() + if rawErr == nil { + var discovery struct { + GroupVersion string `json:"groupVersion"` + Resources []json.RawMessage `json:"resources"` + } + if json.Unmarshal(discoveryBody, &discovery) == nil { + if discovery.GroupVersion != "" { + groupVersion = discovery.GroupVersion + } + resourceCount = len(discovery.Resources) + } + } + recordArtifact(ctx, "Custom Metrics API", + fmt.Sprintf("Endpoint: %s\nHTTP Status: %d\nGroupVersion: %s\nAPI Resources: %d\nStatus: available", + rawURL, statusCode, groupVersion, resourceCount)) return nil } diff --git a/pkg/validator/checks/conformance/cluster_autoscaling_check.go b/pkg/validator/checks/conformance/cluster_autoscaling_check.go index fbea0b81..bbd3e16b 100644 --- a/pkg/validator/checks/conformance/cluster_autoscaling_check.go +++ 
b/pkg/validator/checks/conformance/cluster_autoscaling_check.go @@ -42,6 +42,16 @@ const ( karpenterNodePoolLabel = "karpenter.sh/nodepool" ) +type clusterAutoscalingReport struct { + NodePool string + HPADesired int32 + HPACurrent int32 + BaselineNodes int + ObservedNodes int + TotalPods int + ScheduledPods int +} + func init() { checks.RegisterCheck(&checks.Check{ Name: "cluster-autoscaling", @@ -122,14 +132,19 @@ func CheckClusterAutoscaling(ctx *checks.ValidationContext) error { var lastErr error for _, poolName := range gpuNodePoolNames { slog.Info("attempting behavioral validation with NodePool", "nodePool", poolName) - lastErr = validateClusterAutoscaling(ctx.Context, ctx.Clientset, poolName) - if lastErr == nil { + report, runErr := validateClusterAutoscaling(ctx.Context, ctx.Clientset, poolName) + if runErr == nil { recordArtifact(ctx, "Cluster Autoscaling Behavioral Test", - fmt.Sprintf("NodePool: %s\nHPA: scaling intent detected\nKarpenter: new node(s) provisioned\nPods: scheduled on new nodes", poolName)) + fmt.Sprintf("NodePool: %s\nHPA desired/current: %d/%d\nKarpenter nodes: baseline=%d observed=%d new=%d\nPods scheduled: %d/%d", + report.NodePool, + report.HPADesired, report.HPACurrent, + report.BaselineNodes, report.ObservedNodes, report.ObservedNodes-report.BaselineNodes, + report.ScheduledPods, report.TotalPods)) return nil } + lastErr = runErr slog.Debug("behavioral validation failed for NodePool", - "nodePool", poolName, "error", lastErr) + "nodePool", poolName, "error", runErr) } return lastErr } @@ -138,11 +153,15 @@ func CheckClusterAutoscaling(ctx *checks.ValidationContext) error { // Deployment + HPA (external metric) → HPA computes scale-up → Karpenter provisions // KWOK nodes → pods are scheduled. This proves the chain works end-to-end. // nodePoolName is the discovered GPU NodePool name from the precheck. 
-func validateClusterAutoscaling(ctx context.Context, clientset kubernetes.Interface, nodePoolName string) error { +func validateClusterAutoscaling(ctx context.Context, clientset kubernetes.Interface, nodePoolName string) (*clusterAutoscalingReport, error) { + report := &clusterAutoscalingReport{ + NodePool: nodePoolName, + } + // Generate unique test resource names and namespace (prevents cross-run interference). b := make([]byte, 4) if _, err := rand.Read(b); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to generate random suffix", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to generate random suffix", err) } suffix := hex.EncodeToString(b) nsName := clusterAutoTestPrefix + suffix @@ -154,7 +173,7 @@ func validateClusterAutoscaling(ctx context.Context, clientset kubernetes.Interf ObjectMeta: metav1.ObjectMeta{Name: nsName}, } if _, err := clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}); k8s.IgnoreAlreadyExists(err) != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test namespace", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test namespace", err) } // Cleanup: delete namespace (cascades all resources, triggers Karpenter consolidation). @@ -175,37 +194,51 @@ func validateClusterAutoscaling(ctx context.Context, clientset kubernetes.Interf LabelSelector: fmt.Sprintf("%s=%s", karpenterNodePoolLabel, nodePoolName), }) if err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to count baseline Karpenter nodes", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to count baseline Karpenter nodes", err) } baselineNodeCount := len(baselineNodes.Items) + report.BaselineNodes = baselineNodeCount slog.Info("baseline Karpenter node count", "pool", nodePoolName, "count", baselineNodeCount) // Create Deployment: GPU-requesting pods with Karpenter nodeSelector. 
deploy := buildClusterAutoTestDeployment(deployName, nsName, nodePoolName) - if _, err := clientset.AppsV1().Deployments(nsName).Create( - ctx, deploy, metav1.CreateOptions{}); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test deployment", err) + _, createErr := clientset.AppsV1().Deployments(nsName).Create( + ctx, deploy, metav1.CreateOptions{}) + if createErr != nil { + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test deployment", createErr) } // Create HPA targeting external metric dcgm_gpu_power_usage. hpa := buildClusterAutoTestHPA(hpaName, deployName, nsName) - if _, err := clientset.AutoscalingV2().HorizontalPodAutoscalers(nsName).Create( - ctx, hpa, metav1.CreateOptions{}); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test HPA", err) + _, createErr = clientset.AutoscalingV2().HorizontalPodAutoscalers(nsName).Create( + ctx, hpa, metav1.CreateOptions{}) + if createErr != nil { + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test HPA", createErr) } // Wait for HPA to report scaling intent. - if err := waitForClusterAutoHPAScale(ctx, clientset, nsName, hpaName); err != nil { - return err + desired, current, err := waitForHPAScaleUp(ctx, clientset, nsName, hpaName, "cluster autoscaling") + if err != nil { + return nil, err } + report.HPADesired = desired + report.HPACurrent = current // Wait for Karpenter to provision KWOK nodes (above baseline count). - if err := waitForKarpenterNodes(ctx, clientset, nodePoolName, baselineNodeCount); err != nil { - return err + observedNodes, err := waitForKarpenterNodes(ctx, clientset, nodePoolName, baselineNodeCount) + if err != nil { + return nil, err } + report.ObservedNodes = observedNodes // Verify pods are scheduled (not Pending) with poll loop. 
- return verifyPodsScheduled(ctx, clientset, nsName) + totalPods, scheduledPods, err := verifyPodsScheduled(ctx, clientset, nsName) + if err != nil { + return nil, err + } + report.TotalPods = totalPods + report.ScheduledPods = scheduledPods + return report, nil } // buildClusterAutoTestDeployment creates a Deployment that requests GPU resources @@ -317,45 +350,10 @@ func buildClusterAutoTestHPA(name, deployName, namespace string) *autoscalingv2. } } -// waitForClusterAutoHPAScale polls the HPA until desiredReplicas > currentReplicas. -func waitForClusterAutoHPAScale(ctx context.Context, clientset kubernetes.Interface, namespace, hpaName string) error { - waitCtx, cancel := context.WithTimeout(ctx, defaults.HPAScaleTimeout) - defer cancel() - - err := wait.PollUntilContextCancel(waitCtx, defaults.HPAPollInterval, true, - func(ctx context.Context) (bool, error) { - hpa, getErr := clientset.AutoscalingV2().HorizontalPodAutoscalers(namespace).Get( - ctx, hpaName, metav1.GetOptions{}) - if getErr != nil { - slog.Debug("HPA not ready yet", "error", getErr) - return false, nil - } - - desired := hpa.Status.DesiredReplicas - current := hpa.Status.CurrentReplicas - slog.Debug("cluster autoscaling HPA status", "desired", desired, "current", current) - - if desired > current { - slog.Info("cluster autoscaling HPA scaling intent detected", - "desiredReplicas", desired, "currentReplicas", current) - return true, nil - } - return false, nil - }, - ) - if err != nil { - if ctx.Err() != nil || waitCtx.Err() != nil { - return errors.Wrap(errors.ErrCodeTimeout, - "HPA did not report scaling intent — external metrics pipeline may be broken", err) - } - return errors.Wrap(errors.ErrCodeInternal, "HPA scaling intent polling failed", err) - } - return nil -} - // waitForKarpenterNodes polls until nodes with the discovered NodePool label exceed the // baseline count. This proves Karpenter provisioned NEW nodes, not just pre-existing ones. 
-func waitForKarpenterNodes(ctx context.Context, clientset kubernetes.Interface, nodePoolName string, baselineNodeCount int) error { +func waitForKarpenterNodes(ctx context.Context, clientset kubernetes.Interface, nodePoolName string, baselineNodeCount int) (int, error) { + var observedNodeCount int waitCtx, cancel := context.WithTimeout(ctx, defaults.KarpenterNodeTimeout) defer cancel() @@ -369,10 +367,11 @@ func waitForKarpenterNodes(ctx context.Context, clientset kubernetes.Interface, return false, nil } - if len(nodes.Items) > baselineNodeCount { + observedNodeCount = len(nodes.Items) + if observedNodeCount > baselineNodeCount { slog.Info("Karpenter provisioned new KWOK GPU node(s)", - "total", len(nodes.Items), "baseline", baselineNodeCount, - "new", len(nodes.Items)-baselineNodeCount) + "total", observedNodeCount, "baseline", baselineNodeCount, + "new", observedNodeCount-baselineNodeCount) return true, nil } return false, nil @@ -380,18 +379,20 @@ func waitForKarpenterNodes(ctx context.Context, clientset kubernetes.Interface, ) if err != nil { if ctx.Err() != nil || waitCtx.Err() != nil { - return errors.Wrap(errors.ErrCodeTimeout, + return 0, errors.Wrap(errors.ErrCodeTimeout, "Karpenter did not provision GPU nodes within timeout", err) } - return errors.Wrap(errors.ErrCodeInternal, "Karpenter node polling failed", err) + return 0, errors.Wrap(errors.ErrCodeInternal, "Karpenter node polling failed", err) } - return nil + return observedNodeCount, nil } // verifyPodsScheduled polls until pods in the unique test namespace are scheduled (not Pending). // This proves the full chain: HPA → scale → Karpenter → nodes → pods scheduled. // The namespace is unique per run, so all pods belong to this test — no stale pod interference. 
-func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, namespace string) error { +func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, namespace string) (int, int, error) { + var observedTotal int + var observedScheduled int waitCtx, cancel := context.WithTimeout(ctx, defaults.PodScheduleTimeout) defer cancel() @@ -403,8 +404,9 @@ func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, na return false, nil } - if len(pods.Items) < 2 { - slog.Debug("waiting for HPA-scaled pods", "count", len(pods.Items)) + observedTotal = len(pods.Items) + if observedTotal < 2 { + slog.Debug("waiting for HPA-scaled pods", "count", observedTotal) return false, nil } @@ -415,12 +417,13 @@ func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, na } } + observedScheduled = scheduled slog.Debug("cluster autoscaling pod status", - "total", len(pods.Items), "scheduled", scheduled) + "total", observedTotal, "scheduled", observedScheduled) - if scheduled >= 2 { + if observedScheduled >= 2 { slog.Info("cluster autoscaling pods verified", - "total", len(pods.Items), "scheduled", scheduled) + "total", observedTotal, "scheduled", observedScheduled) return true, nil } return false, nil @@ -428,10 +431,10 @@ func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, na ) if err != nil { if ctx.Err() != nil || waitCtx.Err() != nil { - return errors.Wrap(errors.ErrCodeTimeout, + return 0, 0, errors.Wrap(errors.ErrCodeTimeout, "test pods not scheduled within timeout — Karpenter nodes may not be ready", err) } - return errors.Wrap(errors.ErrCodeInternal, "pod scheduling verification failed", err) + return 0, 0, errors.Wrap(errors.ErrCodeInternal, "pod scheduling verification failed", err) } - return nil + return observedTotal, observedScheduled, nil } diff --git a/pkg/validator/checks/conformance/cluster_autoscaling_check_unit_test.go 
b/pkg/validator/checks/conformance/cluster_autoscaling_check_unit_test.go index dd9011c3..977f8b2a 100644 --- a/pkg/validator/checks/conformance/cluster_autoscaling_check_unit_test.go +++ b/pkg/validator/checks/conformance/cluster_autoscaling_check_unit_test.go @@ -397,7 +397,7 @@ func TestValidateClusterAutoscaling(t *testing.T) { defer cancel() } - err := validateClusterAutoscaling(ctx, clientset, testNodePool) + report, err := validateClusterAutoscaling(ctx, clientset, testNodePool) if (err != nil) != tt.wantErr { t.Errorf("validateClusterAutoscaling() error = %v, wantErr %v", err, tt.wantErr) @@ -409,6 +409,22 @@ func TestValidateClusterAutoscaling(t *testing.T) { t.Errorf("validateClusterAutoscaling() error = %v, should contain %q", err, tt.errContains) } } + + if !tt.wantErr { + if report == nil { + t.Fatal("validateClusterAutoscaling() report = nil, want non-nil") + } + if report.HPADesired != tt.hpaDesired || report.HPACurrent != tt.hpaCurrent { + t.Errorf("report HPA desired/current = %d/%d, want %d/%d", + report.HPADesired, report.HPACurrent, tt.hpaDesired, tt.hpaCurrent) + } + if report.ObservedNodes != tt.kwokNodes { + t.Errorf("report observed nodes = %d, want %d", report.ObservedNodes, tt.kwokNodes) + } + if report.TotalPods != tt.podCount { + t.Errorf("report total pods = %d, want %d", report.TotalPods, tt.podCount) + } + } }) } } diff --git a/pkg/validator/checks/conformance/gang_scheduling_check.go b/pkg/validator/checks/conformance/gang_scheduling_check.go index 018bd8aa..c52a1a30 100644 --- a/pkg/validator/checks/conformance/gang_scheduling_check.go +++ b/pkg/validator/checks/conformance/gang_scheduling_check.go @@ -67,6 +67,12 @@ type gangTestRun struct { claims [gangMinMembers]string } +type gangSchedulingReport struct { + EarliestScheduled time.Time + LatestScheduled time.Time + CoScheduleSpan time.Duration +} + func newGangTestRun() (*gangTestRun, error) { b := make([]byte, 4) if _, err := rand.Read(b); err != nil { @@ -111,11 +117,18 @@ func 
CheckGangScheduling(ctx *checks.ValidationContext) error { // 1. All KAI scheduler deployments available. var schedulerSummary strings.Builder for _, name := range kaiSchedulerDeployments { - if err := verifyDeploymentAvailable(ctx, "kai-scheduler", name); err != nil { + deploy, err := getDeploymentIfAvailable(ctx, "kai-scheduler", name) + if err != nil { return errors.Wrap(errors.ErrCodeNotFound, fmt.Sprintf("KAI scheduler component %s check failed", name), err) } - fmt.Fprintf(&schedulerSummary, " %-25s available\n", name) + expected := int32(1) + if deploy.Spec.Replicas != nil { + expected = *deploy.Spec.Replicas + } + fmt.Fprintf(&schedulerSummary, " %-25s available=%d/%d image=%s\n", + name, deploy.Status.AvailableReplicas, expected, + firstContainerImage(deploy.Spec.Template.Spec.Containers)) } recordArtifact(ctx, "KAI Scheduler Components", schedulerSummary.String()) @@ -131,12 +144,15 @@ func CheckGangScheduling(ctx *checks.ValidationContext) error { "queues.scheduling.run.ai", "podgroups.scheduling.run.ai", } + var crdSummary strings.Builder for _, crd := range requiredCRDs { if _, crdErr := dynClient.Resource(crdGVR).Get(ctx.Context, crd, metav1.GetOptions{}); crdErr != nil { return errors.Wrap(errors.ErrCodeNotFound, fmt.Sprintf("gang scheduling CRD %s not found", crd), crdErr) } + fmt.Fprintf(&crdSummary, " %s: present\n", crd) } + recordArtifact(ctx, "Gang Scheduling CRDs", crdSummary.String()) // 3. Pre-flight: ensure enough free GPUs for the gang test. 
total, free, gpuErr := countAvailableGPUs(ctx.Context, dynClient) @@ -167,7 +183,8 @@ func CheckGangScheduling(ctx *checks.ValidationContext) error { return err } - if err := validateGangPatterns(pods, run); err != nil { + gangReport, err := validateGangPatterns(pods, run) + if err != nil { return err } @@ -187,6 +204,11 @@ func CheckGangScheduling(ctx *checks.ValidationContext) error { fmt.Fprintf(&gangResults, "Pod %d: %s phase=%s scheduler=%s scheduled=%s\n", i, pod.Name, pod.Status.Phase, pod.Spec.SchedulerName, schedTime) } + fmt.Fprintf(&gangResults, "Co-schedule span: %s\n", gangReport.CoScheduleSpan) + fmt.Fprintf(&gangResults, "Allowed window: %s\n", defaults.CoScheduleWindow) + fmt.Fprintf(&gangResults, "Earliest/Latest: %s / %s\n", + gangReport.EarliestScheduled.Format(time.RFC3339), + gangReport.LatestScheduled.Format(time.RFC3339)) recordArtifact(ctx, "Gang Scheduling Test Results", gangResults.String()) return nil @@ -269,37 +291,37 @@ func waitForGangTestPods(ctx context.Context, clientset kubernetes.Interface, ru } // validateGangPatterns verifies all pods completed successfully and were scheduled by kai-scheduler. -func validateGangPatterns(pods [gangMinMembers]*corev1.Pod, run *gangTestRun) error { +func validateGangPatterns(pods [gangMinMembers]*corev1.Pod, run *gangTestRun) (*gangSchedulingReport, error) { for i, pod := range pods { if pod == nil { - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("gang test pod %s result is nil", run.pods[i])) } // Pod must have succeeded. if pod.Status.Phase != corev1.PodSucceeded { - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("gang test pod %s phase=%s (want Succeeded), gang scheduling may have failed", run.pods[i], pod.Status.Phase)) } // Pod must use kai-scheduler. 
if pod.Spec.SchedulerName != "kai-scheduler" { - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("gang test pod %s schedulerName=%s (want kai-scheduler)", run.pods[i], pod.Spec.SchedulerName)) } // Pod must have PodGroup label. if pod.Labels["pod-group.scheduling.run.ai/name"] != run.groupName { - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("gang test pod %s missing PodGroup label (want %s)", run.pods[i], run.groupName)) } // Pod must use DRA (resourceClaims, not device plugin). if len(pod.Spec.ResourceClaims) == 0 { - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("gang test pod %s does not use DRA resourceClaims", run.pods[i])) } } @@ -317,7 +339,7 @@ func validateGangPatterns(pods [gangMinMembers]*corev1.Pod, run *gangTestRun) er } } if !found { - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("gang test pod %s missing PodScheduled=True condition", run.pods[i])) } } @@ -332,13 +354,18 @@ func validateGangPatterns(pods [gangMinMembers]*corev1.Pod, run *gangTestRun) er latest = t } } - if latest.Sub(earliest) > defaults.CoScheduleWindow { - return errors.New(errors.ErrCodeInternal, + span := latest.Sub(earliest) + if span > defaults.CoScheduleWindow { + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("gang scheduling pods not co-scheduled: schedule times span %s (max %s)", - latest.Sub(earliest), defaults.CoScheduleWindow)) + span, defaults.CoScheduleWindow)) } - return nil + return &gangSchedulingReport{ + EarliestScheduled: earliest, + LatestScheduled: latest, + CoScheduleSpan: span, + }, nil } // cleanupGangTestResources removes test resources. Best-effort: errors are ignored. 
diff --git a/pkg/validator/checks/conformance/helpers.go b/pkg/validator/checks/conformance/helpers.go index aa680dc2..e9ea7209 100644 --- a/pkg/validator/checks/conformance/helpers.go +++ b/pkg/validator/checks/conformance/helpers.go @@ -22,6 +22,7 @@ import ( "net/http" "strings" + "github.com/NVIDIA/aicr/pkg/defaults" "github.com/NVIDIA/aicr/pkg/errors" "github.com/NVIDIA/aicr/pkg/validator/checks" appsv1 "k8s.io/api/apps/v1" @@ -29,7 +30,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" ) // phaseConformance is the phase identifier for conformance checks. @@ -69,29 +72,48 @@ func httpGet(ctx context.Context, url string) ([]byte, error) { return io.ReadAll(resp.Body) } -// checkCondition verifies a status condition on an unstructured object. -func checkCondition(obj *unstructured.Unstructured, condType, expectedStatus string) error { +type conditionObservation struct { + Status string + Reason string + Message string +} + +func getConditionObservation(obj *unstructured.Unstructured, condType string) (*conditionObservation, error) { conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions") if err != nil || !found { - return errors.New(errors.ErrCodeInternal, "status.conditions not found") + return nil, errors.New(errors.ErrCodeInternal, "status.conditions not found") } + for _, c := range conditions { cond, ok := c.(map[string]interface{}) if !ok { continue } - if cond["type"] == condType { - if cond["status"] == expectedStatus { - return nil - } - return errors.New(errors.ErrCodeInternal, - fmt.Sprintf("condition %s=%v (want %s)", condType, cond["status"], expectedStatus)) + condName, _ := cond["type"].(string) + if condName != condType { + continue } + + status, _ := cond["status"].(string) + return &conditionObservation{ + Status: 
status, + Reason: stringFieldOrDefault(cond, "reason", "not-reported"), + Message: stringFieldOrDefault(cond, "message", "not-reported"), + }, nil } - return errors.New(errors.ErrCodeNotFound, + + return nil, errors.New(errors.ErrCodeNotFound, fmt.Sprintf("condition %s not found", condType)) } +func stringFieldOrDefault(obj map[string]interface{}, key, fallback string) string { + v, _ := obj[key].(string) + if v == "" { + return fallback + } + return v +} + // verifyDeploymentAvailable checks that a Deployment has at least one available replica. func verifyDeploymentAvailable(ctx *checks.ValidationContext, namespace, name string) error { _, err := getDeploymentIfAvailable(ctx, namespace, name) @@ -183,6 +205,87 @@ func containsAllMetrics(text string, required []string) []string { return missing } +// podStuckReason inspects a Pod for non-recoverable stuck states and returns a +// human-readable reason. Returns empty string if the pod is not stuck. +// Follows the pattern from pkg/validator/agent/wait.go:getJobFailureReasonFromPod. 
+func podStuckReason(pod *corev1.Pod) string { + for _, cs := range pod.Status.ContainerStatuses { + if w := cs.State.Waiting; w != nil { + switch w.Reason { + case "ImagePullBackOff", "ErrImagePull", "InvalidImageName", "CrashLoopBackOff": + return fmt.Sprintf("%s: %s (image: %s)", w.Reason, w.Message, cs.Image) + } + } + } + for _, cs := range pod.Status.InitContainerStatuses { + if w := cs.State.Waiting; w != nil { + switch w.Reason { + case "ImagePullBackOff", "ErrImagePull", "InvalidImageName", "CrashLoopBackOff": + return fmt.Sprintf("%s: %s (init container, image: %s)", w.Reason, w.Message, cs.Image) + } + } + } + for _, cond := range pod.Status.Conditions { + if cond.Type == corev1.PodScheduled && cond.Status == corev1.ConditionFalse && + cond.Reason == string(corev1.PodReasonUnschedulable) { + + return fmt.Sprintf("Unschedulable: %s", cond.Message) + } + } + return "" +} + +// podWaitingStatus returns the first container's waiting reason and message, or "none" +// if no container is in a waiting state. Used for diagnostic output on timeout. +func podWaitingStatus(pod *corev1.Pod) string { + for _, cs := range pod.Status.ContainerStatuses { + if w := cs.State.Waiting; w != nil { + return fmt.Sprintf("%s: %s", w.Reason, w.Message) + } + } + return "none" +} + +// waitForHPAScaleUp polls the HPA until desiredReplicas > currentReplicas. +// This proves the HPA read metrics and computed a scale-up intent. The logPrefix +// is prepended to log messages to distinguish callers (e.g. "pod-autoscaling", "cluster-autoscaling"). 
+func waitForHPAScaleUp(ctx context.Context, clientset kubernetes.Interface, namespace, hpaName, logPrefix string) (int32, int32, error) { + var observedDesired int32 + var observedCurrent int32 + waitCtx, cancel := context.WithTimeout(ctx, defaults.HPAScaleTimeout) + defer cancel() + + err := wait.PollUntilContextCancel(waitCtx, defaults.HPAPollInterval, true, + func(ctx context.Context) (bool, error) { + hpa, getErr := clientset.AutoscalingV2().HorizontalPodAutoscalers(namespace).Get( + ctx, hpaName, metav1.GetOptions{}) + if getErr != nil { + slog.Debug("HPA not ready yet", "context", logPrefix, "error", getErr) + return false, nil + } + + observedDesired = hpa.Status.DesiredReplicas + observedCurrent = hpa.Status.CurrentReplicas + slog.Debug(logPrefix+" HPA status", "desired", observedDesired, "current", observedCurrent) + + if observedDesired > observedCurrent { + slog.Info(logPrefix+" HPA scaling intent detected", + "desiredReplicas", observedDesired, "currentReplicas", observedCurrent) + return true, nil + } + return false, nil + }, + ) + if err != nil { + if ctx.Err() != nil || waitCtx.Err() != nil { + return 0, 0, errors.Wrap(errors.ErrCodeTimeout, + logPrefix+": HPA did not report scaling intent within timeout", err) + } + return 0, 0, errors.Wrap(errors.ErrCodeInternal, logPrefix+": HPA scaling intent polling failed", err) + } + return observedDesired, observedCurrent, nil +} + // gpuDriverName is the DRA driver name for NVIDIA GPUs. 
const gpuDriverName = "gpu.nvidia.com" diff --git a/pkg/validator/checks/conformance/inference_gateway_check.go b/pkg/validator/checks/conformance/inference_gateway_check.go index 2bbedf68..b2216c73 100644 --- a/pkg/validator/checks/conformance/inference_gateway_check.go +++ b/pkg/validator/checks/conformance/inference_gateway_check.go @@ -30,6 +30,13 @@ var httpRouteGVR = schema.GroupVersionResource{ Group: "gateway.networking.k8s.io", Version: "v1", Resource: "httproutes", } +type gatewayDataPlaneReport struct { + ListenerAttachedRoutes []string + AttachedHTTPRoutes int + MatchingEndpointSlices int + ReadyEndpoints int +} + func init() { checks.RegisterCheck(&checks.Check{ Name: "inference-gateway", @@ -62,11 +69,18 @@ func CheckInferenceGateway(ctx *checks.ValidationContext) error { if err != nil { return errors.Wrap(errors.ErrCodeNotFound, "GatewayClass 'kgateway' not found", err) } - if condErr := checkCondition(gc, "Accepted", "True"); condErr != nil { + gcCond, condErr := getConditionObservation(gc, "Accepted") + if condErr != nil { return errors.Wrap(errors.ErrCodeInternal, "GatewayClass not accepted", condErr) } + if gcCond.Status != "True" { + return errors.Wrap(errors.ErrCodeInternal, "GatewayClass not accepted", + errors.New(errors.ErrCodeInternal, + fmt.Sprintf("condition Accepted=%s (want True)", gcCond.Status))) + } recordArtifact(ctx, "GatewayClass Status", - "Name: kgateway\nAccepted: True") + fmt.Sprintf("Name: %s\nAccepted: %s\nReason: %s\nMessage: %s", + gc.GetName(), gcCond.Status, gcCond.Reason, gcCond.Message)) // 2. 
Gateway "inference-gateway" programmed gwGVR := schema.GroupVersionResource{ @@ -77,11 +91,18 @@ func CheckInferenceGateway(ctx *checks.ValidationContext) error { if err != nil { return errors.Wrap(errors.ErrCodeNotFound, "Gateway 'inference-gateway' not found", err) } - if condErr := checkCondition(gw, "Programmed", "True"); condErr != nil { + gwCond, condErr := getConditionObservation(gw, "Programmed") + if condErr != nil { return errors.Wrap(errors.ErrCodeInternal, "Gateway not programmed", condErr) } + if gwCond.Status != "True" { + return errors.Wrap(errors.ErrCodeInternal, "Gateway not programmed", + errors.New(errors.ErrCodeInternal, + fmt.Sprintf("condition Programmed=%s (want True)", gwCond.Status))) + } recordArtifact(ctx, "Gateway Status", - "Name: inference-gateway\nNamespace: kgateway-system\nProgrammed: True") + fmt.Sprintf("Name: %s\nNamespace: %s\nProgrammed: %s\nReason: %s\nMessage: %s", + gw.GetName(), gw.GetNamespace(), gwCond.Status, gwCond.Reason, gwCond.Message)) // 3. Required CRDs exist crdGVR := schema.GroupVersionResource{ @@ -94,30 +115,44 @@ func CheckInferenceGateway(ctx *checks.ValidationContext) error { } var crdSummary strings.Builder for _, crdName := range requiredCRDs { - _, err := dynClient.Resource(crdGVR).Get(ctx.Context, crdName, metav1.GetOptions{}) - if err != nil { + _, crdErr := dynClient.Resource(crdGVR).Get(ctx.Context, crdName, metav1.GetOptions{}) + if crdErr != nil { return errors.Wrap(errors.ErrCodeNotFound, - fmt.Sprintf("CRD %s not found", crdName), err) + fmt.Sprintf("CRD %s not found", crdName), crdErr) } fmt.Fprintf(&crdSummary, " %s: present\n", crdName) } recordArtifact(ctx, "Required CRDs", crdSummary.String()) // 4. Gateway data-plane readiness (behavioral validation). 
- return validateGatewayDataPlane(ctx) + dpReport, err := validateGatewayDataPlane(ctx) + if err != nil { + return err + } + + listenerSummary := "none" + if len(dpReport.ListenerAttachedRoutes) > 0 { + listenerSummary = strings.Join(dpReport.ListenerAttachedRoutes, ", ") + } + recordArtifact(ctx, "Gateway Data Plane", + fmt.Sprintf("Listeners: %s\nAttached HTTPRoutes: %d\nMatching EndpointSlices: %d\nReady Endpoints: %d", + listenerSummary, dpReport.AttachedHTTPRoutes, dpReport.MatchingEndpointSlices, dpReport.ReadyEndpoints)) + return nil } // validateGatewayDataPlane verifies the gateway data plane is operational by checking // listener status, discovering attached HTTPRoutes, and confirming ready proxy endpoints. -func validateGatewayDataPlane(ctx *checks.ValidationContext) error { +func validateGatewayDataPlane(ctx *checks.ValidationContext) (*gatewayDataPlaneReport, error) { + report := &gatewayDataPlaneReport{} + if ctx.Clientset == nil { - return errors.New(errors.ErrCodeInvalidRequest, + return nil, errors.New(errors.ErrCodeInvalidRequest, "kubernetes client is not available for endpoint validation") } dynClient, err := getDynamicClient(ctx) if err != nil { - return err + return nil, err } // 1. Listener status (informational): log attached routes count. 
@@ -133,6 +168,8 @@ func validateGatewayDataPlane(ctx *checks.ValidationContext) error { if lMap, ok := l.(map[string]interface{}); ok { name, _, _ := unstructured.NestedString(lMap, "name") attached, _, _ := unstructured.NestedInt64(lMap, "attachedRoutes") + report.ListenerAttachedRoutes = append(report.ListenerAttachedRoutes, + fmt.Sprintf("%s=%d", name, attached)) slog.Info("gateway listener status", "listener", name, "attachedRoutes", attached) } } @@ -159,6 +196,7 @@ func validateGatewayDataPlane(ctx *checks.ValidationContext) error { } } } + report.AttachedHTTPRoutes = attached slog.Info("HTTPRoutes attached to inference-gateway", "count", attached) } @@ -168,34 +206,27 @@ func validateGatewayDataPlane(ctx *checks.ValidationContext) error { slices, err := ctx.Clientset.DiscoveryV1().EndpointSlices("kgateway-system").List( ctx.Context, metav1.ListOptions{}) if err != nil { - return errors.Wrap(errors.ErrCodeInternal, + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to list EndpointSlices in kgateway-system", err) } - var hasReadyEndpoint bool for _, slice := range slices.Items { svcName := slice.Labels["kubernetes.io/service-name"] if !strings.Contains(svcName, "inference-gateway") { continue } + report.MatchingEndpointSlices++ for _, ep := range slice.Endpoints { if ep.Conditions.Ready != nil && *ep.Conditions.Ready { - hasReadyEndpoint = true - break + report.ReadyEndpoints++ } } - if hasReadyEndpoint { - break - } } - if !hasReadyEndpoint { - return errors.New(errors.ErrCodeInternal, + if report.ReadyEndpoints == 0 { + return nil, errors.New(errors.ErrCodeInternal, "no ready endpoints for inference-gateway proxy in kgateway-system") } - recordArtifact(ctx, "Gateway Data Plane", - "Endpoint readiness: ready endpoints found for inference-gateway proxy") - - return nil + return report, nil } diff --git a/pkg/validator/checks/conformance/pod_autoscaling_check.go b/pkg/validator/checks/conformance/pod_autoscaling_check.go index f11bb2eb..6ab1c1de 100644 
--- a/pkg/validator/checks/conformance/pod_autoscaling_check.go +++ b/pkg/validator/checks/conformance/pod_autoscaling_check.go @@ -40,6 +40,13 @@ const ( hpaTestPrefix = "hpa-test-" ) +type hpaBehaviorReport struct { + ScaleUpDesiredReplicas int32 + ScaleUpCurrentReplicas int32 + ScaleUpDeployReplicas int32 + ScaleDownDeployReplicas int32 +} + func init() { checks.RegisterCheck(&checks.Check{ Name: "pod-autoscaling", @@ -147,11 +154,14 @@ func CheckPodAutoscaling(ctx *checks.ValidationContext) error { extPath, len(extResp.Items))) // 4. HPA behavioral validation: prove HPA reads external metrics and computes scale-up. - if err := validateHPABehavior(ctx.Context, ctx.Clientset); err != nil { + report, err := validateHPABehavior(ctx.Context, ctx.Clientset) + if err != nil { return err } recordArtifact(ctx, "HPA Behavioral Test", - "Scale-up: PASS — HPA computed desiredReplicas > currentReplicas, deployment scaled\nScale-down: PASS — HPA reduced replicas after target increased") + fmt.Sprintf("Scale-up HPA desired/current: %d/%d\nScale-up deployment replicas: %d\nScale-down deployment replicas: %d", + report.ScaleUpDesiredReplicas, report.ScaleUpCurrentReplicas, + report.ScaleUpDeployReplicas, report.ScaleDownDeployReplicas)) return nil } @@ -159,11 +169,13 @@ func CheckPodAutoscaling(ctx *checks.ValidationContext) error { // then verifies the HPA computes desiredReplicas > currentReplicas and the Deployment // actually scales. This proves the full metrics pipeline (DCGM → Prometheus → adapter → HPA) // is functional end-to-end. -func validateHPABehavior(ctx context.Context, clientset kubernetes.Interface) error { +func validateHPABehavior(ctx context.Context, clientset kubernetes.Interface) (*hpaBehaviorReport, error) { + report := &hpaBehaviorReport{} + // Generate unique test resource names and namespace (prevents cross-run interference). 
b := make([]byte, 4) if _, err := rand.Read(b); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to generate random suffix", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to generate random suffix", err) } suffix := hex.EncodeToString(b) nsName := hpaTestPrefix + suffix @@ -175,7 +187,7 @@ func validateHPABehavior(ctx context.Context, clientset kubernetes.Interface) er ObjectMeta: metav1.ObjectMeta{Name: nsName}, } if _, err := clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}); k8s.IgnoreAlreadyExists(err) != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to create HPA test namespace", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create HPA test namespace", err) } // Cleanup: delete namespace (cascades all resources). @@ -194,25 +206,30 @@ func validateHPABehavior(ctx context.Context, clientset kubernetes.Interface) er deploy := buildHPATestDeployment(deployName, nsName) if _, err := clientset.AppsV1().Deployments(nsName).Create( ctx, deploy, metav1.CreateOptions{}); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to create HPA test deployment", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create HPA test deployment", err) } // Create HPA targeting external metric dcgm_gpu_power_usage with very low threshold. hpa := buildHPATestHPA(hpaName, deployName, nsName) if _, err := clientset.AutoscalingV2().HorizontalPodAutoscalers(nsName).Create( ctx, hpa, metav1.CreateOptions{}); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to create HPA test resource", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create HPA test resource", err) } // Wait for HPA to report scaling intent: desiredReplicas > currentReplicas. 
- if err := waitForHPAScalingIntent(ctx, clientset, nsName, hpaName); err != nil { - return err + desiredReplicas, currentReplicas, err := waitForHPAScaleUp(ctx, clientset, nsName, hpaName, "pod-autoscaling") + if err != nil { + return nil, err } + report.ScaleUpDesiredReplicas = desiredReplicas + report.ScaleUpCurrentReplicas = currentReplicas // Wait for Deployment to actually scale up (proves HPA → Deployment controller chain). - if err := waitForDeploymentScale(ctx, clientset, nsName, deployName); err != nil { - return err + scaleUpReplicas, err := waitForDeploymentScale(ctx, clientset, nsName, deployName) + if err != nil { + return nil, err } + report.ScaleUpDeployReplicas = scaleUpReplicas // Scale-down: patch HPA with high target so metric reads well below threshold. // This triggers the HPA to compute desiredReplicas = minReplicas (scale-down). @@ -221,16 +238,22 @@ func validateHPABehavior(ctx context.Context, clientset kubernetes.Interface) er currentHPA, err := clientset.AutoscalingV2().HorizontalPodAutoscalers(nsName).Get( ctx, hpaName, metav1.GetOptions{}) if err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to get HPA for scale-down test", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to get HPA for scale-down test", err) } currentHPA.Spec.Metrics[0].External.Target.AverageValue = resourceQuantityPtr("999999") - if _, err := clientset.AutoscalingV2().HorizontalPodAutoscalers(nsName).Update( - ctx, currentHPA, metav1.UpdateOptions{}); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to update HPA target for scale-down", err) + _, updateErr := clientset.AutoscalingV2().HorizontalPodAutoscalers(nsName).Update( + ctx, currentHPA, metav1.UpdateOptions{}) + if updateErr != nil { + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to update HPA target for scale-down", updateErr) } // Wait for Deployment to scale down (proves HPA scale-down path works). 
- return waitForDeploymentScaleDown(ctx, clientset, nsName, deployName) + scaleDownReplicas, err := waitForDeploymentScaleDown(ctx, clientset, nsName, deployName) + if err != nil { + return nil, err + } + report.ScaleDownDeployReplicas = scaleDownReplicas + return report, nil } // buildHPATestDeployment creates a minimal Deployment for the HPA behavioral test. @@ -320,48 +343,10 @@ func resourceQuantityPtr(val string) *resource.Quantity { return &q } -// waitForHPAScalingIntent polls the HPA until desiredReplicas > currentReplicas. -// This is the strict criterion: it proves the HPA read metrics and computed a scale-up. -// We do NOT accept ScalingActive=True alone as that can be true even without scale intent. -func waitForHPAScalingIntent(ctx context.Context, clientset kubernetes.Interface, namespace, hpaName string) error { - waitCtx, cancel := context.WithTimeout(ctx, defaults.HPAScaleTimeout) - defer cancel() - - err := wait.PollUntilContextCancel(waitCtx, defaults.HPAPollInterval, true, - func(ctx context.Context) (bool, error) { - hpa, getErr := clientset.AutoscalingV2().HorizontalPodAutoscalers(namespace).Get( - ctx, hpaName, metav1.GetOptions{}) - if getErr != nil { - slog.Debug("HPA not ready yet", "error", getErr) - return false, nil // retry - } - - desired := hpa.Status.DesiredReplicas - current := hpa.Status.CurrentReplicas - slog.Debug("HPA status", "desired", desired, "current", current) - - if desired > current { - slog.Info("HPA scaling intent detected", - "desiredReplicas", desired, "currentReplicas", current) - return true, nil - } - return false, nil - }, - ) - if err != nil { - if ctx.Err() != nil || waitCtx.Err() != nil { - return errors.Wrap(errors.ErrCodeTimeout, - "HPA did not report scaling intent within timeout — metrics pipeline may be broken", err) - } - return errors.Wrap(errors.ErrCodeInternal, "HPA scaling intent polling failed", err) - } - - return nil -} - // waitForDeploymentScale polls the Deployment until status.replicas > 1, 
proving // that the Deployment controller acted on the HPA's scaling recommendation. -func waitForDeploymentScale(ctx context.Context, clientset kubernetes.Interface, namespace, deployName string) error { +func waitForDeploymentScale(ctx context.Context, clientset kubernetes.Interface, namespace, deployName string) (int32, error) { + var observedReplicas int32 waitCtx, cancel := context.WithTimeout(ctx, defaults.DeploymentScaleTimeout) defer cancel() @@ -374,11 +359,11 @@ func waitForDeploymentScale(ctx context.Context, clientset kubernetes.Interface, return false, nil } - replicas := deploy.Status.Replicas - slog.Debug("deployment replica status", "name", deployName, "replicas", replicas) + observedReplicas = deploy.Status.Replicas + slog.Debug("deployment replica status", "name", deployName, "replicas", observedReplicas) - if replicas > 1 { - slog.Info("deployment scaled up", "name", deployName, "replicas", replicas) + if observedReplicas > 1 { + slog.Info("deployment scaled up", "name", deployName, "replicas", observedReplicas) return true, nil } return false, nil @@ -386,18 +371,19 @@ func waitForDeploymentScale(ctx context.Context, clientset kubernetes.Interface, ) if err != nil { if ctx.Err() != nil || waitCtx.Err() != nil { - return errors.Wrap(errors.ErrCodeTimeout, + return 0, errors.Wrap(errors.ErrCodeTimeout, "deployment did not scale up within timeout — HPA may not be effective", err) } - return errors.Wrap(errors.ErrCodeInternal, "deployment scale verification failed", err) + return 0, errors.Wrap(errors.ErrCodeInternal, "deployment scale verification failed", err) } - return nil + return observedReplicas, nil } // waitForDeploymentScaleDown polls the Deployment until status.replicas <= 1, proving // that the HPA's scale-down recommendation was enacted by the Deployment controller. 
-func waitForDeploymentScaleDown(ctx context.Context, clientset kubernetes.Interface, namespace, deployName string) error { +func waitForDeploymentScaleDown(ctx context.Context, clientset kubernetes.Interface, namespace, deployName string) (int32, error) { + var observedReplicas int32 waitCtx, cancel := context.WithTimeout(ctx, defaults.DeploymentScaleTimeout) defer cancel() @@ -410,11 +396,11 @@ func waitForDeploymentScaleDown(ctx context.Context, clientset kubernetes.Interf return false, nil } - replicas := deploy.Status.Replicas - slog.Debug("deployment replica status (scale-down)", "name", deployName, "replicas", replicas) + observedReplicas = deploy.Status.Replicas + slog.Debug("deployment replica status (scale-down)", "name", deployName, "replicas", observedReplicas) - if replicas <= 1 { - slog.Info("deployment scaled down", "name", deployName, "replicas", replicas) + if observedReplicas <= 1 { + slog.Info("deployment scaled down", "name", deployName, "replicas", observedReplicas) return true, nil } return false, nil @@ -422,11 +408,11 @@ func waitForDeploymentScaleDown(ctx context.Context, clientset kubernetes.Interf ) if err != nil { if ctx.Err() != nil || waitCtx.Err() != nil { - return errors.Wrap(errors.ErrCodeTimeout, + return 0, errors.Wrap(errors.ErrCodeTimeout, "deployment did not scale down within timeout — HPA scale-down may not be effective", err) } - return errors.Wrap(errors.ErrCodeInternal, "deployment scale-down verification failed", err) + return 0, errors.Wrap(errors.ErrCodeInternal, "deployment scale-down verification failed", err) } - return nil + return observedReplicas, nil } diff --git a/pkg/validator/checks/conformance/pod_autoscaling_check_unit_test.go b/pkg/validator/checks/conformance/pod_autoscaling_check_unit_test.go index dc087b35..0df5cd83 100644 --- a/pkg/validator/checks/conformance/pod_autoscaling_check_unit_test.go +++ b/pkg/validator/checks/conformance/pod_autoscaling_check_unit_test.go @@ -177,7 +177,7 @@ func 
TestValidateHPABehavior(t *testing.T) { defer cancel() } - err := validateHPABehavior(ctx, clientset) + report, err := validateHPABehavior(ctx, clientset) if (err != nil) != tt.wantErr { t.Errorf("validateHPABehavior() error = %v, wantErr %v", err, tt.wantErr) @@ -189,6 +189,24 @@ func TestValidateHPABehavior(t *testing.T) { t.Errorf("validateHPABehavior() error = %v, should contain %q", err, tt.errContains) } } + + if !tt.wantErr { + if report == nil { + t.Fatal("validateHPABehavior() report = nil, want non-nil") + } + if report.ScaleUpDesiredReplicas != tt.desiredReplicas || report.ScaleUpCurrentReplicas != tt.currentReplicas { + t.Errorf("report scale-up desired/current = %d/%d, want %d/%d", + report.ScaleUpDesiredReplicas, report.ScaleUpCurrentReplicas, tt.desiredReplicas, tt.currentReplicas) + } + if report.ScaleUpDeployReplicas != tt.deployReplicas { + t.Errorf("report scale-up deployment replicas = %d, want %d", + report.ScaleUpDeployReplicas, tt.deployReplicas) + } + if report.ScaleDownDeployReplicas != 1 { + t.Errorf("report scale-down deployment replicas = %d, want 1", + report.ScaleDownDeployReplicas) + } + } }) } } diff --git a/pkg/validator/checks/conformance/robust_controller_check.go b/pkg/validator/checks/conformance/robust_controller_check.go index 0aed2a30..17972098 100644 --- a/pkg/validator/checks/conformance/robust_controller_check.go +++ b/pkg/validator/checks/conformance/robust_controller_check.go @@ -35,6 +35,13 @@ var dgdGVR = schema.GroupVersionResource{ Group: "nvidia.com", Version: "v1alpha1", Resource: "dynamographdeployments", } +type webhookRejectionReport struct { + ResourceName string + StatusCode int32 + Reason metav1.StatusReason + Message string +} + func init() { checks.RegisterCheck(&checks.Check{ Name: "robust-controller", @@ -136,27 +143,29 @@ func CheckRobustController(ctx *checks.ValidationContext) error { } // 4. Validating webhook actively rejects invalid resources (behavioral test). 
- if err := validateWebhookRejects(ctx); err != nil { + report, err := validateWebhookRejects(ctx) + if err != nil { return err } recordArtifact(ctx, "Webhook Rejection Test", - "Result: PASS — webhook rejected invalid DynamoGraphDeployment") + fmt.Sprintf("Resource: %s\nHTTP Code: %d\nReason: %s\nMessage: %s", + report.ResourceName, report.StatusCode, report.Reason, report.Message)) return nil } // validateWebhookRejects verifies that the Dynamo validating webhook actively rejects // invalid DynamoGraphDeployment resources. This proves the webhook is not just present // but functionally operational. -func validateWebhookRejects(ctx *checks.ValidationContext) error { +func validateWebhookRejects(ctx *checks.ValidationContext) (*webhookRejectionReport, error) { dynClient, err := getDynamicClient(ctx) if err != nil { - return err + return nil, err } // Generate unique test resource name. b := make([]byte, 4) if _, err := rand.Read(b); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to generate random suffix", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to generate random suffix", err) } name := robustTestPrefix + hex.EncodeToString(b) @@ -183,7 +192,7 @@ func validateWebhookRejects(ctx *checks.ValidationContext) error { // Webhook did not reject — clean up the accidentally created resource. _ = dynClient.Resource(dgdGVR).Namespace("dynamo-system").Delete( ctx.Context, name, metav1.DeleteOptions{}) - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, "validating webhook did not reject invalid DynamoGraphDeployment") } @@ -192,18 +201,27 @@ func validateWebhookRejects(ctx *checks.ValidationContext) error { // IsForbidden can also match RBAC denials, so we explicitly exclude those // by checking the structured status message for RBAC patterns. 
if k8serrors.IsForbidden(createErr) || k8serrors.IsInvalid(createErr) { + report := &webhookRejectionReport{ + ResourceName: name, + Message: createErr.Error(), + } + var statusErr *k8serrors.StatusError if stderrors.As(createErr, &statusErr) { - msg := statusErr.Status().Message + status := statusErr.Status() + msg := status.Message if strings.Contains(msg, "cannot create resource") { - return errors.Wrap(errors.ErrCodeInternal, + return nil, errors.Wrap(errors.ErrCodeInternal, "RBAC denied the request, not an admission webhook rejection", createErr) } + report.StatusCode = status.Code + report.Reason = status.Reason + report.Message = msg } - return nil // PASS — webhook rejected the invalid resource + return report, nil // PASS — webhook rejected the invalid resource } // Non-admission error (network, CRD not installed, server error, etc). - return errors.Wrap(errors.ErrCodeInternal, + return nil, errors.Wrap(errors.ErrCodeInternal, "unexpected error testing webhook rejection", createErr) } diff --git a/pkg/validator/checks/conformance/secure_access_check.go b/pkg/validator/checks/conformance/secure_access_check.go index 12dda69e..c81db49e 100644 --- a/pkg/validator/checks/conformance/secure_access_check.go +++ b/pkg/validator/checks/conformance/secure_access_check.go @@ -49,6 +49,14 @@ type draTestRun struct { noClaimPodName string } +type draIsolationReport struct { + PodName string + PodPhase corev1.PodPhase + ExitCode int32 + HostPathGPUMounts int + ResourceClaimsCount int +} + func newDRATestRun() (*draTestRun, error) { b := make([]byte, 4) if _, err := rand.Read(b); err != nil { @@ -124,11 +132,16 @@ func CheckSecureAcceleratorAccess(ctx *checks.ValidationContext) error { len(pod.Spec.ResourceClaims))) // Validate isolation: a pod without DRA claims cannot access GPU devices. 
- if err := validateDRAIsolation(ctx.Context, ctx.Clientset, run); err != nil { + // Target the same node as the DRA test pod — isolation must be proven on the + // GPU node, not a control-plane node that has no GPUs in the first place. + report, err := validateDRAIsolation(ctx.Context, ctx.Clientset, run, pod.Spec.NodeName) + if err != nil { return err } recordArtifact(ctx, "DRA Isolation Test", - "Result: PASS — pod without DRA claims cannot see GPU devices") + fmt.Sprintf("Pod: %s/%s\nPhase: %s\nExit Code: %d\nResourceClaims: %d\nHostPath GPU mounts: %d", + draTestNamespace, report.PodName, report.PodPhase, report.ExitCode, + report.ResourceClaimsCount, report.HostPathGPUMounts)) return nil } @@ -167,13 +180,22 @@ func waitForDRATestPod(ctx context.Context, clientset kubernetes.Interface, run err := wait.PollUntilContextCancel(waitCtx, defaults.PodPollInterval, true, func(ctx context.Context) (bool, error) { - pod, err := clientset.CoreV1().Pods(draTestNamespace).Get( + pod, getErr := clientset.CoreV1().Pods(draTestNamespace).Get( ctx, run.podName, metav1.GetOptions{}) - if err != nil { - if k8serrors.IsNotFound(err) { + if getErr != nil { + if k8serrors.IsNotFound(getErr) { return false, nil // pod not yet visible after create, keep polling } - return false, errors.Wrap(errors.ErrCodeInternal, "failed to get DRA test pod", err) + // K8s client rate limiter fires near context deadline — retry gracefully. + if strings.Contains(getErr.Error(), "rate limiter") { + return false, nil + } + return false, errors.Wrap(errors.ErrCodeInternal, "failed to get DRA test pod", getErr) + } + // Fail fast if pod is stuck in a non-recoverable state (e.g. ImagePullBackOff).
+ if reason := podStuckReason(pod); reason != "" { + return false, errors.New(errors.ErrCodeInternal, + fmt.Sprintf("DRA test pod stuck: %s", reason)) } switch pod.Status.Phase { //nolint:exhaustive // only terminal states matter case corev1.PodSucceeded, corev1.PodFailed: @@ -240,12 +262,14 @@ func validateDRAPatterns(ctx context.Context, dynClient dynamic.Interface, pod * // validateDRAIsolation verifies that a pod WITHOUT DRA ResourceClaims cannot see GPU devices. // This proves GPU access is truly mediated by DRA — the scheduler does not expose devices -// to pods that lack claims. -func validateDRAIsolation(ctx context.Context, clientset kubernetes.Interface, run *draTestRun) error { - // Create no-claim pod. - pod := buildNoClaimTestPod(run) +// to pods that lack claims. gpuNodeName pins the pod to the same GPU node where the DRA +// test ran, ensuring isolation is proven on a node that actually has GPUs and bypassing +// scheduler-level delays. +func validateDRAIsolation(ctx context.Context, clientset kubernetes.Interface, run *draTestRun, gpuNodeName string) (*draIsolationReport, error) { + // Create no-claim pod pinned to the GPU node. + pod := buildNoClaimTestPod(run, gpuNodeName) if _, err := clientset.CoreV1().Pods(draTestNamespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil { - return errors.Wrap(errors.ErrCodeInternal, "failed to create no-claim isolation test pod", err) + return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create no-claim isolation test pod", err) } defer func() { _ = k8s.IgnoreNotFound(clientset.CoreV1().Pods(draTestNamespace).Delete( @@ -259,19 +283,33 @@ func validateDRAIsolation(ctx context.Context, clientset kubernetes.Interface, r // Wait for no-claim pod to reach terminal state. 
var resultPod *corev1.Pod + var lastPhase corev1.PodPhase + var lastContainerStatus string waitCtx, cancel := context.WithTimeout(ctx, defaults.DRATestPodTimeout) defer cancel() err := wait.PollUntilContextCancel(waitCtx, defaults.PodPollInterval, true, func(ctx context.Context) (bool, error) { - p, err := clientset.CoreV1().Pods(draTestNamespace).Get( + p, getErr := clientset.CoreV1().Pods(draTestNamespace).Get( ctx, run.noClaimPodName, metav1.GetOptions{}) - if err != nil { - if k8serrors.IsNotFound(err) { + if getErr != nil { + if k8serrors.IsNotFound(getErr) { return false, nil // pod not yet visible after create, keep polling } + // K8s client rate limiter fires near context deadline — retry gracefully. + if strings.Contains(getErr.Error(), "rate limiter") { + return false, nil + } return false, errors.Wrap(errors.ErrCodeInternal, - "failed to get no-claim isolation test pod", err) + "failed to get no-claim isolation test pod", getErr) + } + // Track last known state for diagnostics on timeout. + lastPhase = p.Status.Phase + lastContainerStatus = podWaitingStatus(p) + // Fail fast if pod is stuck in a non-recoverable state (e.g. ImagePullBackOff). 
+ if reason := podStuckReason(p); reason != "" { + return false, errors.New(errors.ErrCodeInternal, + fmt.Sprintf("no-claim isolation test pod stuck: %s", reason)) } switch p.Status.Phase { //nolint:exhaustive // only terminal states matter case corev1.PodSucceeded, corev1.PodFailed: @@ -284,28 +322,30 @@ func validateDRAIsolation(ctx context.Context, clientset kubernetes.Interface, r ) if err != nil { if ctx.Err() != nil || waitCtx.Err() != nil { - return errors.Wrap(errors.ErrCodeTimeout, - "no-claim isolation test pod did not complete in time", err) + return nil, errors.Wrap(errors.ErrCodeTimeout, + fmt.Sprintf("no-claim isolation test pod did not complete in time (last phase=%s, status=%s, node=%s)", + lastPhase, lastContainerStatus, gpuNodeName), err) } - return errors.Wrap(errors.ErrCodeInternal, + return nil, errors.Wrap(errors.ErrCodeInternal, "no-claim isolation test pod polling failed", err) } + report := &draIsolationReport{ + PodName: resultPod.Name, + PodPhase: resultPod.Status.Phase, + ExitCode: podExitCode(resultPod), + ResourceClaimsCount: len(resultPod.Spec.ResourceClaims), + } + // Strict success criteria: require Succeeded (exit 0 = script confirmed no GPU visible). // Failed means either GPU was visible (exit 1) or the container failed for other reasons. 
if resultPod.Status.Phase != corev1.PodSucceeded { - exitCode := int32(-1) - if len(resultPod.Status.ContainerStatuses) > 0 { - cs := resultPod.Status.ContainerStatuses[0] - if cs.State.Terminated != nil { - exitCode = cs.State.Terminated.ExitCode - } - } + exitCode := report.ExitCode if exitCode == 1 { - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, "GPU devices visible without DRA claim — isolation broken (container exit code 1)") } - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("no-claim isolation test pod failed with exit code %d — cannot verify isolation", exitCode)) } @@ -313,13 +353,13 @@ func validateDRAIsolation(ctx context.Context, clientset kubernetes.Interface, r // Verify no hostPath to GPU devices on the no-claim pod. for _, vol := range resultPod.Spec.Volumes { if vol.HostPath != nil && strings.Contains(vol.HostPath.Path, "/dev/nvidia") { - return errors.New(errors.ErrCodeInternal, + return nil, errors.New(errors.ErrCodeInternal, fmt.Sprintf("no-claim pod has hostPath volume to %s — isolation broken", vol.HostPath.Path)) } } - return nil + return report, nil } // cleanupDRATestResources removes test resources. Best-effort: errors are ignored @@ -394,13 +434,16 @@ func buildDRATestPod(run *draTestRun) *corev1.Pod { // If the cluster properly mediates GPU access through DRA, this pod will not see GPU devices. // Uses a lightweight image (busybox) since no CUDA libraries are needed — only checking // whether /dev/nvidia* device files are visible. -func buildNoClaimTestPod(run *draTestRun) *corev1.Pod { +// gpuNodeName pins the pod to the GPU node via NodeName, bypassing the scheduler to ensure +// the isolation test runs on a node that actually has GPUs and avoiding scheduler delays. 
+func buildNoClaimTestPod(run *draTestRun, gpuNodeName string) *corev1.Pod { return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: run.noClaimPodName, Namespace: draTestNamespace, }, Spec: corev1.PodSpec{ + NodeName: gpuNodeName, RestartPolicy: corev1.RestartPolicyNever, Tolerations: []corev1.Toleration{ {Operator: corev1.TolerationOpExists}, @@ -408,7 +451,7 @@ func buildNoClaimTestPod(run *draTestRun) *corev1.Pod { Containers: []corev1.Container{ { Name: "isolation-test", - Image: "busybox:stable", + Image: "busybox:1.37", Command: []string{ "sh", "-c", "if ls /dev/nvidia* 2>/dev/null; then echo 'FAIL: GPU visible without DRA claim' && exit 1; else echo 'PASS: GPU isolated' && exit 0; fi", @@ -450,3 +493,14 @@ func buildResourceClaim(run *draTestRun) *unstructured.Unstructured { func strPtr(s string) *string { return &s } + +func podExitCode(pod *corev1.Pod) int32 { + if len(pod.Status.ContainerStatuses) == 0 { + return -1 + } + terminated := pod.Status.ContainerStatuses[0].State.Terminated + if terminated == nil { + return -1 + } + return terminated.ExitCode +} diff --git a/pkg/validator/checks/conformance/secure_access_check_unit_test.go b/pkg/validator/checks/conformance/secure_access_check_unit_test.go index 7cb6d172..2cea7f1f 100644 --- a/pkg/validator/checks/conformance/secure_access_check_unit_test.go +++ b/pkg/validator/checks/conformance/secure_access_check_unit_test.go @@ -117,7 +117,7 @@ func TestCheckSecureAcceleratorAccess(t *testing.T) { Name: ga.GetName(), Namespace: draTestNamespace, }, - Spec: *buildNoClaimTestPod(&draTestRun{noClaimPodName: ga.GetName()}).Spec.DeepCopy(), + Spec: *buildNoClaimTestPod(&draTestRun{noClaimPodName: ga.GetName()}, "test-node").Spec.DeepCopy(), Status: corev1.PodStatus{ Phase: corev1.PodSucceeded, },