diff --git a/vertical-pod-autoscaler/benchmark/README.md b/vertical-pod-autoscaler/benchmark/README.md index aae4407f0e36..2118ea8c33b4 100644 --- a/vertical-pod-autoscaler/benchmark/README.md +++ b/vertical-pod-autoscaler/benchmark/README.md @@ -2,8 +2,6 @@ Measures VPA component latencies using KWOK (Kubernetes WithOut Kubelet) to simulate pods without real resource consumption. -> **Note:** Currently only updater metrics are collected. Recommender metrics are planned for the future. - - [Prerequisites](#prerequisites) - [Quick Start (Local)](#quick-start-local) @@ -12,7 +10,9 @@ Measures VPA component latencies using KWOK (Kubernetes WithOut Kubelet) to simu - [Profiles](#profiles) - [Flags](#flags) - [Metrics Collected](#metrics-collected) + - [Recommender Metrics](#recommender-metrics) - [Updater Metrics](#updater-metrics) + - [Admission Controller Metrics](#admission-controller-metrics) - [Scripts](#scripts) - [Cleanup](#cleanup) - [Notes](#notes) @@ -22,7 +22,7 @@ Measures VPA component latencies using KWOK (Kubernetes WithOut Kubelet) to simu ## Prerequisites -- Go 1.21+ +- Go 1.25+ - kubectl - Kind - Helm @@ -68,30 +68,58 @@ go build -C benchmark -o ../bin/vpa-benchmark . The benchmark program (`main.go`) assumes the cluster is already set up with VPA, KWOK, and the fake node. It then: 1. 
For each profile run: - - Scales down VPA components - - Cleans up previous benchmark resources + - Scales down all VPA components and cleans up previous benchmark resources - Creates ReplicaSets with fake pods assigned directly to KWOK node (bypasses scheduler) - - Creates noise ReplicaSets (if `--noise-ratio` > 0) — these are not managed by any VPA + - Creates noise ReplicaSets (if `--noise-percentage` > 0) — these are not managed by any VPA - Creates VPAs targeting managed ReplicaSets only - - Scales up recommender, waits for recommendations + - Scales up recommender and admission controller, waits for recommendations + - Scrapes recommender execution latency metrics - Scales up updater, waits for its loop to complete - - Scrapes `vpa_updater_execution_latency_seconds_sum` metrics -2. Outputs results to stdout and/or a CSV file if specified + - Scrapes updater and admission controller execution latency metrics +2. Outputs per-run tables (with Avg column when multiple runs) and cross-profile summary tables to stdout and/or a CSV file + +> [!NOTE] +> Recommender and updater latencies are cumulative sums from a single loop. Admission controller latencies are per-request averages (sum divided by request count), since it handles many requests per benchmark run. 
e.g., of output using this command: `bin/vpa-benchmark --profile=small,large,xxlarge` -```bash -========== Results ========== +``` +========== Results [Recommender] ========== +┌─────────────────────┬───────────────┬────────────────┬───────────────────┐ +│ STEP │ SMALL ( 25 ) │ LARGE ( 250 ) │ XXLARGE ( 1000 ) │ +├─────────────────────┼───────────────┼────────────────┼───────────────────┤ +│ LoadVPAs │ 0.0005s │ 0.0022s │ 0.0099s │ +│ LoadPods │ 0.0007s │ 0.0138s │ 0.1869s │ +│ LoadMetrics │ 0.0031s │ 0.0055s │ 0.0036s │ +│ UpdateVPAs │ 0.0142s │ 0.5050s │ 8.0046s │ +│ MaintainCheckpoints │ 0.0174s │ 3.0046s │ 18.0054s │ +│ GarbageCollect │ 0.0001s │ 0.0055s │ 0.0426s │ +│ total │ 0.0361s │ 3.5367s │ 26.2529s │ +└─────────────────────┴───────────────┴────────────────┴───────────────────┘ + +========== Results [Updater] ========== ┌───────────────┬───────────────┬────────────────┬───────────────────┐ │ STEP │ SMALL ( 25 ) │ LARGE ( 250 ) │ XXLARGE ( 1000 ) │ ├───────────────┼───────────────┼────────────────┼───────────────────┤ -│ AdmissionInit │ 0.0000s │ 0.0001s │ 0.0004s │ -│ EvictPods │ 2.4239s │ 24.5535s │ 98.6963s │ -│ FilterPods │ 0.0002s │ 0.0020s │ 0.0925s │ -│ ListPods │ 0.0001s │ 0.0006s │ 0.0025s │ -│ ListVPAs │ 0.0024s │ 0.0030s │ 0.0027s │ -│ total │ 2.4267s │ 24.5592s │ 98.7945s │ +│ ListVPAs │ 0.0021s │ 0.0020s │ 0.0023s │ +│ ListPods │ 0.0001s │ 0.0004s │ 0.0022s │ +│ FilterPods │ 0.0001s │ 0.0016s │ 0.0242s │ +│ AdmissionInit │ 0.0000s │ 0.0001s │ 0.0003s │ +│ EvictPods │ 2.3205s │ 24.5523s │ 98.5502s │ +│ total │ 2.3229s │ 24.5565s │ 98.5792s │ └───────────────┴───────────────┴────────────────┴───────────────────┘ + +========== Results [Admission Controller] ========== +┌────────────────┬───────────────┬────────────────┬───────────────────┐ +│ STEP │ SMALL ( 25 ) │ LARGE ( 250 ) │ XXLARGE ( 1000 ) │ +├────────────────┼───────────────┼────────────────┼───────────────────┤ +│ read_request │ 0.0000s │ 0.0000s │ 0.0000s │ +│ admit │ 0.0004s │ 0.0005s 
│ 0.0007s │ +│ build_response │ 0.0000s │ 0.0000s │ 0.0000s │ +│ write_response │ 0.0000s │ 0.0000s │ 0.0000s │ +│ request_count │ 26 │ 251 │ 1001 │ +│ total │ 0.0005s │ 0.0005s │ 0.0007s │ +└────────────────┴───────────────┴────────────────┴───────────────────┘ ``` We can then compare the results of a code change with the results of the main branch. @@ -121,10 +149,26 @@ When `--noise-percentage=P` is set, each profile also creates `P%` additional no ## Metrics Collected +All metrics are scraped from each component's `/metrics` endpoint via port-forwarding. Values are parsed from `vpa_<component>_execution_latency_seconds` histograms. Admission controller values are per-request averages. + +### Recommender Metrics + +Steps are listed in execution order. + +| Step | Description | +| ---- | ----------- | +| `LoadVPAs` | Load VPA objects | +| `LoadPods` | Load pods matching VPA targets | +| `LoadMetrics` | Load metrics from metrics-server | +| `UpdateVPAs` | Compute and write recommendations | +| `MaintainCheckpoints` | Create/update VPA checkpoints | +| `GarbageCollect` | Clean up stale data | +| `total` | Total loop time | + ### Updater Metrics -| Metric | Description | -| ------ | ----------- | +| Step | Description | +| ---- | ----------- | | `ListVPAs` | List VPA objects | | `ListPods` | List pods matching VPA targets | | `FilterPods` | Filter evictable pods | @@ -132,6 +176,17 @@ When `--noise-percentage=P` is set, each profile also creates `P%` additional no | `EvictPods` | Evict pods needing updates | | `total` | Total loop time | +### Admission Controller Metrics + +| Step | Description | +| ---- | ----------- | +| `read_request` | Parse incoming admission request | +| `admit` | Compute resource recommendations for the pod | +| `build_response` | Build admission response | +| `write_response` | Write response back to API server | +| `request_count` | Total number of admission requests handled | +| `total` | Total per-request time | + ## Scripts | Script | Purpose | @@
-147,7 +202,6 @@ Environment variables accepted by the scripts: | `KWOK_VERSION` | `v0.7.0` | `install-kwok.sh` | | `KWOK_NAMESPACE` | `kube-system` | `install-kwok.sh` | | `KWOK_NODE_NAME` | `kwok-node` | `install-kwok.sh` | -| `VPA_NAMESPACE` | `kube-system` | `configure-vpa.sh` | | `KIND_CLUSTER_NAME` | `kind` | `full-benchmark.sh` | ## Cleanup @@ -172,6 +226,6 @@ The benchmark includes several performance optimizations: ### Caveats -- The updater uses `time.Tick` which waits the full interval before the first tick, so the benchmark sleeps 2 minutes before polling for metrics +- The updater uses `time.Tick` which waits the full interval before the first tick, so the benchmark polls for up to 5 minutes waiting for the updater's `total` metric to appear. - The benchmark uses Recreate update mode. In-place scaling is not supported on KWOK pods. - The benchmark scales down all VPA components at the start of each run, so that any caching is not a factor. diff --git a/vertical-pod-autoscaler/benchmark/main.go b/vertical-pod-autoscaler/benchmark/main.go index 87888dd04012..75143d9ddf01 100644 --- a/vertical-pod-autoscaler/benchmark/main.go +++ b/vertical-pod-autoscaler/benchmark/main.go @@ -17,60 +17,26 @@ limitations under the License. 
package main import ( - "bufio" "context" "flag" "fmt" - "io" - "net/http" - "os" - "regexp" - "sort" - "strconv" "strings" "time" - "github.com/olekukonko/tablewriter" - "golang.org/x/sync/errgroup" - - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/transport/spdy" - "k8s.io/client-go/util/retry" "k8s.io/klog/v2" - autoscalingv1 "k8s.io/api/autoscaling/v1" - vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" -) -const ( - benchmarkNamespace = "benchmark" - kwokNodeName = "kwok-node" - vpaNamespace = "kube-system" + "k8s.io/autoscaler/vertical-pod-autoscaler/benchmark/pkg/cluster" + "k8s.io/autoscaler/vertical-pod-autoscaler/benchmark/pkg/component" + "k8s.io/autoscaler/vertical-pod-autoscaler/benchmark/pkg/results" ) -const replicasPerReplicaSet = 2 - -// profiles defines number of ReplicaSets (each with replicasPerReplicaSet pods) -var profiles = map[string]int{ - "small": 25, // 25 VPAs, 25 ReplicaSets, 50 pods - "medium": 100, // 100 VPAs, 100 ReplicaSets, 200 pods - "large": 250, // 250 VPAs, 250 ReplicaSets, 500 pods - "xlarge": 500, // 500 VPAs, 500 ReplicaSets, 1000 pods - "xxlarge": 1000, // 1000 VPAs, 1000 ReplicaSets, 2000 pods -} - -var restConfig *rest.Config - func main() { var kubeconfig string var profilesFlag string @@ -79,7 +45,7 @@ func main() { var noisePercentage int flag.StringVar(&kubeconfig, "kubeconfig", "", "path to kubeconfig (default: $KUBECONFIG or ~/.kube/config)") - flag.StringVar(&profilesFlag, "profile", "small", "benchmark profiles (comma-separated): small,medium,large,xlarge,xxlarge,huge") + 
flag.StringVar(&profilesFlag, "profile", "small", "benchmark profiles (comma-separated): small,medium,large,xlarge,xxlarge") flag.IntVar(&runs, "runs", 1, "number of benchmark runs per profile for averaging") flag.StringVar(&outputFile, "output", "", "path to output CSV file (optional)") flag.IntVar(&noisePercentage, "noise-percentage", 0, "percentage of additional noise (unmanaged) ReplicaSets relative to managed ReplicaSets (0 = no noise) (optional)") @@ -89,7 +55,7 @@ func main() { profileList := strings.Split(profilesFlag, ",") for _, p := range profileList { p = strings.TrimSpace(p) - if _, ok := profiles[p]; !ok { + if _, ok := cluster.Profiles[p]; !ok { klog.Fatalf("Unknown profile: %s", p) } } @@ -108,10 +74,8 @@ func main() { if err != nil { klog.Fatalf("Error building kubeconfig: %v", err) } - // Increase rate limits to avoid client-side throttling during benchmark config.QPS = 200 config.Burst = 400 - restConfig = config kubeClient, err := kubernetes.NewForConfig(config) if err != nil { @@ -124,631 +88,167 @@ func main() { } ctx := context.Background() + components := component.NewComponents(kubeClient, config) - fmt.Printf("=== VPA Benchmark with KWOK ===\n") - fmt.Printf("Profiles: %v, Runs per profile: %d\n\n", profileList, runs) + klog.Infof("=== VPA Benchmark with KWOK ===") + klog.Infof("Profiles: %v, Runs per profile: %d", profileList, runs) - // Ensure namespace - ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: benchmarkNamespace}} + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: cluster.BenchmarkNamespace}} kubeClient.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) - // Collect results for all profiles - profileResults := make(map[string]map[string]float64) - // Store all individual run results for summary - allRunResults := make(map[string][]map[string]float64) + profileResults := make(map[string]results.ComponentResults) + allRunResults := make(map[string][]results.ComponentResults) for _, profile := 
range profileList { profile = strings.TrimSpace(profile) - count := profiles[profile] + count := cluster.Profiles[profile] noiseCount := count * noisePercentage / 100 if noiseCount > 0 { - fmt.Printf("\n========== Profile: %s (%d VPAs, %d noise RS, %d total pods) ==========\n", - profile, count, noiseCount, (count+noiseCount)*replicasPerReplicaSet) + klog.Infof("========== Profile: %s (%d VPAs, %d noise RS, %d total pods) ==========", + profile, count, noiseCount, (count+noiseCount)*cluster.ReplicasPerReplicaSet) } else { - fmt.Printf("\n========== Profile: %s (%d VPAs) ==========\n", profile, count) + klog.Infof("========== Profile: %s (%d VPAs) ==========", profile, count) } - // Collect results from all runs for this profile - allResults := make([]map[string]float64, 0, runs) + allResults := make([]results.ComponentResults, 0, runs) for run := 1; run <= runs; run++ { if runs > 1 { - fmt.Printf("\n--- Run %d/%d ---\n", run, runs) + klog.Infof("--- Run %d/%d ---", run, runs) } - // Run one benchmark iteration - latencies, err := runBenchmarkIteration(ctx, kubeClient, vpaClient, count, noiseCount) + iterResults, err := runBenchmarkIteration(ctx, kubeClient, vpaClient, components, count, noiseCount) if err != nil { klog.Warningf("Run %d failed: %v", run, err) continue } - allResults = append(allResults, latencies) - - // Print this run's results - if runs > 1 { - printLatencies(latencies, fmt.Sprintf("Run %d Results", run)) - } + allResults = append(allResults, iterResults) } if len(allResults) == 0 { - fmt.Printf("No successful runs for profile %s!\n", profile) + klog.Warningf("No successful runs for profile %s!", profile) continue } - // Store all run results for summary allRunResults[profile] = allResults - - // Calculate average for this profile - var avgLatencies map[string]float64 - if runs == 1 { - avgLatencies = allResults[0] - } else { - avgLatencies = averageLatencies(allResults) - } - profileResults[profile] = avgLatencies - - printLatencies(avgLatencies, 
fmt.Sprintf("%s Results", profile)) + profileResults[profile] = results.AverageResults(allResults) } - // Final cleanup - fmt.Println("\nFinal cleanup...") - cleanupBenchmarkResources(ctx, kubeClient, vpaClient) + klog.Infof("Final cleanup...") + cluster.CleanupBenchmarkResources(ctx, kubeClient, vpaClient) - // Print run summary if multiple runs - if runs > 1 { - printRunSummary(profileList, allRunResults) - } + results.PrintRunSummary(profileList, allRunResults) - // Print results table and write to file if len(profileResults) > 0 { - printResultsTable(profileList, profileResults, outputFile, noisePercentage) + results.PrintResultsTable(profileList, profileResults, cluster.Profiles, outputFile, noisePercentage) } - fmt.Println("\nBenchmark completed successfully.") + klog.Infof("Benchmark completed successfully.") } -func runBenchmarkIteration(ctx context.Context, kubeClient kubernetes.Interface, vpaClient vpa_clientset.Interface, count int, noiseCount int) (map[string]float64, error) { - // Step 1: Scale down VPA components - fmt.Println("Scaling down VPA components...") - if err := scaleDownVPAComponents(ctx, kubeClient); err != nil { - return nil, fmt.Errorf("failed to scale down VPA components: %v", err) +// runBenchmarkIteration runs a single benchmark iteration: +// 1. Scale down all VPA components, clean up previous resources. +// 2. Create ReplicaSets, VPAs, and wait for KWOK pods. +// 3. Scale up recommender + admission controller, wait for recommendations. +// 4. Scrape recommender latency metrics. +// 5. Scale up updater, scrape its latency metrics once its loop completes. +// 6. Scrape admission controller metrics (accumulated during updater evictions). 
+func runBenchmarkIteration(ctx context.Context, kubeClient kubernetes.Interface, vpaClient vpa_clientset.Interface, components *component.Components, count int, noiseCount int) (results.ComponentResults, error) { + klog.Infof("Scaling down VPA components...") + for _, c := range components.All { + if err := c.ScaleDown(ctx); err != nil { + return nil, err + } } - // Step 2: Delete all VPA checkpoints - fmt.Println("Deleting all VPA checkpoints...") - deleteAllVPACheckpoints(ctx, vpaClient) - - // Step 3: Cleanup any existing benchmark resources - fmt.Println("Cleaning up existing benchmark resources...") - cleanupBenchmarkResources(ctx, kubeClient, vpaClient) - - // Step 4: Create ReplicaSets (targeting KWOK node) - fmt.Printf("Creating %d ReplicaSets (%d pods each, %d total)...\n", count, replicasPerReplicaSet, count*replicasPerReplicaSet) - createInParallel(ctx, count, func(ctx context.Context, name string) { - rs := makeReplicaSet(name) - err := withRetry(func() error { - _, err := kubeClient.AppsV1().ReplicaSets(benchmarkNamespace).Create(ctx, rs, metav1.CreateOptions{}) - if errors.IsAlreadyExists(err) { - return nil - } - return err - }) - if err != nil { - klog.Warningf("Error creating ReplicaSet %s: %v", name, err) - } + klog.Infof("Deleting all VPA checkpoints...") + cluster.DeleteAllVPACheckpoints(ctx, vpaClient) + + klog.Infof("Cleaning up existing benchmark resources...") + cluster.CleanupBenchmarkResources(ctx, kubeClient, vpaClient) + + klog.Infof("Creating %d ReplicaSets (%d pods each, %d total)...", count, cluster.ReplicasPerReplicaSet, count*cluster.ReplicasPerReplicaSet) + cluster.CreateInParallel(ctx, count, func(ctx context.Context, name string) { + cluster.CreateReplicaSet(ctx, kubeClient, name) }) - // Step 4b: Create noise ReplicaSets (not managed by any VPA) if noiseCount > 0 { - fmt.Printf("Creating %d noise ReplicaSets (%d pods each, %d noise pods)...\n", - noiseCount, replicasPerReplicaSet, noiseCount*replicasPerReplicaSet) - g, gctx := 
errgroup.WithContext(ctx) - g.SetLimit(50) - for i := range noiseCount { - g.Go(func() error { - name := fmt.Sprintf("noise-%d", i) - rs := makeReplicaSet(name) - err := withRetry(func() error { - _, err := kubeClient.AppsV1().ReplicaSets(benchmarkNamespace).Create(gctx, rs, metav1.CreateOptions{}) - if errors.IsAlreadyExists(err) { - return nil - } - return err - }) - if err != nil { - klog.Warningf("Error creating noise ReplicaSet %s: %v", name, err) - } - return nil - }) - } - g.Wait() + klog.Infof("Creating %d noise ReplicaSets (%d pods each, %d noise pods)...", + noiseCount, cluster.ReplicasPerReplicaSet, noiseCount*cluster.ReplicasPerReplicaSet) + cluster.CreateNoiseReplicaSets(ctx, kubeClient, noiseCount) } - // Step 5: Create VPAs (while VPA components are still down) - fmt.Printf("Creating %d VPAs...\n", count) - createInParallel(ctx, count, func(ctx context.Context, name string) { - vpa := makeVPA(name) - err := withRetry(func() error { - _, err := vpaClient.AutoscalingV1().VerticalPodAutoscalers(benchmarkNamespace).Create(ctx, vpa, metav1.CreateOptions{}) - if errors.IsAlreadyExists(err) { - return nil - } - return err - }) - if err != nil { - klog.Warningf("Error creating VPA %s: %v", name, err) - } + klog.Infof("Creating %d VPAs...", count) + cluster.CreateInParallel(ctx, count, func(ctx context.Context, name string) { + cluster.CreateVPA(ctx, vpaClient, name) }) - // Step 6: Wait for pods to be running - expectedPods := (count + noiseCount) * replicasPerReplicaSet - fmt.Printf("Waiting for %d KWOK pods to be running (%d managed + %d noise)...\n", - expectedPods, count*replicasPerReplicaSet, noiseCount*replicasPerReplicaSet) + expectedPods := (count + noiseCount) * cluster.ReplicasPerReplicaSet + klog.Infof("Waiting for %d KWOK pods to be running (%d managed + %d noise)...", + expectedPods, count*cluster.ReplicasPerReplicaSet, noiseCount*cluster.ReplicasPerReplicaSet) err := wait.PollUntilContextTimeout(ctx, 2*time.Second, 5*time.Minute, true, func(ctx 
context.Context) (bool, error) { - pods, _ := kubeClient.CoreV1().Pods(benchmarkNamespace).List(ctx, metav1.ListOptions{ + pods, _ := kubeClient.CoreV1().Pods(cluster.BenchmarkNamespace).List(ctx, metav1.ListOptions{ FieldSelector: "status.phase=Running", }) - fmt.Printf(" Pods: %d/%d\n", len(pods.Items), expectedPods) + klog.Infof("> Pods: %d/%d", len(pods.Items), expectedPods) return len(pods.Items) >= expectedPods, nil }) if err != nil { return nil, fmt.Errorf("timeout waiting for pods: %v", err) } - // Step 7: Scale up recommender (not updater yet) - fmt.Println("Scaling up recommender...") - if err := scaleUpRecommender(ctx, kubeClient); err != nil { - return nil, fmt.Errorf("failed to scale up recommender: %v", err) + klog.Infof("Scaling up recommender and admission controller...") + for _, c := range []*component.Component{components.Recommender, components.Admission} { + if err := c.ScaleUp(ctx); err != nil { + return nil, err + } } - // Step 8: Wait for VPA recommendations - fmt.Println("Waiting for VPA recommendations...") + klog.Infof("Waiting for VPA recommendations...") wait.PollUntilContextTimeout(ctx, 5*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) { - vpas, _ := vpaClient.AutoscalingV1().VerticalPodAutoscalers(benchmarkNamespace).List(ctx, metav1.ListOptions{}) + vpas, _ := vpaClient.AutoscalingV1().VerticalPodAutoscalers(cluster.BenchmarkNamespace).List(ctx, metav1.ListOptions{}) withRec := 0 for _, v := range vpas.Items { if v.Status.Recommendation != nil { withRec++ } } - fmt.Printf(" VPAs with recommendations: %d/%d\n", withRec, count) + klog.Infof("> VPAs with recommendations: %d/%d", withRec, count) return withRec == count, nil }) - // Step 9: Scale up updater (now that recommendations exist) - fmt.Println("Scaling up updater...") - if err := scaleUpUpdater(ctx, kubeClient); err != nil { - return nil, fmt.Errorf("failed to scale up updater: %v", err) - } - - // Step 10: Wait for updater's first loop - // The updater 
uses time.Tick which waits the full interval before the first tick - // We set --updater-interval=2m, so wait 2 minutes for the first loop to start - fmt.Println("Waiting 2 minutes for updater's first loop...") - time.Sleep(2 * time.Minute) - - // Step 11: Poll for updater metrics (until 'total' step appears) - // this step appears because of the "defer timer.ObserveTotal()" in updater.RunOnce() - fmt.Println("Polling for updater metrics...") - return waitForAndScrapeMetrics(ctx, kubeClient) -} - -func scaleDeployment(ctx context.Context, kubeClient kubernetes.Interface, namespace, name string, replicas int32) error { - return withRetry(func() error { - scale, err := kubeClient.AppsV1().Deployments(namespace).GetScale(ctx, name, metav1.GetOptions{}) - if err != nil { - return err - } - scale.Spec.Replicas = replicas - _, err = kubeClient.AppsV1().Deployments(namespace).UpdateScale(ctx, name, scale, metav1.UpdateOptions{}) - return err - }) -} - -func scaleVPADeployments(ctx context.Context, kubeClient kubernetes.Interface, replicas int32) error { - deployments := []string{"vpa-updater", "vpa-recommender", "vpa-admission-controller"} - for _, name := range deployments { - if err := scaleDeployment(ctx, kubeClient, vpaNamespace, name, replicas); err != nil { - return err - } - } - return nil -} - -func scaleDownVPAComponents(ctx context.Context, kubeClient kubernetes.Interface) error { - // Scale all VPA deployments to 0 - if err := scaleVPADeployments(ctx, kubeClient, 0); err != nil { - return err - } - - // Wait for pods to be gone - wait.PollUntilContextTimeout(ctx, 2*time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { - pods, _ := kubeClient.CoreV1().Pods(vpaNamespace).List(ctx, metav1.ListOptions{ - LabelSelector: "app in (vpa-updater,vpa-recommender,vpa-admission-controller)", - }) - if len(pods.Items) == 0 { - return true, nil - } - fmt.Printf(" Waiting for %d VPA pods to terminate...\n", len(pods.Items)) - return false, nil - }) - 
fmt.Println(" VPA components scaled down") - return nil -} - -func scaleUpRecommender(ctx context.Context, kubeClient kubernetes.Interface) error { - // Scale recommender and admission-controller to 1 - if err := scaleDeployment(ctx, kubeClient, vpaNamespace, "vpa-recommender", 1); err != nil { - return err - } - if err := scaleDeployment(ctx, kubeClient, vpaNamespace, "vpa-admission-controller", 1); err != nil { - return err - } - fmt.Println(" Waiting for recommender to be ready...") - if err := waitForVPAPodReady(ctx, kubeClient, "recommender"); err != nil { - return err - } - fmt.Println(" Recommender ready") - return nil -} - -func scaleUpUpdater(ctx context.Context, kubeClient kubernetes.Interface) error { - if err := scaleDeployment(ctx, kubeClient, vpaNamespace, "vpa-updater", 1); err != nil { - return err - } - fmt.Println(" Waiting for updater to be ready...") - if err := waitForVPAPodReady(ctx, kubeClient, "updater"); err != nil { - return err - } - fmt.Println(" Updater ready") - return nil -} - -func waitForVPAPodReady(ctx context.Context, kubeClient kubernetes.Interface, appLabel string) error { - return wait.PollUntilContextTimeout(ctx, 2*time.Second, 120*time.Second, true, func(ctx context.Context) (bool, error) { - pods, _ := kubeClient.CoreV1().Pods(vpaNamespace).List(ctx, metav1.ListOptions{ - LabelSelector: fmt.Sprintf("app.kubernetes.io/component=%s", appLabel), - }) - for _, p := range pods.Items { - if p.Status.Phase == corev1.PodRunning { - allReady := true - for _, c := range p.Status.ContainerStatuses { - if !c.Ready { - allReady = false - break - } - } - if allReady { - return true, nil - } - } - } - return false, nil - }) -} - -func deleteAllVPACheckpoints(ctx context.Context, vpaClient vpa_clientset.Interface) { - nsList, _ := vpaClient.AutoscalingV1().VerticalPodAutoscalerCheckpoints("").List(ctx, metav1.ListOptions{}) - for _, cp := range nsList.Items { - 
vpaClient.AutoscalingV1().VerticalPodAutoscalerCheckpoints(cp.Namespace).Delete(ctx, cp.Name, metav1.DeleteOptions{}) - } - fmt.Printf(" Deleted %d VPA checkpoints\n", len(nsList.Items)) -} - -func makeReplicaSet(name string) *appsv1.ReplicaSet { - replicas := int32(replicasPerReplicaSet) - return &appsv1.ReplicaSet{ - ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: benchmarkNamespace}, - Spec: appsv1.ReplicaSetSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": name}}, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": name}}, - Spec: corev1.PodSpec{ - // Directly assign to KWOK node - bypasses scheduler for faster pod creation - NodeName: kwokNodeName, - // Tolerate the kwok node taint - Tolerations: []corev1.Toleration{{ - Key: "kwok.x-k8s.io/node", - Operator: corev1.TolerationOpEqual, - Value: "fake", - Effect: corev1.TaintEffectNoSchedule, - }}, - Containers: []corev1.Container{{ - Name: "app", - Image: "registry.k8s.io/pause:3.10", - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("10m"), - corev1.ResourceMemory: resource.MustParse("10Mi"), - }, - }, - }}, - }, - }, - }, - } -} - -func makeVPA(name string) *vpa_types.VerticalPodAutoscaler { - // KWOK pods don't support in-place scaling right now, so we use Recreate - mode := vpa_types.UpdateModeRecreate - return &vpa_types.VerticalPodAutoscaler{ - ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: benchmarkNamespace}, - Spec: vpa_types.VerticalPodAutoscalerSpec{ - TargetRef: &autoscalingv1.CrossVersionObjectReference{ - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: name, - }, - UpdatePolicy: &vpa_types.PodUpdatePolicy{UpdateMode: &mode}, - }, - } -} + iterResults := make(results.ComponentResults) -func cleanupBenchmarkResources(ctx context.Context, kubeClient kubernetes.Interface, vpaClient vpa_clientset.Interface) { - 
vpaClient.AutoscalingV1().VerticalPodAutoscalers(benchmarkNamespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{}) - - kubeClient.AppsV1().ReplicaSets(benchmarkNamespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{}) - - wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { - pods, _ := kubeClient.CoreV1().Pods(benchmarkNamespace).List(ctx, metav1.ListOptions{}) - return len(pods.Items) == 0, nil - }) -} - -func waitForAndScrapeMetrics(ctx context.Context, kubeClient kubernetes.Interface) (map[string]float64, error) { - pods, err := kubeClient.CoreV1().Pods(vpaNamespace).List(ctx, metav1.ListOptions{LabelSelector: "app.kubernetes.io/component=updater"}) - if err != nil || len(pods.Items) == 0 { - return nil, fmt.Errorf("no vpa-updater pod found") - } - - // The port-forward is configured to listen on this arbitrary port - port := 18943 - - podName := pods.Items[0].Name - stopChan := make(chan struct{}) - readyChan := make(chan struct{}) - - go func() { - url := kubeClient.CoreV1().RESTClient().Post(). - Resource("pods").Namespace(vpaNamespace).Name(podName). 
- SubResource("portforward").URL() - - transport, upgrader, _ := spdy.RoundTripperFor(restConfig) - dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) - pf, _ := portforward.New(dialer, []string{fmt.Sprintf("%d:8943", port)}, stopChan, readyChan, io.Discard, io.Discard) - pf.ForwardPorts() - }() - - select { - case <-readyChan: - case <-time.After(10 * time.Second): - close(stopChan) - return nil, fmt.Errorf("port-forward timeout") - } - defer close(stopChan) - - // Poll metrics endpoint until 'total' step appears (indicates loop completion) - // 3 minute timeout since we've already waited interval time for the loop to start - var latencies map[string]float64 - startTime := time.Now() - err = wait.PollUntilContextTimeout(ctx, 10*time.Second, 3*time.Minute, true, func(ctx context.Context) (bool, error) { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port)) - if err != nil { - return false, nil // Keep trying - } - defer resp.Body.Close() - - latencies, err = parseMetrics(resp.Body) - if err != nil { - return false, nil - } - - // Check if 'total' step is present - indicates loop completed - if _, ok := latencies["total"]; ok { - return true, nil - } - fmt.Printf(" Waiting for updater loop to complete. 
Elapsed: %.2fs\n", time.Since(startTime).Seconds()) - return false, nil - }) + klog.Infof("Scraping recommender metrics...") + recResults, stopRecommender, err := components.Recommender.Scrape(ctx) if err != nil { - return nil, fmt.Errorf("timed out waiting for updater metrics: %v", err) - } - - return latencies, nil -} - -func parseMetrics(body io.Reader) (map[string]float64, error) { - latencies := make(map[string]float64) - re := regexp.MustCompile(`vpa_updater_execution_latency_seconds_sum\{step="([^"]+)"\}\s+([\d.e+-]+)`) - scanner := bufio.NewScanner(body) - for scanner.Scan() { - if m := re.FindStringSubmatch(scanner.Text()); len(m) == 3 { - if v, err := strconv.ParseFloat(m[2], 64); err == nil { - latencies[m[1]] = v - } - } - } - return latencies, nil -} - -func printLatencies(latencies map[string]float64, title string) { - fmt.Printf("\n=== %s ===\n", title) - var steps []string - for k := range latencies { - steps = append(steps, k) - } - sort.Strings(steps) - for _, s := range steps { - fmt.Printf(" %-15s: %.4fs\n", s, latencies[s]) - } -} - -func averageLatencies(results []map[string]float64) map[string]float64 { - if len(results) == 0 { - return nil - } - - stepCounts := make(map[string]int) - stepSums := make(map[string]float64) - - for _, r := range results { - for step, val := range r { - stepSums[step] += val - stepCounts[step]++ - } + return nil, err } + defer stopRecommender() + iterResults["recommender"] = recResults - avg := make(map[string]float64) - for step, sum := range stepSums { - avg[step] = sum / float64(stepCounts[step]) + klog.Infof("Scaling up updater...") + if err := components.Updater.ScaleUp(ctx); err != nil { + return nil, err } - return avg -} -func printRunSummary(profileList []string, allRunResults map[string][]map[string]float64) { - for _, profile := range profileList { - profile = strings.TrimSpace(profile) - runResults, ok := allRunResults[profile] - if !ok || len(runResults) == 0 { - continue - } - - // Get all steps - 
stepSet := make(map[string]bool) - for _, r := range runResults { - for step := range r { - stepSet[step] = true - } - } - var steps []string - for s := range stepSet { - steps = append(steps, s) - } - sort.Strings(steps) - - // Build header: Step, Run1, Run2, ... - header := []string{"Step"} - for i := range runResults { - header = append(header, fmt.Sprintf("Run %d", i+1)) - } - - // Build rows - var rows [][]string - for _, step := range steps { - row := []string{step} - for _, r := range runResults { - if v, ok := r[step]; ok { - row = append(row, fmt.Sprintf("%.4fs", v)) - } else { - row = append(row, "-") - } - } - rows = append(rows, row) - } - - fmt.Printf("\n========== %s: All Runs ==========\n", profile) - table := tablewriter.NewWriter(os.Stdout) - table.Header(header) - table.Bulk(rows) - table.Render() - } -} - -func printResultsTable(profileList []string, results map[string]map[string]float64, outputFile string, noisePercentage int) { - // Get all steps from metric results - stepSet := make(map[string]bool) - for _, r := range results { - for step := range r { - stepSet[step] = true - } - } - var steps []string - for s := range stepSet { - steps = append(steps, s) - } - sort.Strings(steps) - - // Build header - header := []string{"Step"} - for _, p := range profileList { - p = strings.TrimSpace(p) - count := profiles[p] - noiseCount := count * noisePercentage / 100 - if noiseCount > 0 { - header = append(header, fmt.Sprintf("%s (%d+%dn)", p, count, noiseCount)) - } else { - header = append(header, fmt.Sprintf("%s (%d)", p, count)) - } - } - - // Build rows - var rows [][]string - for _, step := range steps { - row := []string{step} - for _, p := range profileList { - p = strings.TrimSpace(p) - if r, ok := results[p]; ok { - if v, ok := r[step]; ok { - row = append(row, fmt.Sprintf("%.4fs", v)) - } else { - row = append(row, "-") - } - } else { - row = append(row, "-") - } - } - rows = append(rows, row) - } - - fmt.Println("\n========== Results 
==========") - table := tablewriter.NewWriter(os.Stdout) - table.Header(header) - table.Bulk(rows) - table.Render() - - if outputFile != "" { - var buf strings.Builder - buf.WriteString(strings.Join(header, ",") + "\n") - for _, row := range rows { - buf.WriteString(strings.Join(row, ",") + "\n") - } - if err := os.WriteFile(outputFile, []byte(buf.String()), 0644); err != nil { - klog.Warningf("Failed to write output file %s: %v", outputFile, err) - } else { - fmt.Printf("\nResults written to %s (CSV format)\n", outputFile) - } + klog.Infof("Scraping updater metrics...") + updaterResults, stopUpdater, err := components.Updater.Scrape(ctx) + if err != nil { + return nil, err } -} + defer stopUpdater() + iterResults["updater"] = updaterResults -func createInParallel(ctx context.Context, count int, createFn func(ctx context.Context, name string)) { - g, gctx := errgroup.WithContext(ctx) - g.SetLimit(50) - for i := range count { - g.Go(func() error { - name := fmt.Sprintf("bench-%d", i) - createFn(gctx, name) - return nil - }) + klog.Infof("Scraping admission controller metrics...") + admResults, stopAdmission, err := components.Admission.Scrape(ctx) + if err != nil { + return nil, err } - g.Wait() -} - -// retryBackoff is used for transient API errors -var retryBackoff = wait.Backoff{ - Steps: 5, - Duration: 100 * time.Millisecond, - Factor: 2.0, - Jitter: 0.1, -} + defer stopAdmission() + iterResults["admission"] = admResults -// withRetry wraps an API call with retry logic for transient errors -func withRetry(fn func() error) error { - return retry.OnError(retryBackoff, func(err error) bool { - // Retry on conflicts, server errors, and rate limiting - return errors.IsConflict(err) || errors.IsServerTimeout(err) || errors.IsTooManyRequests(err) || errors.IsServiceUnavailable(err) - }, fn) + return iterResults, nil } diff --git a/vertical-pod-autoscaler/benchmark/pkg/cluster/cluster.go b/vertical-pod-autoscaler/benchmark/pkg/cluster/cluster.go new file mode 100644 
index 000000000000..e5081a620c0d --- /dev/null +++ b/vertical-pod-autoscaler/benchmark/pkg/cluster/cluster.go @@ -0,0 +1,245 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "fmt" + "time" + + "golang.org/x/sync/errgroup" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + + autoscalingv1 "k8s.io/api/autoscaling/v1" + vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" + vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" +) + +// Cluster-level constants for the benchmark environment. +const ( + BenchmarkNamespace = "benchmark" + KwokNodeName = "kwok-node" + VPANamespace = "kube-system" + ReplicasPerReplicaSet = 2 + PauseImage = "registry.k8s.io/pause:3.10.1" +) + +// Profiles defines the number of ReplicaSets (each with ReplicasPerReplicaSet pods) +// for each benchmark size. 
+var Profiles = map[string]int{ + "small": 25, // 25 VPAs, 25 ReplicaSets, 50 pods + "medium": 100, // 100 VPAs, 100 ReplicaSets, 200 pods + "large": 250, // 250 VPAs, 250 ReplicaSets, 500 pods + "xlarge": 500, // 500 VPAs, 500 ReplicaSets, 1000 pods + "xxlarge": 1000, // 1000 VPAs, 1000 ReplicaSets, 2000 pods +} + +var retryBackoff = wait.Backoff{ + Steps: 5, + Duration: 100 * time.Millisecond, + Factor: 2.0, + Jitter: 0.1, +} + +func withRetry(fn func() error) error { + return retry.OnError(retryBackoff, func(err error) bool { + return errors.IsConflict(err) || errors.IsServerTimeout(err) || errors.IsTooManyRequests(err) || errors.IsServiceUnavailable(err) + }, fn) +} + +// ScaleDeployment sets the replica count on the named deployment. +func ScaleDeployment(ctx context.Context, kubeClient kubernetes.Interface, namespace, name string, replicas int32) error { + return withRetry(func() error { + scale, err := kubeClient.AppsV1().Deployments(namespace).GetScale(ctx, name, metav1.GetOptions{}) + if err != nil { + return err + } + scale.Spec.Replicas = replicas + _, err = kubeClient.AppsV1().Deployments(namespace).UpdateScale(ctx, name, scale, metav1.UpdateOptions{}) + return err + }) +} + +// WaitForVPAPodReady polls until a running, ready pod exists for the given component label. 
+func WaitForVPAPodReady(ctx context.Context, kubeClient kubernetes.Interface, appLabel string) error { + return wait.PollUntilContextTimeout(ctx, 2*time.Second, 120*time.Second, true, func(ctx context.Context) (bool, error) { + pods, _ := kubeClient.CoreV1().Pods(VPANamespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app.kubernetes.io/component=%s", appLabel), + }) + for _, p := range pods.Items { + if p.Status.Phase == corev1.PodRunning { + allReady := true + for _, c := range p.Status.ContainerStatuses { + if !c.Ready { + allReady = false + break + } + } + if allReady { + return true, nil + } + } + } + return false, nil + }) +} + +// DeleteAllVPACheckpoints removes all VPA checkpoint objects across all namespaces. +func DeleteAllVPACheckpoints(ctx context.Context, vpaClient vpa_clientset.Interface) { + nsList, _ := vpaClient.AutoscalingV1().VerticalPodAutoscalerCheckpoints("").List(ctx, metav1.ListOptions{}) + for _, cp := range nsList.Items { + vpaClient.AutoscalingV1().VerticalPodAutoscalerCheckpoints(cp.Namespace).Delete(ctx, cp.Name, metav1.DeleteOptions{}) + } + klog.Infof("> Deleted %d VPA checkpoints", len(nsList.Items)) +} + +// MakeReplicaSet builds a ReplicaSet spec for the benchmark namespace with KWOK-compatible pods. 
+func MakeReplicaSet(name string) *appsv1.ReplicaSet { + replicas := int32(ReplicasPerReplicaSet) + return &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: BenchmarkNamespace}, + Spec: appsv1.ReplicaSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": name}}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": name}}, + Spec: corev1.PodSpec{ + NodeName: KwokNodeName, + Tolerations: []corev1.Toleration{{ + Key: "kwok.x-k8s.io/node", + Operator: corev1.TolerationOpEqual, + Value: "fake", + Effect: corev1.TaintEffectNoSchedule, + }}, + Containers: []corev1.Container{{ + Name: "app", + Image: PauseImage, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10m"), + corev1.ResourceMemory: resource.MustParse("10Mi"), + }, + }, + }}, + }, + }, + }, + } +} + +// MakeVPA builds a VPA spec targeting the named ReplicaSet with Recreate update mode. +func MakeVPA(name string) *vpa_types.VerticalPodAutoscaler { + mode := vpa_types.UpdateModeRecreate + return &vpa_types.VerticalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: BenchmarkNamespace}, + Spec: vpa_types.VerticalPodAutoscalerSpec{ + TargetRef: &autoscalingv1.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: name, + }, + UpdatePolicy: &vpa_types.PodUpdatePolicy{UpdateMode: &mode}, + }, + } +} + +// CleanupBenchmarkResources deletes all VPAs, ReplicaSets, and pods in the benchmark namespace. 
+func CleanupBenchmarkResources(ctx context.Context, kubeClient kubernetes.Interface, vpaClient vpa_clientset.Interface) { + vpaClient.AutoscalingV1().VerticalPodAutoscalers(BenchmarkNamespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{}) + kubeClient.AppsV1().ReplicaSets(BenchmarkNamespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{}) + wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (bool, error) { + pods, _ := kubeClient.CoreV1().Pods(BenchmarkNamespace).List(ctx, metav1.ListOptions{}) + return len(pods.Items) == 0, nil + }) +} + +// CreateInParallel runs createFn concurrently for bench-0..bench-(count-1). +func CreateInParallel(ctx context.Context, count int, createFn func(ctx context.Context, name string)) { + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(50) + for i := range count { + g.Go(func() error { + name := fmt.Sprintf("bench-%d", i) + createFn(gctx, name) + return nil + }) + } + g.Wait() +} + +// CreateReplicaSet creates a benchmark ReplicaSet, retrying on transient errors. +func CreateReplicaSet(ctx context.Context, kubeClient kubernetes.Interface, name string) { + rs := MakeReplicaSet(name) + err := withRetry(func() error { + _, err := kubeClient.AppsV1().ReplicaSets(BenchmarkNamespace).Create(ctx, rs, metav1.CreateOptions{}) + if errors.IsAlreadyExists(err) { + return nil + } + return err + }) + if err != nil { + klog.Warningf("Error creating ReplicaSet %s: %v", name, err) + } +} + +// CreateVPA creates a VPA targeting the named ReplicaSet, retrying on transient errors. 
+func CreateVPA(ctx context.Context, vpaClient vpa_clientset.Interface, name string) { + vpa := MakeVPA(name) + err := withRetry(func() error { + _, err := vpaClient.AutoscalingV1().VerticalPodAutoscalers(BenchmarkNamespace).Create(ctx, vpa, metav1.CreateOptions{}) + if errors.IsAlreadyExists(err) { + return nil + } + return err + }) + if err != nil { + klog.Warningf("Error creating VPA %s: %v", name, err) + } +} + +// CreateNoiseReplicaSets creates unmanaged ReplicaSets to simulate background pod noise. +func CreateNoiseReplicaSets(ctx context.Context, kubeClient kubernetes.Interface, noiseCount int) { + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(50) + for i := range noiseCount { + g.Go(func() error { + name := fmt.Sprintf("noise-%d", i) + rs := MakeReplicaSet(name) + err := withRetry(func() error { + _, err := kubeClient.AppsV1().ReplicaSets(BenchmarkNamespace).Create(gctx, rs, metav1.CreateOptions{}) + if errors.IsAlreadyExists(err) { + return nil + } + return err + }) + if err != nil { + klog.Warningf("Error creating noise ReplicaSet %s: %v", name, err) + } + return nil + }) + } + g.Wait() +} diff --git a/vertical-pod-autoscaler/benchmark/pkg/component/component.go b/vertical-pod-autoscaler/benchmark/pkg/component/component.go new file mode 100644 index 000000000000..9e2134805053 --- /dev/null +++ b/vertical-pod-autoscaler/benchmark/pkg/component/component.go @@ -0,0 +1,251 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package component + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "regexp" + "strconv" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" + "k8s.io/klog/v2" + + "k8s.io/autoscaler/vertical-pod-autoscaler/benchmark/pkg/cluster" +) + +const ( + scaleDownPollInterval = 2 * time.Second + scaleDownTimeout = 60 * time.Second + portForwardTimeout = 10 * time.Second + scrapePollInterval = 10 * time.Second + scrapeTimeout = 10 * time.Minute +) + +// Component represents a VPA component's full lifecycle: scaling, port-forwarding, +// and metrics scraping. Instantiated once via NewComponents with shared clients. +type Component struct { + Label string + MetricPrefix string + PodPort int + LocalPort int + PerRequest bool + + kubeClient kubernetes.Interface + restConfig *rest.Config +} + +// Components holds the three VPA component instances and provides +// named access as well as iteration via All. +type Components struct { + Recommender *Component + Updater *Component + Admission *Component + All []*Component +} + +// NewComponents creates all VPA component instances with shared clients. 
+func NewComponents(kubeClient kubernetes.Interface, restConfig *rest.Config) *Components { + c := &Components{ + Recommender: &Component{ + Label: "recommender", + MetricPrefix: "vpa_recommender", + PodPort: 8942, + LocalPort: 18942, + kubeClient: kubeClient, + restConfig: restConfig, + }, + Updater: &Component{ + Label: "updater", + MetricPrefix: "vpa_updater", + PodPort: 8943, + LocalPort: 18943, + kubeClient: kubeClient, + restConfig: restConfig, + }, + Admission: &Component{ + Label: "admission-controller", + MetricPrefix: "vpa_admission_controller", + PodPort: 8944, + LocalPort: 18944, + PerRequest: true, + kubeClient: kubeClient, + restConfig: restConfig, + }, + } + c.All = []*Component{c.Recommender, c.Updater, c.Admission} + return c +} + +// DeploymentName returns the Kubernetes deployment name for this component. +func (c *Component) DeploymentName() string { + return "vpa-" + c.Label +} + +// MetricsURL returns the local URL for this component's Prometheus metrics endpoint. +func (c *Component) MetricsURL() string { + return fmt.Sprintf("http://localhost:%d/metrics", c.LocalPort) +} + +// ScaleUp scales the component's deployment to 1 replica and waits for the pod +// to become ready. +func (c *Component) ScaleUp(ctx context.Context) error { + if err := cluster.ScaleDeployment(ctx, c.kubeClient, cluster.VPANamespace, c.DeploymentName(), 1); err != nil { + return fmt.Errorf("failed to scale up %s: %v", c.Label, err) + } + if err := cluster.WaitForVPAPodReady(ctx, c.kubeClient, c.Label); err != nil { + return fmt.Errorf("%s not ready: %v", c.Label, err) + } + return nil +} + +// ScaleDown scales the component's deployment to 0 replicas and waits for +// all its pods to terminate. 
+func (c *Component) ScaleDown(ctx context.Context) error { + if err := cluster.ScaleDeployment(ctx, c.kubeClient, cluster.VPANamespace, c.DeploymentName(), 0); err != nil { + return fmt.Errorf("failed to scale down %s: %v", c.Label, err) + } + return wait.PollUntilContextTimeout(ctx, scaleDownPollInterval, scaleDownTimeout, true, func(ctx context.Context) (bool, error) { + pods, _ := c.kubeClient.CoreV1().Pods(cluster.VPANamespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app.kubernetes.io/component=%s", c.Label), + }) + return len(pods.Items) == 0, nil + }) +} + +// PortForward sets up port forwarding to this component's pod. +// Returns a stop function to close the port-forward. +func (c *Component) PortForward(ctx context.Context) (stop func(), err error) { + pods, err := c.kubeClient.CoreV1().Pods(cluster.VPANamespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app.kubernetes.io/component=%s", c.Label), + }) + if err != nil || len(pods.Items) == 0 { + return nil, fmt.Errorf("no pod found for component %s", c.Label) + } + + podName := pods.Items[0].Name + stopChan := make(chan struct{}) + readyChan := make(chan struct{}) + + go func() { + url := c.kubeClient.CoreV1().RESTClient().Post(). + Resource("pods").Namespace(cluster.VPANamespace).Name(podName). + SubResource("portforward").URL() + + transport, upgrader, _ := spdy.RoundTripperFor(c.restConfig) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) + pf, _ := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", c.LocalPort, c.PodPort)}, stopChan, readyChan, io.Discard, io.Discard) + pf.ForwardPorts() + }() + + select { + case <-readyChan: + case <-time.After(portForwardTimeout): + close(stopChan) + return nil, fmt.Errorf("port-forward timeout for %s", c.Label) + } + + return func() { close(stopChan) }, nil +} + +// ScrapeLatencies fetches /metrics from this component and parses execution latency. 
+func (c *Component) ScrapeLatencies() (sums, counts map[string]float64, err error) { + resp, err := http.Get(c.MetricsURL()) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + return ParseExecutionLatency(resp.Body, c.MetricPrefix) +} + +// ParseExecutionLatency parses Prometheus text format for execution_latency_seconds +// _sum and _count values, keyed by step label. +func ParseExecutionLatency(body io.Reader, metricPrefix string) (sums, counts map[string]float64, err error) { + sums = make(map[string]float64) + counts = make(map[string]float64) + reSum := regexp.MustCompile(regexp.QuoteMeta(metricPrefix) + `_execution_latency_seconds_sum\{step="([^"]+)"\}\s+([\d.e+-]+)`) + reCount := regexp.MustCompile(regexp.QuoteMeta(metricPrefix) + `_execution_latency_seconds_count\{step="([^"]+)"\}\s+([\d.e+-]+)`) + scanner := bufio.NewScanner(body) + for scanner.Scan() { + line := scanner.Text() + if m := reSum.FindStringSubmatch(line); len(m) == 3 { + if v, err := strconv.ParseFloat(m[2], 64); err == nil { + sums[m[1]] = v + } + } + if m := reCount.FindStringSubmatch(line); len(m) == 3 { + if v, err := strconv.ParseFloat(m[2], 64); err == nil { + counts[m[1]] = v + } + } + } + return sums, counts, scanner.Err() +} + +// Scrape port-forwards to this component, polls until the 'total' latency step +// appears, then returns the processed step map. For PerRequest components +// (admission controller) values are divided by invocation count. The returned +// stop function must be deferred by the caller to close the port-forward. 
+func (c *Component) Scrape(ctx context.Context) (stepResults map[string]float64, stop func(), err error) { + stop, err = c.PortForward(ctx) + if err != nil { + return nil, nil, err + } + + var sums, counts map[string]float64 + startTime := time.Now() + pollErr := wait.PollUntilContextTimeout(ctx, scrapePollInterval, scrapeTimeout, true, func(ctx context.Context) (bool, error) { + sums, counts, err = c.ScrapeLatencies() + if err != nil { + return false, nil + } + if _, ok := sums["total"]; ok { + return true, nil + } + klog.Infof("> Waiting for %s metrics. Elapsed: %.2fs", c.Label, time.Since(startTime).Seconds()) + return false, nil + }) + if pollErr != nil { + stop() + return nil, nil, fmt.Errorf("timed out waiting for %s metrics: %v", c.Label, pollErr) + } + + if c.PerRequest { + stepResults = make(map[string]float64) + for step, sum := range sums { + if cnt, ok := counts[step]; ok && cnt > 0 { + stepResults[step] = sum / cnt + } + } + if cnt, ok := counts["total"]; ok { + stepResults["request_count"] = cnt + } + } else { + stepResults = sums + } + + return stepResults, stop, nil +} diff --git a/vertical-pod-autoscaler/benchmark/pkg/results/results.go b/vertical-pod-autoscaler/benchmark/pkg/results/results.go new file mode 100644 index 000000000000..634b483005fb --- /dev/null +++ b/vertical-pod-autoscaler/benchmark/pkg/results/results.go @@ -0,0 +1,310 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package results + +import ( + "fmt" + "os" + "sort" + "strings" + + "github.com/olekukonko/tablewriter" + "k8s.io/klog/v2" +) + +// ComponentResults maps component name -> step name -> value. +type ComponentResults = map[string]map[string]float64 + +// ComponentOrder defines the display order for component results. +var ComponentOrder = []string{"recommender", "updater", "admission"} + +// ComponentTitles maps component keys to human-readable display titles. +var ComponentTitles = map[string]string{ + "recommender": "Recommender", + "updater": "Updater", + "admission": "Admission Controller", +} + +// stepOrder defines execution order per component. +// Steps not listed here are appended alphabetically at the end. +var stepOrder = map[string][]string{ + "recommender": {"LoadVPAs", "LoadPods", "LoadMetrics", "UpdateVPAs", "MaintainCheckpoints", "GarbageCollect", "total"}, + "updater": {"ListVPAs", "ListPods", "FilterPods", "AdmissionInit", "EvictPods", "total"}, + "admission": {"read_request", "admit", "build_response", "write_response", "request_count", "total"}, +} + +func sortSteps(component string, steps []string) []string { + order, ok := stepOrder[component] + if !ok { + sort.Strings(steps) + return steps + } + rank := make(map[string]int, len(order)) + for i, s := range order { + rank[s] = i + } + sort.Slice(steps, func(i, j int) bool { + ri, oki := rank[steps[i]] + rj, okj := rank[steps[j]] + if oki && okj { + return ri < rj + } + if oki { + return true + } + if okj { + return false + } + return steps[i] < steps[j] + }) + return steps +} + +func isCountField(step string) bool { + return step == "request_count" +} + +func formatValue(step string, v float64) string { + if isCountField(step) { + return fmt.Sprintf("%.0f", v) + } + return fmt.Sprintf("%.4fs", v) +} + +// PrintAllComponentLatencies prints per-component latency tables for a single result set. 
+func PrintAllComponentLatencies(results ComponentResults, title string) { + for _, component := range ComponentOrder { + latencies, ok := results[component] + if !ok || len(latencies) == 0 { + continue + } + componentTitle, ok := ComponentTitles[component] + if !ok { + componentTitle = component + } + printLatencies(component, latencies, fmt.Sprintf("%s [%s]", title, componentTitle)) + } +} + +func printLatencies(component string, latencies map[string]float64, title string) { + fmt.Printf("\n=== %s ===\n", title) + var steps []string + for k := range latencies { + steps = append(steps, k) + } + sortSteps(component, steps) + for _, s := range steps { + fmt.Printf(" %-25s: %s\n", s, formatValue(s, latencies[s])) + } +} + +// AverageResults averages per-component, per-step values across multiple runs. +func AverageResults(results []ComponentResults) ComponentResults { + if len(results) == 0 { + return nil + } + + type accumulator struct { + sum float64 + count int + } + accum := make(map[string]map[string]*accumulator) + + for _, r := range results { + for component, steps := range r { + if accum[component] == nil { + accum[component] = make(map[string]*accumulator) + } + for step, val := range steps { + if accum[component][step] == nil { + accum[component][step] = &accumulator{} + } + accum[component][step].sum += val + accum[component][step].count++ + } + } + } + + avg := make(map[string]map[string]float64) + for component, steps := range accum { + avg[component] = make(map[string]float64) + for step, a := range steps { + avg[component][step] = a.sum / float64(a.count) + } + } + return avg +} + +// PrintRunSummary prints a per-run comparison table for each component within each profile. 
+func PrintRunSummary(profileList []string, allRunResults map[string][]ComponentResults) { + for _, profile := range profileList { + profile = strings.TrimSpace(profile) + runResults, ok := allRunResults[profile] + if !ok || len(runResults) == 0 { + continue + } + + for _, component := range ComponentOrder { + stepSet := make(map[string]bool) + hasData := false + for _, r := range runResults { + if steps, ok := r[component]; ok { + for step := range steps { + stepSet[step] = true + } + hasData = true + } + } + if !hasData { + continue + } + + var steps []string + for s := range stepSet { + steps = append(steps, s) + } + sortSteps(component, steps) + + header := []string{"Step"} + for i := range runResults { + header = append(header, fmt.Sprintf("Run %d", i+1)) + } + header = append(header, "Avg") + + var rows [][]string + for _, step := range steps { + row := []string{step} + var sum float64 + var count int + for _, r := range runResults { + if componentSteps, ok := r[component]; ok { + if v, ok := componentSteps[step]; ok { + row = append(row, formatValue(step, v)) + sum += v + count++ + } else { + row = append(row, "-") + } + } else { + row = append(row, "-") + } + } + if count > 0 { + row = append(row, formatValue(step, sum/float64(count))) + } else { + row = append(row, "-") + } + rows = append(rows, row) + } + + componentTitle := ComponentTitles[component] + fmt.Printf("\n========== %s: All Runs [%s] ==========\n", profile, componentTitle) + table := tablewriter.NewWriter(os.Stdout) + table.Header(header) + table.Bulk(rows) + table.Render() + } + } +} + +// PrintResultsTable prints a cross-profile comparison table for each component +// and optionally writes CSV output. 
+func PrintResultsTable(profileList []string, results map[string]ComponentResults, profiles map[string]int, outputFile string, noisePercentage int) { + profileHeaders := make([]string, 0, len(profileList)) + for _, p := range profileList { + p = strings.TrimSpace(p) + count := profiles[p] + noiseCount := count * noisePercentage / 100 + if noiseCount > 0 { + profileHeaders = append(profileHeaders, fmt.Sprintf("%s (%d+%dn)", p, count, noiseCount)) + } else { + profileHeaders = append(profileHeaders, fmt.Sprintf("%s (%d)", p, count)) + } + } + + var csvBuf strings.Builder + + for _, component := range ComponentOrder { + stepSet := make(map[string]bool) + hasData := false + for _, r := range results { + if steps, ok := r[component]; ok { + for step := range steps { + stepSet[step] = true + } + hasData = true + } + } + if !hasData { + continue + } + + var steps []string + for s := range stepSet { + steps = append(steps, s) + } + sortSteps(component, steps) + + header := append([]string{"Step"}, profileHeaders...) 
+ + var rows [][]string + for _, step := range steps { + row := []string{step} + for _, p := range profileList { + p = strings.TrimSpace(p) + if r, ok := results[p]; ok { + if componentSteps, ok := r[component]; ok { + if v, ok := componentSteps[step]; ok { + row = append(row, formatValue(step, v)) + } else { + row = append(row, "-") + } + } else { + row = append(row, "-") + } + } else { + row = append(row, "-") + } + } + rows = append(rows, row) + } + + componentTitle := ComponentTitles[component] + fmt.Printf("\n========== Results [%s] ==========\n", componentTitle) + table := tablewriter.NewWriter(os.Stdout) + table.Header(header) + table.Bulk(rows) + table.Render() + + if outputFile != "" { + csvBuf.WriteString(fmt.Sprintf("# %s\n", componentTitle)) + csvBuf.WriteString(strings.Join(header, ",") + "\n") + for _, row := range rows { + csvBuf.WriteString(strings.Join(row, ",") + "\n") + } + csvBuf.WriteString("\n") + } + } + + if outputFile != "" && csvBuf.Len() > 0 { + if err := os.WriteFile(outputFile, []byte(csvBuf.String()), 0644); err != nil { + klog.Warningf("Failed to write output file %s: %v", outputFile, err) + } else { + fmt.Printf("\nResults written to %s (CSV format)\n", outputFile) + } + } +}