Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions pkg/validator/checks/conformance/ai_service_metrics_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,34 @@ func checkAIServiceMetricsWithURL(ctx *checks.ValidationContext, promBaseURL str
return errors.New(errors.ErrCodeInternal, "discovery REST client is not available")
}
result := restClient.Get().AbsPath(rawURL).Do(ctx.Context)
var statusCode int
result.StatusCode(&statusCode)
if cmErr := result.Error(); cmErr != nil {
recordArtifact(ctx, "Custom Metrics API", fmt.Sprintf("Status: unavailable\nError: %v", cmErr))
recordArtifact(ctx, "Custom Metrics API",
fmt.Sprintf("Endpoint: %s\nHTTP Status: %d\nStatus: unavailable\nError: %v",
rawURL, statusCode, cmErr))
return errors.Wrap(errors.ErrCodeNotFound,
"custom metrics API not available", cmErr)
}
recordArtifact(ctx, "Custom Metrics API", "Status: available\nEndpoint: /apis/custom.metrics.k8s.io/v1beta1")

groupVersion := "unknown"
resourceCount := 0
discoveryBody, rawErr := result.Raw()
if rawErr == nil {
var discovery struct {
GroupVersion string `json:"groupVersion"`
Resources []json.RawMessage `json:"resources"`
}
if json.Unmarshal(discoveryBody, &discovery) == nil {
if discovery.GroupVersion != "" {
groupVersion = discovery.GroupVersion
}
resourceCount = len(discovery.Resources)
}
}
recordArtifact(ctx, "Custom Metrics API",
fmt.Sprintf("Endpoint: %s\nHTTP Status: %d\nGroupVersion: %s\nAPI Resources: %d\nStatus: available",
rawURL, statusCode, groupVersion, resourceCount))

return nil
}
145 changes: 74 additions & 71 deletions pkg/validator/checks/conformance/cluster_autoscaling_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ const (
karpenterNodePoolLabel = "karpenter.sh/nodepool"
)

type clusterAutoscalingReport struct {
NodePool string
HPADesired int32
HPACurrent int32
BaselineNodes int
ObservedNodes int
TotalPods int
ScheduledPods int
}

func init() {
checks.RegisterCheck(&checks.Check{
Name: "cluster-autoscaling",
Expand Down Expand Up @@ -122,14 +132,19 @@ func CheckClusterAutoscaling(ctx *checks.ValidationContext) error {
var lastErr error
for _, poolName := range gpuNodePoolNames {
slog.Info("attempting behavioral validation with NodePool", "nodePool", poolName)
lastErr = validateClusterAutoscaling(ctx.Context, ctx.Clientset, poolName)
if lastErr == nil {
report, runErr := validateClusterAutoscaling(ctx.Context, ctx.Clientset, poolName)
if runErr == nil {
recordArtifact(ctx, "Cluster Autoscaling Behavioral Test",
fmt.Sprintf("NodePool: %s\nHPA: scaling intent detected\nKarpenter: new node(s) provisioned\nPods: scheduled on new nodes", poolName))
fmt.Sprintf("NodePool: %s\nHPA desired/current: %d/%d\nKarpenter nodes: baseline=%d observed=%d new=%d\nPods scheduled: %d/%d",
report.NodePool,
report.HPADesired, report.HPACurrent,
report.BaselineNodes, report.ObservedNodes, report.ObservedNodes-report.BaselineNodes,
report.ScheduledPods, report.TotalPods))
return nil
}
lastErr = runErr
slog.Debug("behavioral validation failed for NodePool",
"nodePool", poolName, "error", lastErr)
"nodePool", poolName, "error", runErr)
}
return lastErr
}
Expand All @@ -138,11 +153,15 @@ func CheckClusterAutoscaling(ctx *checks.ValidationContext) error {
// Deployment + HPA (external metric) → HPA computes scale-up → Karpenter provisions
// KWOK nodes → pods are scheduled. This proves the chain works end-to-end.
// nodePoolName is the discovered GPU NodePool name from the precheck.
func validateClusterAutoscaling(ctx context.Context, clientset kubernetes.Interface, nodePoolName string) error {
func validateClusterAutoscaling(ctx context.Context, clientset kubernetes.Interface, nodePoolName string) (*clusterAutoscalingReport, error) {
report := &clusterAutoscalingReport{
NodePool: nodePoolName,
}

// Generate unique test resource names and namespace (prevents cross-run interference).
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
return errors.Wrap(errors.ErrCodeInternal, "failed to generate random suffix", err)
return nil, errors.Wrap(errors.ErrCodeInternal, "failed to generate random suffix", err)
}
suffix := hex.EncodeToString(b)
nsName := clusterAutoTestPrefix + suffix
Expand All @@ -154,7 +173,7 @@ func validateClusterAutoscaling(ctx context.Context, clientset kubernetes.Interf
ObjectMeta: metav1.ObjectMeta{Name: nsName},
}
if _, err := clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}); k8s.IgnoreAlreadyExists(err) != nil {
return errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test namespace", err)
return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test namespace", err)
}

// Cleanup: delete namespace (cascades all resources, triggers Karpenter consolidation).
Expand All @@ -175,37 +194,51 @@ func validateClusterAutoscaling(ctx context.Context, clientset kubernetes.Interf
LabelSelector: fmt.Sprintf("%s=%s", karpenterNodePoolLabel, nodePoolName),
})
if err != nil {
return errors.Wrap(errors.ErrCodeInternal, "failed to count baseline Karpenter nodes", err)
return nil, errors.Wrap(errors.ErrCodeInternal, "failed to count baseline Karpenter nodes", err)
}
baselineNodeCount := len(baselineNodes.Items)
report.BaselineNodes = baselineNodeCount
slog.Info("baseline Karpenter node count", "pool", nodePoolName, "count", baselineNodeCount)

// Create Deployment: GPU-requesting pods with Karpenter nodeSelector.
deploy := buildClusterAutoTestDeployment(deployName, nsName, nodePoolName)
if _, err := clientset.AppsV1().Deployments(nsName).Create(
ctx, deploy, metav1.CreateOptions{}); err != nil {
return errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test deployment", err)
_, createErr := clientset.AppsV1().Deployments(nsName).Create(
ctx, deploy, metav1.CreateOptions{})
if createErr != nil {
return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test deployment", createErr)
}

// Create HPA targeting external metric dcgm_gpu_power_usage.
hpa := buildClusterAutoTestHPA(hpaName, deployName, nsName)
if _, err := clientset.AutoscalingV2().HorizontalPodAutoscalers(nsName).Create(
ctx, hpa, metav1.CreateOptions{}); err != nil {
return errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test HPA", err)
_, createErr = clientset.AutoscalingV2().HorizontalPodAutoscalers(nsName).Create(
ctx, hpa, metav1.CreateOptions{})
if createErr != nil {
return nil, errors.Wrap(errors.ErrCodeInternal, "failed to create cluster autoscaling test HPA", createErr)
}

// Wait for HPA to report scaling intent.
if err := waitForClusterAutoHPAScale(ctx, clientset, nsName, hpaName); err != nil {
return err
desired, current, err := waitForHPAScaleUp(ctx, clientset, nsName, hpaName, "cluster autoscaling")
if err != nil {
return nil, err
}
report.HPADesired = desired
report.HPACurrent = current

// Wait for Karpenter to provision KWOK nodes (above baseline count).
if err := waitForKarpenterNodes(ctx, clientset, nodePoolName, baselineNodeCount); err != nil {
return err
observedNodes, err := waitForKarpenterNodes(ctx, clientset, nodePoolName, baselineNodeCount)
if err != nil {
return nil, err
}
report.ObservedNodes = observedNodes

// Verify pods are scheduled (not Pending) with poll loop.
return verifyPodsScheduled(ctx, clientset, nsName)
totalPods, scheduledPods, err := verifyPodsScheduled(ctx, clientset, nsName)
if err != nil {
return nil, err
}
report.TotalPods = totalPods
report.ScheduledPods = scheduledPods
return report, nil
}

// buildClusterAutoTestDeployment creates a Deployment that requests GPU resources
Expand Down Expand Up @@ -317,45 +350,10 @@ func buildClusterAutoTestHPA(name, deployName, namespace string) *autoscalingv2.
}
}

// waitForClusterAutoHPAScale polls the HPA until desiredReplicas > currentReplicas.
func waitForClusterAutoHPAScale(ctx context.Context, clientset kubernetes.Interface, namespace, hpaName string) error {
waitCtx, cancel := context.WithTimeout(ctx, defaults.HPAScaleTimeout)
defer cancel()

err := wait.PollUntilContextCancel(waitCtx, defaults.HPAPollInterval, true,
func(ctx context.Context) (bool, error) {
hpa, getErr := clientset.AutoscalingV2().HorizontalPodAutoscalers(namespace).Get(
ctx, hpaName, metav1.GetOptions{})
if getErr != nil {
slog.Debug("HPA not ready yet", "error", getErr)
return false, nil
}

desired := hpa.Status.DesiredReplicas
current := hpa.Status.CurrentReplicas
slog.Debug("cluster autoscaling HPA status", "desired", desired, "current", current)

if desired > current {
slog.Info("cluster autoscaling HPA scaling intent detected",
"desiredReplicas", desired, "currentReplicas", current)
return true, nil
}
return false, nil
},
)
if err != nil {
if ctx.Err() != nil || waitCtx.Err() != nil {
return errors.Wrap(errors.ErrCodeTimeout,
"HPA did not report scaling intent — external metrics pipeline may be broken", err)
}
return errors.Wrap(errors.ErrCodeInternal, "HPA scaling intent polling failed", err)
}
return nil
}

// waitForKarpenterNodes polls until nodes with the discovered NodePool label exceed the
// baseline count. This proves Karpenter provisioned NEW nodes, not just pre-existing ones.
func waitForKarpenterNodes(ctx context.Context, clientset kubernetes.Interface, nodePoolName string, baselineNodeCount int) error {
func waitForKarpenterNodes(ctx context.Context, clientset kubernetes.Interface, nodePoolName string, baselineNodeCount int) (int, error) {
var observedNodeCount int
waitCtx, cancel := context.WithTimeout(ctx, defaults.KarpenterNodeTimeout)
defer cancel()

Expand All @@ -369,29 +367,32 @@ func waitForKarpenterNodes(ctx context.Context, clientset kubernetes.Interface,
return false, nil
}

if len(nodes.Items) > baselineNodeCount {
observedNodeCount = len(nodes.Items)
if observedNodeCount > baselineNodeCount {
slog.Info("Karpenter provisioned new KWOK GPU node(s)",
"total", len(nodes.Items), "baseline", baselineNodeCount,
"new", len(nodes.Items)-baselineNodeCount)
"total", observedNodeCount, "baseline", baselineNodeCount,
"new", observedNodeCount-baselineNodeCount)
return true, nil
}
return false, nil
},
)
if err != nil {
if ctx.Err() != nil || waitCtx.Err() != nil {
return errors.Wrap(errors.ErrCodeTimeout,
return 0, errors.Wrap(errors.ErrCodeTimeout,
"Karpenter did not provision GPU nodes within timeout", err)
}
return errors.Wrap(errors.ErrCodeInternal, "Karpenter node polling failed", err)
return 0, errors.Wrap(errors.ErrCodeInternal, "Karpenter node polling failed", err)
}
return nil
return observedNodeCount, nil
}

// verifyPodsScheduled polls until pods in the unique test namespace are scheduled (not Pending).
// This proves the full chain: HPA → scale → Karpenter → nodes → pods scheduled.
// The namespace is unique per run, so all pods belong to this test — no stale pod interference.
func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, namespace string) error {
func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, namespace string) (int, int, error) {
var observedTotal int
var observedScheduled int
waitCtx, cancel := context.WithTimeout(ctx, defaults.PodScheduleTimeout)
defer cancel()

Expand All @@ -403,8 +404,9 @@ func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, na
return false, nil
}

if len(pods.Items) < 2 {
slog.Debug("waiting for HPA-scaled pods", "count", len(pods.Items))
observedTotal = len(pods.Items)
if observedTotal < 2 {
slog.Debug("waiting for HPA-scaled pods", "count", observedTotal)
return false, nil
}

Expand All @@ -415,23 +417,24 @@ func verifyPodsScheduled(ctx context.Context, clientset kubernetes.Interface, na
}
}

observedScheduled = scheduled
slog.Debug("cluster autoscaling pod status",
"total", len(pods.Items), "scheduled", scheduled)
"total", observedTotal, "scheduled", observedScheduled)

if scheduled >= 2 {
if observedScheduled >= 2 {
slog.Info("cluster autoscaling pods verified",
"total", len(pods.Items), "scheduled", scheduled)
"total", observedTotal, "scheduled", observedScheduled)
return true, nil
}
return false, nil
},
)
if err != nil {
if ctx.Err() != nil || waitCtx.Err() != nil {
return errors.Wrap(errors.ErrCodeTimeout,
return 0, 0, errors.Wrap(errors.ErrCodeTimeout,
"test pods not scheduled within timeout — Karpenter nodes may not be ready", err)
}
return errors.Wrap(errors.ErrCodeInternal, "pod scheduling verification failed", err)
return 0, 0, errors.Wrap(errors.ErrCodeInternal, "pod scheduling verification failed", err)
}
return nil
return observedTotal, observedScheduled, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func TestValidateClusterAutoscaling(t *testing.T) {
defer cancel()
}

err := validateClusterAutoscaling(ctx, clientset, testNodePool)
report, err := validateClusterAutoscaling(ctx, clientset, testNodePool)

if (err != nil) != tt.wantErr {
t.Errorf("validateClusterAutoscaling() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -409,6 +409,22 @@ func TestValidateClusterAutoscaling(t *testing.T) {
t.Errorf("validateClusterAutoscaling() error = %v, should contain %q", err, tt.errContains)
}
}

if !tt.wantErr {
if report == nil {
t.Fatal("validateClusterAutoscaling() report = nil, want non-nil")
}
if report.HPADesired != tt.hpaDesired || report.HPACurrent != tt.hpaCurrent {
t.Errorf("report HPA desired/current = %d/%d, want %d/%d",
report.HPADesired, report.HPACurrent, tt.hpaDesired, tt.hpaCurrent)
}
if report.ObservedNodes != tt.kwokNodes {
t.Errorf("report observed nodes = %d, want %d", report.ObservedNodes, tt.kwokNodes)
}
if report.TotalPods != tt.podCount {
t.Errorf("report total pods = %d, want %d", report.TotalPods, tt.podCount)
}
}
})
}
}
Expand Down
Loading
Loading