diff --git a/vertical-pod-autoscaler/deploy/vpa-rbac.yaml b/vertical-pod-autoscaler/deploy/vpa-rbac.yaml index a41f468a738..6ca91b7b5c7 100644 --- a/vertical-pod-autoscaler/deploy/vpa-rbac.yaml +++ b/vertical-pod-autoscaler/deploy/vpa-rbac.yaml @@ -121,6 +121,32 @@ rules: - create --- apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:vpa-updater-in-place +rules: + - apiGroups: + - "" + resources: + - pods/resize + - pods + verbs: + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:vpa-updater-in-place-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:vpa-updater-in-place +subjects: + - kind: ServiceAccount + name: vpa-updater + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: name: system:metrics-reader diff --git a/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml b/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml index 6499413aa75..fefa43312d6 100644 --- a/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml +++ b/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml @@ -458,6 +458,7 @@ spec: - "Off" - Initial - Recreate + - InPlaceOrRecreate - Auto type: string type: object diff --git a/vertical-pod-autoscaler/e2e/v1/actuation.go b/vertical-pod-autoscaler/e2e/v1/actuation.go index 124d2c40822..7bcca72fe96 100644 --- a/vertical-pod-autoscaler/e2e/v1/actuation.go +++ b/vertical-pod-autoscaler/e2e/v1/actuation.go @@ -167,6 +167,346 @@ var _ = ActuationSuiteE2eDescribe("Actuation", func() { gomega.Expect(foundUpdated).To(gomega.Equal(1)) }) + ginkgo.It("still applies recommendations on restart when update mode is InPlaceOrRecreate", func() { + ginkgo.By("Setting up a hamster deployment") + SetupHamsterDeployment(f, "100m", "100Mi", defaultHamsterReplicas) + podList, err := GetHamsterPods(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + podSet := MakePodSet(podList) + + ginkgo.By("Setting up a VPA CRD in mode InPlaceOrRecreate") + containerName := GetHamsterContainerNameByIndex(0) + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). + WithContainer(containerName). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget("200m", ""). + WithLowerBound("200m", ""). + WithUpperBound("200m", ""). + GetContainerResources()). + Get() + + InstallVPA(f, vpaCRD) + updatedCPURequest := ParseQuantityOrDie("200m") + + ginkgo.By(fmt.Sprintf("Waiting for pods to be evicted, hoping it won't happen, sleep for %s", VpaEvictionTimeout.String())) + CheckNoPodsEvicted(f, podSet) + ginkgo.By("Forcefully killing one pod") + killPod(f, podList) + + ginkgo.By("Checking that request was modified after forceful restart") + updatedPodList, _ := GetHamsterPods(f) + var foundUpdated int32 + for _, pod := range updatedPodList.Items { + podRequest := getCPURequest(pod.Spec) + framework.Logf("podReq: %v", podRequest) + if podRequest.Cmp(updatedCPURequest) == 0 { + foundUpdated += 1 + } + } + gomega.Expect(foundUpdated).To(gomega.Equal(defaultHamsterReplicas)) + }) + + // TODO(maxcao13): disruptionless means no container/pod restart; however NotRequired policy does not guarantee this... 
+ ginkgo.It("applies disruptionless in-place updates to all containers where request is within bounds when update mode is InPlaceOrRecreate", func() { + ginkgo.By("Setting up a hamster deployment with all containers with NotRequired resize policies") + replicas := int32(2) + expectedContainerRestarts := int32(0) + SetupHamsterDeployment(f, "100m", "100Mi", replicas) + podList, err := GetHamsterPods(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + podSet := MakePodSet(podList) + + ginkgo.By("Setting up a VPA CRD") + containerName := GetHamsterContainerNameByIndex(0) + updatedCPU := "250m" + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget(updatedCPU, "200Mi"). + WithLowerBound("100m", "100Mi"). + WithUpperBound("300m", "250Mi"). + GetContainerResources()). + Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By(fmt.Sprintf("Waiting for pods to be evicted, hoping it won't happen, sleep for %s", VpaEvictionTimeout.String())) + CheckNoPodsEvicted(f, podSet) + + ginkgo.By("Checking that request was modified after a while due to in-place update, and was disruptionless") + updatedPodList, _ := GetHamsterPods(f) + var foundUpdated, foundContainerRestarts int32 = 0, 0 + for _, pod := range updatedPodList.Items { + podRequest := getCPURequest(pod.Spec) + containerRestarts := getContainerRestarts(pod.Status) + framework.Logf("podReq: %v, containerRestarts: %v", podRequest, getContainerRestarts(pod.Status)) + if podRequest.Cmp(ParseQuantityOrDie(updatedCPU)) == 0 { + foundUpdated += 1 + } + foundContainerRestarts += containerRestarts + } + gomega.Expect(foundUpdated).To(gomega.Equal(replicas)) + gomega.Expect(foundContainerRestarts).To(gomega.Equal(expectedContainerRestarts)) + }) + + ginkgo.It("applies partial disruptionless in-place updates to a pod when request is within bounds when update mode is InPlaceOrRecreate", func() { + ginkgo.By("Setting up a hamster deployment with first container using RestartContainer resize policies") + cpuQuantity := ParseQuantityOrDie("100m") + memoryQuantity := ParseQuantityOrDie("100Mi") + replicas := int32(2) + containers := 2 + expectedContainerRestarts := int32(1) * replicas // 1 container restart per pod + d := NewNHamstersDeployment(f, containers) + d.Spec.Template.Spec.Containers[0].Resources.Requests = apiv1.ResourceList{ + apiv1.ResourceCPU: cpuQuantity, + apiv1.ResourceMemory: memoryQuantity, + } + d.Spec.Replicas = &replicas + d.Spec.Template.Spec.Containers[0].ResizePolicy = []apiv1.ContainerResizePolicy{ + { + ResourceName: apiv1.ResourceCPU, + RestartPolicy: apiv1.RestartContainer, + }, + { + ResourceName: apiv1.ResourceMemory, + RestartPolicy: apiv1.RestartContainer, + }, + } + d, err := f.ClientSet.AppsV1().Deployments(f.Namespace.Name).Create(context.TODO(), d, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "unexpected error when starting deployment creation") + err = framework_deployment.WaitForDeploymentComplete(f.ClientSet, d) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "unexpected error waiting for deployment creation to finish") + + podList, err := GetHamsterPods(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + podSet := MakePodSet(podList) + + ginkgo.By("Setting up a VPA CRD") + containerName := GetHamsterContainerNameByIndex(0) + updatedCPU 
:= "250m" + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget(updatedCPU, "200Mi"). + WithLowerBound("100m", "100Mi"). + WithUpperBound("300m", "250Mi"). + GetContainerResources()). + Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By(fmt.Sprintf("Waiting for pods to be evicted, hoping it won't happen, sleep for %s", VpaEvictionTimeout.String())) + CheckNoPodsEvicted(f, podSet) + + ginkgo.By("Checking that request was modified after a while due to in-place update, and was partially disruptionless") + updatedPodList, _ := GetHamsterPods(f) + var foundUpdated, foundContainerRestarts int32 = 0, 0 + for _, pod := range updatedPodList.Items { + podRequest := getCPURequest(pod.Spec) + containerRestarts := getContainerRestarts(pod.Status) + framework.Logf("podReq: %v, containerRestarts: %v", podRequest, getContainerRestarts(pod.Status)) + if podRequest.Cmp(ParseQuantityOrDie(updatedCPU)) == 0 { + foundUpdated += 1 + } + foundContainerRestarts += containerRestarts + } + gomega.Expect(foundUpdated).To(gomega.Equal(replicas)) + gomega.Expect(foundContainerRestarts).To(gomega.Equal(expectedContainerRestarts)) + }) + + // TODO(maxcao13): To me how this test fits in the test plan does not make sense. Whether the container is + // able to be disrupted is not determined by the VPA but is by the kubelet, so why would we force the container + // to restart if the request is out of the recommendation bounds? I must be missing something... + ginkgo.It("applies disruptive in-place updates to a container in all pods when request out of bounds when update mode is InPlaceOrRecreate", func() { + ginkgo.Skip("This test should be re-enabled once we have a better understanding of how to test this scenario") + + ginkgo.By("Setting up a hamster deployment with a container using NotRequired resize policies") + replicas := int32(2) + expectedContainerRestarts := int32(1) * replicas // 1 container restart per pod + SetupHamsterDeployment(f, "1", "1Gi", replicas) // outside recommendation range + podList, err := GetHamsterPods(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + podSet := MakePodSet(podList) + + ginkgo.By("Setting up a VPA CRD") + containerName := GetHamsterContainerNameByIndex(0) + updatedCPU := "250m" + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget(updatedCPU, "200Mi"). + WithLowerBound("150m", "150Mi"). + WithUpperBound("300m", "250Mi"). + GetContainerResources()). 
+ Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By(fmt.Sprintf("Waiting for pods to be evicted, hoping it won't happen, sleep for %s", VpaEvictionTimeout.String())) + CheckNoPodsEvicted(f, podSet) + + ginkgo.By("Checking that request was modified after a while due to in-place update, and was disruptionless") + updatedPodList, _ := GetHamsterPods(f) + var foundUpdated, foundContainerRestarts int32 = 0, 0 + for _, pod := range updatedPodList.Items { + podRequest := getCPURequest(pod.Spec) + containerRestarts := getContainerRestarts(pod.Status) + framework.Logf("podReq: %v, containerRestarts: %v", podRequest, getContainerRestarts(pod.Status)) + if podRequest.Cmp(ParseQuantityOrDie(updatedCPU)) == 0 { + foundUpdated += 1 + } + foundContainerRestarts += containerRestarts + } + gomega.Expect(foundUpdated).To(gomega.Equal(replicas)) + gomega.Expect(foundContainerRestarts).To(gomega.Equal(expectedContainerRestarts)) + }) + + // TODO(maxcao13): disruptive in-place fails and we have to evict (InPlaceOrRecreate) + // This particular test checks if we fallback after a resize infeasible, particularly from the node not having enough resources (as specified in the AEP) + // Should we check other conditions that result in eviction fallback? + ginkgo.It("falls back to evicting pods when in-place update is Infeasible when update mode is InPlaceOrRecreate", func() { + ginkgo.By("Setting up a hamster deployment") + replicas := int32(2) + SetupHamsterDeployment(f, "100m", "100Mi", replicas) + podList, err := GetHamsterPods(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Setting up a VPA CRD") + containerName := GetHamsterContainerNameByIndex(0) + updatedCPU := "999" // infeasible target + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget(updatedCPU, "200Mi"). + WithLowerBound("150m", "150Mi"). + WithUpperBound("300m", "250Mi"). + GetContainerResources()). + Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By(fmt.Sprintf("Waiting for in-place update, hoping it won't happen, sleep for %s", VpaEvictionTimeout.String())) + CheckNoContainersRestarted(f) + + ginkgo.By("Waiting for pods to be evicted") + err = WaitForPodsEvicted(f, podList) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + + ginkgo.It("falls back to evicting pods when pod QoS class changes when update mode is InPlaceOrRecreate", func() { + ginkgo.By("Setting up a hamster deployment") + replicas := int32(2) + + d := NewHamsterDeploymentWithGuaranteedResources(f, ParseQuantityOrDie("300m"), ParseQuantityOrDie("400Mi")) + d.Spec.Replicas = &replicas + d, err := f.ClientSet.AppsV1().Deployments(f.Namespace.Name).Create(context.TODO(), d, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "unexpected error when starting deployment creation") + err = framework_deployment.WaitForDeploymentComplete(f.ClientSet, d) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "unexpected error waiting for deployment creation to finish") + + podList, err := GetHamsterPods(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Setting up a VPA CRD") + containerName := GetHamsterContainerNameByIndex(0) + updatedCPU := "200m" + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). 
+ WithTargetRef(hamsterTargetRef). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget(updatedCPU, "200Mi"). + WithLowerBound("150m", "150Mi"). + WithUpperBound("300m", "250Mi"). + GetContainerResources()). + Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By(fmt.Sprintf("Waiting for in-place update, hoping it won't happen, sleep for %s", VpaEvictionTimeout.String())) + CheckNoContainersRestarted(f) + + ginkgo.By("Waiting for pods to be evicted") + err = WaitForPodsEvicted(f, podList) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + + ginkgo.It("falls back to evicting pods when resize is Deferred and more than 1 minute has elapsed since last in-place update when update mode is InPlaceOrRecreate", func() { + ginkgo.By("Setting up a hamster deployment") + replicas := int32(2) + SetupHamsterDeployment(f, "100m", "100Mi", replicas) + podList, err := GetHamsterPods(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Setting up a VPA CRD") + containerName := GetHamsterContainerNameByIndex(0) + + // get node name + nodeName := podList.Items[0].Spec.NodeName + node, err := f.ClientSet.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + allocatableCPU := node.Status.Allocatable[apiv1.ResourceCPU] + updatedCPU := allocatableCPU.String() + + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget(updatedCPU, "200Mi"). + WithLowerBound("150m", "150Mi"). + WithUpperBound("300m", "250Mi"). + GetContainerResources()). 
+ Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By(fmt.Sprintf("Waiting for in-place update, hoping it won't happen, sleep for %s", VpaEvictionTimeout.String())) + CheckNoContainersRestarted(f) + + ginkgo.By("Waiting for pods to be evicted") + err = WaitForPodsEvicted(f, podList) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + perControllerTests := []struct { apiVersion string kind string @@ -519,6 +859,14 @@ func getCPURequest(podSpec apiv1.PodSpec) resource.Quantity { return podSpec.Containers[0].Resources.Requests[apiv1.ResourceCPU] } +// getContainerRestarts returns the sum of container restartCounts for all containers in the pod +func getContainerRestarts(podStatus apiv1.PodStatus) (restarts int32) { + for _, containerStatus := range podStatus.ContainerStatuses { + restarts += containerStatus.RestartCount + } + return +} + func killPod(f *framework.Framework, podList *apiv1.PodList) { f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(context.TODO(), podList.Items[0].Name, metav1.DeleteOptions{}) err := WaitForPodsRestarted(f, podList) diff --git a/vertical-pod-autoscaler/e2e/v1/admission_controller.go b/vertical-pod-autoscaler/e2e/v1/admission_controller.go index d4e79c77828..9fc98f48b25 100644 --- a/vertical-pod-autoscaler/e2e/v1/admission_controller.go +++ b/vertical-pod-autoscaler/e2e/v1/admission_controller.go @@ -907,6 +907,38 @@ var _ = AdmissionControllerE2eDescribe("Admission-controller", func() { gomega.Expect(err.Error()).To(gomega.MatchRegexp(`.*admission webhook .*vpa.* denied the request: .*`), "Admission controller did not inspect the object") }) + ginkgo.It("starts pods with new recommended request with InPlaceOrRecreate mode", func() { + d := NewHamsterDeploymentWithResources(f, ParseQuantityOrDie("100m") /*cpu*/, ParseQuantityOrDie("100Mi") /*memory*/) + + ginkgo.By("Setting up a VPA CRD") + containerName := GetHamsterContainerNameByIndex(0) + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer(containerName). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget("250m", "200Mi"). + WithLowerBound("250m", "200Mi"). + WithUpperBound("250m", "200Mi"). + GetContainerResources()). + Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By("Setting up a hamster deployment") + podList := startDeploymentPods(f, d) + + // Originally Pods had 100m CPU, 100Mi of memory, but admission controller + // should change it to recommended 250m CPU and 200Mi of memory. 
+ for _, pod := range podList.Items { + gomega.Expect(pod.Spec.Containers[0].Resources.Requests[apiv1.ResourceCPU]).To(gomega.Equal(ParseQuantityOrDie("250m"))) + gomega.Expect(pod.Spec.Containers[0].Resources.Requests[apiv1.ResourceMemory]).To(gomega.Equal(ParseQuantityOrDie("200Mi"))) + } + }) }) func startDeploymentPods(f *framework.Framework, deployment *appsv1.Deployment) *apiv1.PodList { diff --git a/vertical-pod-autoscaler/e2e/v1/common.go b/vertical-pod-autoscaler/e2e/v1/common.go index 6ce493c9a8b..4ad73d303d3 100644 --- a/vertical-pod-autoscaler/e2e/v1/common.go +++ b/vertical-pod-autoscaler/e2e/v1/common.go @@ -455,6 +455,20 @@ func CheckNoPodsEvicted(f *framework.Framework, initialPodSet PodSet) { gomega.Expect(restarted).To(gomega.Equal(0), "there should be no pod evictions") } +// CheckNoContainersRestarted waits for long enough period for VPA to start +// updating containers in-place and checks that no containers were restarted. +func CheckNoContainersRestarted(f *framework.Framework) { + var foundContainerRestarts int32 + time.Sleep(VpaEvictionTimeout) + podList, err := GetHamsterPods(f) + for _, pod := range podList.Items { + containerRestarts := getContainerRestarts(pod.Status) + foundContainerRestarts += containerRestarts + } + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "unexpected error when listing hamster pods to check number of container restarts") + gomega.Expect(foundContainerRestarts).To(gomega.Equal(int32(0)), "there should be no container restarts") +} + // WaitForVPAMatch pools VPA object until match function returns true. Returns // polled vpa object. On timeout returns error. func WaitForVPAMatch(c vpa_clientset.Interface, vpa *vpa_types.VerticalPodAutoscaler, match func(vpa *vpa_types.VerticalPodAutoscaler) bool) (*vpa_types.VerticalPodAutoscaler, error) { @@ -555,3 +569,104 @@ func InstallLimitRangeWithMin(f *framework.Framework, minCpuLimit, minMemoryLimi minMemoryLimitQuantity := ParseQuantityOrDie(minMemoryLimit) installLimitRange(f, &minCpuLimitQuantity, &minMemoryLimitQuantity, nil, nil, lrType) } + +// WaitForPodsUpdatedWithoutEviction waits for pods to be updated without any evictions taking place over the polling +// interval. +func WaitForPodsUpdatedWithoutEviction(f *framework.Framework, initialPods, podList *apiv1.PodList) error { + // TODO(jkyros): This needs to be: + // 1. Make sure we wait for each of the containers to get an update queued + // 2. Make sure each of the containers actually finish the update + // 3. 
Once everyone has gone through 1 cycle, we don't care anymore, we can move on (it will keep scaling obviously) + framework.Logf("waiting for update to start and resources to differ") + var resourcesHaveDiffered bool + err := wait.PollUntilContextTimeout(context.TODO(), pollInterval, pollTimeout, false, func(context.Context) (bool, error) { + // TODO(jkyros): make sure we don't update too many pods at once + podList, err := GetHamsterPods(f) + if err != nil { + return false, err + } + resourcesAreSynced := true + podMissing := false + // Go through the list of initial pods + for _, initialPod := range initialPods.Items { + found := false + // Go through the list of pods we have now + for _, pod := range podList.Items { + // If we still have our initial pod, good + if initialPod.Name == pod.Name { + found = true + + // Check to see if we have our container resources updated + for num, container := range pod.Spec.Containers { + // If our current spec differs from initial, we know we were told to update + if !resourcesHaveDiffered { + for resourceName, resourceLimit := range container.Resources.Limits { + initialResourceLimit := initialPod.Spec.Containers[num].Resources.Limits[resourceName] + if !initialResourceLimit.Equal(resourceLimit) { + framework.Logf("E: %s/%s: %s limit (%v) differs from initial (%v), change has started ", pod.Name, container.Name, resourceName, resourceLimit.String(), initialResourceLimit.String()) + //fmt.Printf("UPD: L:%s: %s/%s %v differs from initial %v\n", resourceName, pod.Name, container.Name, resourceLimit, pod.Status.ContainerStatuses[num].Resources.Limits[resourceName]) + resourcesHaveDiffered = true + + } + + } + for resourceName, resourceRequest := range container.Resources.Requests { + initialResourceRequest := initialPod.Spec.Containers[num].Resources.Requests[resourceName] + if !initialResourceRequest.Equal(resourceRequest) { + framework.Logf("%s/%s: %s request (%v) differs from initial (%v), change has started ", pod.Name, container.Name, resourceName, resourceRequest.String(), initialResourceRequest.String()) + resourcesHaveDiffered = true + + } + } + } + + if len(pod.Status.ContainerStatuses) > num { + if pod.Status.ContainerStatuses[num].Resources != nil { + for resourceName, resourceLimit := range container.Resources.Limits { + statusResourceLimit := pod.Status.ContainerStatuses[num].Resources.Limits[resourceName] + if !statusResourceLimit.Equal(resourceLimit) { + framework.Logf("%s/%s: %s limit status (%v) differs from limit spec (%v), still in progress", pod.Name, container.Name, resourceName, resourceLimit.String(), statusResourceLimit.String()) + + resourcesAreSynced = false + + } + + } + for resourceName, resourceRequest := range container.Resources.Requests { + statusResourceRequest := pod.Status.ContainerStatuses[num].Resources.Requests[resourceName] + if !pod.Status.ContainerStatuses[num].Resources.Requests[resourceName].Equal(resourceRequest) { + framework.Logf("%s/%s: %s request status (%v) differs from request spec(%v), still in progress ", pod.Name, container.Name, resourceName, resourceRequest.String(), statusResourceRequest.String()) + resourcesAreSynced = false + + } + } + + } else { + framework.Logf("SOMEHOW ITS EMPTY\n") + } + } + + } + } + + } + if !found { + //framework.Logf("pod %s was evicted and should not have been\n", initialPod.Name) + podMissing = true + } + + } + if podMissing { + return false, fmt.Errorf("a pod was erroneously evicted") + } + if len(podList.Items) > 0 && resourcesAreSynced { + if !resourcesHaveDiffered { + 
return false, nil + } + framework.Logf("after checking %d pods, we are in sync\n", len(podList.Items)) + return true, nil + } + return false, nil + }) + return err +} diff --git a/vertical-pod-autoscaler/e2e/v1/updater.go b/vertical-pod-autoscaler/e2e/v1/updater.go index ccf0f07ded1..aab743bfcaf 100644 --- a/vertical-pod-autoscaler/e2e/v1/updater.go +++ b/vertical-pod-autoscaler/e2e/v1/updater.go @@ -38,6 +38,88 @@ var _ = UpdaterE2eDescribe("Updater", func() { f := framework.NewDefaultFramework("vertical-pod-autoscaling") f.NamespacePodSecurityEnforceLevel = podsecurity.LevelBaseline + // TODO(jkyros): it should only evict here if the feature gate is off, so we need to + // check behavior by making sure it aligns with the feature gate. e.g. if it's on, then do this test, if it's not, then skip it + + // 1. check if we have resize policies, if we do, do the test with in-place + // 2. if we don't, then do it the old way + + ginkgo.It("In-place update pods when Admission Controller status available", func() { + const statusUpdateInterval = 10 * time.Second + + ginkgo.By("Setting up the Admission Controller status") + stopCh := make(chan struct{}) + statusUpdater := status.NewUpdater( + f.ClientSet, + status.AdmissionControllerStatusName, + status.AdmissionControllerStatusNamespace, + statusUpdateInterval, + "e2e test", + ) + defer func() { + // Schedule a cleanup of the Admission Controller status. + // Status is created outside the test namespace. + ginkgo.By("Deleting the Admission Controller status") + close(stopCh) + err := f.ClientSet.CoordinationV1().Leases(status.AdmissionControllerStatusNamespace). + Delete(context.TODO(), status.AdmissionControllerStatusName, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }() + statusUpdater.Run(stopCh) + + podList := setupPodsForUpscalingInPlace(f) + if len(podList.Items[0].Spec.Containers[0].ResizePolicy) <= 0 { + // Feature is probably not working here + ginkgo.Skip("Skipping test, InPlacePodVerticalScaling not available") + } + + initialPods := podList.DeepCopy() + // 1. Take initial pod list + // 2. Loop through and compare all the resource values + // 3. When they change, it's good + + ginkgo.By("Waiting for pods to be in-place updated") + + //gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err := WaitForPodsUpdatedWithoutEviction(f, initialPods, podList) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + + ginkgo.It("Does not evict pods for downscaling in-place", func() { + const statusUpdateInterval = 10 * time.Second + + ginkgo.By("Setting up the Admission Controller status") + stopCh := make(chan struct{}) + statusUpdater := status.NewUpdater( + f.ClientSet, + status.AdmissionControllerStatusName, + status.AdmissionControllerStatusNamespace, + statusUpdateInterval, + "e2e test", + ) + defer func() { + // Schedule a cleanup of the Admission Controller status. + // Status is created outside the test namespace. + ginkgo.By("Deleting the Admission Controller status") + close(stopCh) + err := f.ClientSet.CoordinationV1().Leases(status.AdmissionControllerStatusNamespace). 
+ Delete(context.TODO(), status.AdmissionControllerStatusName, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }() + statusUpdater.Run(stopCh) + + podList := setupPodsForDownscalingInPlace(f, nil) + if len(podList.Items[0].Spec.Containers[0].ResizePolicy) <= 0 { + // Feature is probably not working here + ginkgo.Skip("Skipping test, InPlacePodVerticalScaling not available") + } + initialPods := podList.DeepCopy() + + ginkgo.By("Waiting for pods to be in-place downscaled") + err := WaitForPodsUpdatedWithoutEviction(f, initialPods, podList) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + ginkgo.It("evicts pods when Admission Controller status available", func() { const statusUpdateInterval = 10 * time.Second @@ -165,6 +247,49 @@ func setupPodsForEviction(f *framework.Framework, hamsterCPU, hamsterMemory stri WithName("hamster-vpa"). WithNamespace(f.Namespace.Name). WithTargetRef(controller). + WithUpdateMode(vpa_types.UpdateModeRecreate). + WithEvictionRequirements(er). + WithContainer(containerName). + AppendRecommendation( + test.Recommendation(). + WithContainer(containerName). + WithTarget(containerName, "200m"). + WithLowerBound(containerName, "200m"). + WithUpperBound(containerName, "200m"). + GetContainerResources()). + Get() + + InstallVPA(f, vpaCRD) + + return podList +} + +func setupPodsForUpscalingInPlace(f *framework.Framework) *apiv1.PodList { + return setupPodsForInPlace(f, "100m", "100Mi", nil) +} + +func setupPodsForDownscalingInPlace(f *framework.Framework, er []*vpa_types.EvictionRequirement) *apiv1.PodList { + return setupPodsForInPlace(f, "500m", "500Mi", er) +} + +func setupPodsForInPlace(f *framework.Framework, hamsterCPU, hamsterMemory string, er []*vpa_types.EvictionRequirement) *apiv1.PodList { + controller := &autoscaling.CrossVersionObjectReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "hamster-deployment", + } + ginkgo.By(fmt.Sprintf("Setting up a hamster %v", controller.Kind)) + setupHamsterController(f, controller.Kind, hamsterCPU, hamsterMemory, defaultHamsterReplicas) + podList, err := GetHamsterPods(f) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Setting up a VPA CRD") + containerName := GetHamsterContainerNameByIndex(0) + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(controller). + WithUpdateMode(vpa_types.UpdateModeInPlaceOrRecreate). WithEvictionRequirements(er). WithContainer(containerName). 
AppendRecommendation( diff --git a/vertical-pod-autoscaler/e2e/v1beta2/updater.go b/vertical-pod-autoscaler/e2e/v1beta2/updater.go index 5b8b894d565..33daf4da634 100644 --- a/vertical-pod-autoscaler/e2e/v1beta2/updater.go +++ b/vertical-pod-autoscaler/e2e/v1beta2/updater.go @@ -87,7 +87,7 @@ func setupPodsForEviction(f *framework.Framework) *apiv1.PodList { gomega.Expect(err).NotTo(gomega.HaveOccurred()) ginkgo.By("Setting up a VPA CRD") - SetupVPA(f, "200m", vpa_types.UpdateModeAuto, controller) + SetupVPA(f, "200m", vpa_types.UpdateModeRecreate, controller) return podList } diff --git a/vertical-pod-autoscaler/hack/e2e/vpa-rbac.yaml b/vertical-pod-autoscaler/hack/e2e/vpa-rbac.yaml new file mode 100644 index 00000000000..883c12ca58a --- /dev/null +++ b/vertical-pod-autoscaler/hack/e2e/vpa-rbac.yaml @@ -0,0 +1,442 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:metrics-reader +rules: + - apiGroups: + - "external.metrics.k8s.io" + resources: + - "*" + verbs: + - get + - list + - apiGroups: + - "metrics.k8s.io" + resources: + - pods + verbs: + - get + - list +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:vpa-actor +rules: + - apiGroups: + - "" + resources: + - pods + - nodes + - limitranges + verbs: + - get + - list + - watch + - apiGroups: + - "" + resources: + - events + verbs: + - get + - list + - watch + - create + - apiGroups: + - "poc.autoscaling.k8s.io" + resources: + - verticalpodautoscalers + verbs: + - get + - list + - watch + - apiGroups: + - "autoscaling.k8s.io" + resources: + - verticalpodautoscalers + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:vpa-status-actor +rules: + - apiGroups: + - "autoscaling.k8s.io" + resources: + - verticalpodautoscalers/status + verbs: + - get + - patch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:vpa-checkpoint-actor +rules: + - apiGroups: + - "poc.autoscaling.k8s.io" + resources: + - verticalpodautoscalercheckpoints + verbs: + - get + - list + - watch + - create + - patch + - delete + - apiGroups: + - "autoscaling.k8s.io" + resources: + - verticalpodautoscalercheckpoints + verbs: + - get + - list + - watch + - create + - patch + - delete + - apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:evictioner +rules: + - apiGroups: + - "apps" + - "extensions" + resources: + - replicasets + verbs: + - get + - apiGroups: + - "" + resources: + - pods/eviction + verbs: + - create +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:metrics-reader +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:metrics-reader +subjects: + - kind: ServiceAccount + name: vpa-recommender + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:vpa-actor +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:vpa-actor +subjects: + - kind: ServiceAccount + name: vpa-recommender + namespace: kube-system + - kind: ServiceAccount + name: vpa-updater + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:vpa-status-actor +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: 
system:vpa-status-actor +subjects: + - kind: ServiceAccount + name: vpa-recommender + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:vpa-checkpoint-actor +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:vpa-checkpoint-actor +subjects: + - kind: ServiceAccount + name: vpa-recommender + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:vpa-target-reader +rules: + - apiGroups: + - '*' + resources: + - '*/scale' + verbs: + - get + - watch + - apiGroups: + - "" + resources: + - replicationcontrollers + verbs: + - get + - list + - watch + - apiGroups: + - apps + resources: + - daemonsets + - deployments + - replicasets + - statefulsets + verbs: + - get + - list + - watch + - apiGroups: + - batch + resources: + - jobs + - cronjobs + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:vpa-target-reader-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:vpa-target-reader +subjects: + - kind: ServiceAccount + name: vpa-recommender + namespace: kube-system + - kind: ServiceAccount + name: vpa-admission-controller + namespace: kube-system + - kind: ServiceAccount + name: vpa-updater + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:vpa-evictioner-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:evictioner +subjects: + - kind: ServiceAccount + name: vpa-updater + namespace: kube-system +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vpa-admission-controller + namespace: kube-system +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vpa-recommender + namespace: kube-system +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vpa-updater + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:vpa-admission-controller +rules: + - apiGroups: + - "" + resources: + - pods + - configmaps + - nodes + - limitranges + verbs: + - get + - list + - watch + - apiGroups: + - "admissionregistration.k8s.io" + resources: + - mutatingwebhookconfigurations + verbs: + - create + - delete + - get + - list + - apiGroups: + - "poc.autoscaling.k8s.io" + resources: + - verticalpodautoscalers + verbs: + - get + - list + - watch + - apiGroups: + - "autoscaling.k8s.io" + resources: + - verticalpodautoscalers + verbs: + - get + - list + - watch + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - create + - update + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:vpa-admission-controller +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:vpa-admission-controller +subjects: + - kind: ServiceAccount + name: vpa-admission-controller + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:vpa-status-reader +rules: + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:vpa-status-reader-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:vpa-status-reader +subjects: + - 
kind: ServiceAccount + name: vpa-updater + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: system:leader-locking-vpa-updater + namespace: kube-system +rules: + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - create + - apiGroups: + - "coordination.k8s.io" + resourceNames: + - vpa-updater + resources: + - leases + verbs: + - get + - watch + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: system:leader-locking-vpa-updater + namespace: kube-system +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: system:leader-locking-vpa-updater +subjects: + - kind: ServiceAccount + name: vpa-updater + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: system:leader-locking-vpa-recommender + namespace: kube-system +rules: + - apiGroups: + - "coordination.k8s.io" + resources: + - leases + verbs: + - create + - apiGroups: + - "coordination.k8s.io" + resourceNames: + # TODO: Clean vpa-recommender up once vpa-recommender-lease is used everywhere. See https://github.com/kubernetes/autoscaler/issues/7461. + - vpa-recommender + - vpa-recommender-lease + resources: + - leases + verbs: + - get + - watch + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: system:leader-locking-vpa-recommender + namespace: kube-system +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: system:leader-locking-vpa-recommender +subjects: + - kind: ServiceAccount + name: vpa-recommender + namespace: kube-system diff --git a/vertical-pod-autoscaler/hack/inplace-kind-config.yaml b/vertical-pod-autoscaler/hack/inplace-kind-config.yaml new file mode 100644 index 00000000000..9d20acec394 --- /dev/null +++ b/vertical-pod-autoscaler/hack/inplace-kind-config.yaml @@ -0,0 +1,4 @@ +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +featureGates: + InPlacePodVerticalScaling: true diff --git a/vertical-pod-autoscaler/hack/run-e2e-locally.sh b/vertical-pod-autoscaler/hack/run-e2e-locally.sh index 48d8e0f789f..ca0ce71ef5b 100755 --- a/vertical-pod-autoscaler/hack/run-e2e-locally.sh +++ b/vertical-pod-autoscaler/hack/run-e2e-locally.sh @@ -74,8 +74,8 @@ echo "Deleting KIND cluster 'kind'." kind delete cluster -n kind -q echo "Creating KIND cluster 'kind'" -KIND_VERSION="kindest/node:v1.26.3" -kind create cluster --image=${KIND_VERSION} +KIND_VERSION="kindest/node:v1.32.0" +kind create cluster --image=${KIND_VERSION} --config ${SCRIPT_ROOT}/hack/inplace-kind-config.yaml echo "Building metrics-pump image" docker build -t localhost:5001/write-metrics:dev -f ${SCRIPT_ROOT}/hack/e2e/Dockerfile.externalmetrics-writer ${SCRIPT_ROOT}/hack diff --git a/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch/resource_updates.go b/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch/resource_updates.go index cddec1aa83f..7310ccc82ac 100644 --- a/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch/resource_updates.go +++ b/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch/resource_updates.go @@ -77,7 +77,7 @@ func getContainerPatch(pod *core.Pod, i int, annotationsPerContainer vpa_api_uti // Add empty resources object if missing. 
if pod.Spec.Containers[i].Resources.Limits == nil && pod.Spec.Containers[i].Resources.Requests == nil { - patches = append(patches, getPatchInitializingEmptyResources(i)) + patches = append(patches, GetPatchInitializingEmptyResources(i)) } annotations, found := annotationsPerContainer[pod.Spec.Containers[i].Name] @@ -95,23 +95,23 @@ func getContainerPatch(pod *core.Pod, i int, annotationsPerContainer vpa_api_uti func appendPatchesAndAnnotations(patches []resource_admission.PatchRecord, annotations []string, current core.ResourceList, containerIndex int, resources core.ResourceList, fieldName, resourceName string) ([]resource_admission.PatchRecord, []string) { // Add empty object if it's missing and we're about to fill it. if current == nil && len(resources) > 0 { - patches = append(patches, getPatchInitializingEmptyResourcesSubfield(containerIndex, fieldName)) + patches = append(patches, GetPatchInitializingEmptyResourcesSubfield(containerIndex, fieldName)) } for resource, request := range resources { - patches = append(patches, getAddResourceRequirementValuePatch(containerIndex, fieldName, resource, request)) + patches = append(patches, GetAddResourceRequirementValuePatch(containerIndex, fieldName, resource, request)) annotations = append(annotations, fmt.Sprintf("%s %s", resource, resourceName)) } return patches, annotations } -func getAddResourceRequirementValuePatch(i int, kind string, resource core.ResourceName, quantity resource.Quantity) resource_admission.PatchRecord { +func GetAddResourceRequirementValuePatch(i int, kind string, resource core.ResourceName, quantity resource.Quantity) resource_admission.PatchRecord { return resource_admission.PatchRecord{ Op: "add", Path: fmt.Sprintf("/spec/containers/%d/resources/%s/%s", i, kind, resource), Value: quantity.String()} } -func getPatchInitializingEmptyResources(i int) resource_admission.PatchRecord { +func GetPatchInitializingEmptyResources(i int) resource_admission.PatchRecord { return resource_admission.PatchRecord{ Op: "add", Path: fmt.Sprintf("/spec/containers/%d/resources", i), @@ -119,7 +119,7 @@ func getPatchInitializingEmptyResources(i int) resource_admission.PatchRecord { } } -func getPatchInitializingEmptyResourcesSubfield(i int, kind string) resource_admission.PatchRecord { +func GetPatchInitializingEmptyResourcesSubfield(i int, kind string) resource_admission.PatchRecord { return resource_admission.PatchRecord{ Op: "add", Path: fmt.Sprintf("/spec/containers/%d/resources/%s", i, kind), diff --git a/vertical-pod-autoscaler/pkg/admission-controller/resource/vpa/handler.go b/vertical-pod-autoscaler/pkg/admission-controller/resource/vpa/handler.go index f2f2a29c4d3..f7387ba0feb 100644 --- a/vertical-pod-autoscaler/pkg/admission-controller/resource/vpa/handler.go +++ b/vertical-pod-autoscaler/pkg/admission-controller/resource/vpa/handler.go @@ -34,10 +34,11 @@ import ( var ( possibleUpdateModes = map[vpa_types.UpdateMode]interface{}{ - vpa_types.UpdateModeOff: struct{}{}, - vpa_types.UpdateModeInitial: struct{}{}, - vpa_types.UpdateModeRecreate: struct{}{}, - vpa_types.UpdateModeAuto: struct{}{}, + vpa_types.UpdateModeOff: struct{}{}, + vpa_types.UpdateModeInitial: struct{}{}, + vpa_types.UpdateModeRecreate: struct{}{}, + vpa_types.UpdateModeAuto: struct{}{}, + vpa_types.UpdateModeInPlaceOrRecreate: struct{}{}, } possibleScalingModes = map[vpa_types.ContainerScalingMode]interface{}{ diff --git a/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go b/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go 
index 5ede0fe4810..47255520294 100644 --- a/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go +++ b/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go @@ -152,7 +152,7 @@ type PodUpdatePolicy struct { } // UpdateMode controls when autoscaler applies changes to the pod resources. -// +kubebuilder:validation:Enum=Off;Initial;Recreate;Auto +// +kubebuilder:validation:Enum=Off;Initial;Recreate;InPlaceOrRecreate;Auto type UpdateMode string const ( @@ -172,6 +172,10 @@ const ( // using any available update method. Currently this is equivalent to // Recreate, which is the only available update method. UpdateModeAuto UpdateMode = "Auto" + // UpdateModeInPlaceOrRecreate means that autoscaler tries to assign resources in-place + // first, and if it cannot (resize takes too long or is Infeasible) it falls back to the + // "Recreate" update mode. + UpdateModeInPlaceOrRecreate UpdateMode = "InPlaceOrRecreate" ) // PodResourcePolicy controls how autoscaler computes the recommended resources diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 5fef7df5deb..4e53fec6d4a 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -68,6 +68,9 @@ type ClusterStateFeeder interface { // LoadPods updates clusterState with current specification of Pods and their Containers. LoadPods() + // PruneContainers removes any containers from the cluster state that are no longer present in pods. + PruneContainers() + // LoadRealTimeMetrics updates clusterState with current usage metrics of containers. LoadRealTimeMetrics() @@ -290,7 +293,7 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() { } for _, checkpoint := range checkpointList.Items { vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName} - _, exists := feeder.clusterState.Vpas[vpaID] + vpa, exists := feeder.clusterState.Vpas[vpaID] if !exists { err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{}) if err == nil { @@ -299,6 +302,21 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() { klog.ErrorS(err, "Orphaned VPA checkpoint cleanup - error deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name)) } } + // Also clean up a checkpoint if the VPA is still there, but the container is gone. AggregateStateByContainerName + // merges in the initial aggregates so we can use it to check "both lists" (initial, aggregates) at once + // TODO(jkyros): could we also just wait until it got "old" enough, e.g. the checkpoint hasn't + // been updated for an hour, blow it away? Because once we remove it from the aggregate lists, it will stop + // being maintained. + _, aggregateExists := vpa.AggregateStateByContainerName()[checkpoint.Spec.ContainerName] + if !aggregateExists { + err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{}) + if err == nil { + klog.V(3).Infof("Orphaned VPA checkpoint cleanup - deleting %v/%v.", namespace, checkpoint.Name) + } else { + klog.Errorf("Cannot delete VPA checkpoint %v/%v. 
Reason: %+v", namespace, checkpoint.Name, err) + } + } + } } } @@ -425,6 +443,50 @@ func (feeder *clusterStateFeeder) LoadPods() { } } +// PruneContainers removes any containers from the aggregates and initial aggregates that are no longer +// present in the feeder's clusterState. Without this, we would be averaging our resource calculations +// over containers that no longer exist, and continuing to update checkpoints that should be left to expire. +func (feeder *clusterStateFeeder) PruneContainers() { + + // Find all the containers that are still legitimately in pods + containersInPods := make(map[string]string) + for _, pod := range feeder.clusterState.Pods { + for containerID, _ := range pod.Containers { + containersInPods[containerID] = pod.ID.PodName + } + } + + var aggregatesPruned int + var initialAggregatesPruned int + for _, vpa := range feeder.clusterState.Vpas { + // Look at the aggregates + for container := range vpa.AggregateContainerStates() { + // If the container being aggregated isn't in a pod anymore according to the state, remove it + if _, ok := containersInPods[container.ContainerName()]; !ok { + klog.V(4).Infof("Deleting Aggregate container %s, not present in any pods", container.ContainerName()) + vpa.DeleteAggregation(container) + aggregatesPruned = aggregatesPruned + 1 + } + + } + // Also remove it from the initial aggregates. This is done separately from the normal aggregates because it + // could be in this list, but not that list and vice versa + for container := range vpa.ContainersInitialAggregateState { + if _, ok := containersInPods[container]; !ok { + klog.V(4).Infof("Deleting Initial Aggregate container %s, not present in any pods", container) + delete(vpa.ContainersInitialAggregateState, container) + initialAggregatesPruned = initialAggregatesPruned + 1 + + } + } + } + // Only log if we did something + if initialAggregatesPruned > 0 || aggregatesPruned > 0 { + klog.Infof("Pruned %d aggregate and %d initial aggregate containers", aggregatesPruned, initialAggregatesPruned) + } + +} + func (feeder *clusterStateFeeder) LoadRealTimeMetrics() { containersMetrics, err := feeder.metricsClient.GetContainersMetrics() if err != nil { diff --git a/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go b/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go index 8aaf9cbd5e9..b613f7a8b8d 100644 --- a/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go +++ b/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go @@ -22,6 +22,7 @@ import ( vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model" + "k8s.io/klog/v2" ) var ( @@ -39,7 +40,7 @@ var ( // PodResourceRecommender computes resource recommendation for a Vpa object. type PodResourceRecommender interface { - GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) RecommendedPodResources + GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap, containersPerPod int) RecommendedPodResources } // RecommendedPodResources is a Map from container name to recommended resources. 
@@ -65,13 +66,15 @@ type podResourceRecommender struct { upperBoundMemory MemoryEstimator } -func (r *podResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) RecommendedPodResources { +func (r *podResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap, containersPerPod int) RecommendedPodResources { var recommendation = make(RecommendedPodResources) if len(containerNameToAggregateStateMap) == 0 { return recommendation } - fraction := 1.0 / float64(len(containerNameToAggregateStateMap)) + // TODO(jkyros): This right here is factoring in containers that don't exist anymore + fraction := 1.0 / float64(containersPerPod) + klog.Infof("Spreading recommendation across %d containers (fraction %f)", containersPerPod, fraction) minCPU := model.ScaleResource(model.CPUAmountFromCores(*podMinCPUMillicores*0.001), fraction) minMemory := model.ScaleResource(model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), fraction) diff --git a/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go b/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go index 5824745a461..54f70af7bb8 100644 --- a/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go @@ -40,7 +40,7 @@ func TestMinResourcesApplied(t *testing.T) { "container-1": &model.AggregateContainerState{}, } - recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap) + recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap)) assert.Equal(t, model.CPUAmountFromCores(*podMinCPUMillicores/1000), recommendedResources["container-1"].Target[model.ResourceCPU]) assert.Equal(t, model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), recommendedResources["container-1"].Target[model.ResourceMemory]) } @@ -63,7 +63,7 @@ func TestMinResourcesSplitAcrossContainers(t *testing.T) { "container-2": &model.AggregateContainerState{}, } - recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap) + recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap)) assert.Equal(t, model.CPUAmountFromCores((*podMinCPUMillicores/1000)/2), recommendedResources["container-1"].Target[model.ResourceCPU]) assert.Equal(t, model.CPUAmountFromCores((*podMinCPUMillicores/1000)/2), recommendedResources["container-2"].Target[model.ResourceCPU]) assert.Equal(t, model.MemoryAmountFromBytes((*podMinMemoryMb*1024*1024)/2), recommendedResources["container-1"].Target[model.ResourceMemory]) @@ -90,7 +90,7 @@ func TestControlledResourcesFiltered(t *testing.T) { }, } - recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap) + recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap)) assert.Contains(t, recommendedResources[containerName].Target, model.ResourceMemory) assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceMemory) assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceMemory) @@ -119,7 +119,7 @@ func TestControlledResourcesFilteredDefault(t *testing.T) { }, } - recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap) + 
recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap)) assert.Contains(t, recommendedResources[containerName].Target, model.ResourceMemory) assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceMemory) assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceMemory) diff --git a/vertical-pod-autoscaler/pkg/recommender/main.go b/vertical-pod-autoscaler/pkg/recommender/main.go index 8a53137f1e5..af4f2d25fec 100644 --- a/vertical-pod-autoscaler/pkg/recommender/main.go +++ b/vertical-pod-autoscaler/pkg/recommender/main.go @@ -107,7 +107,7 @@ var ( const ( // aggregateContainerStateGCInterval defines how often expired AggregateContainerStates are garbage collected. - aggregateContainerStateGCInterval = 1 * time.Hour + aggregateContainerStateGCInterval = 5 * time.Minute scaleCacheEntryLifetime time.Duration = time.Hour scaleCacheEntryFreshnessTime time.Duration = 10 * time.Minute scaleCacheEntryJitterFactor float64 = 1. diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 8fd2fd8f030..6e6183b403f 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -144,9 +144,28 @@ func (cluster *ClusterState) AddOrUpdatePod(podID PodID, newLabels labels.Set, p cluster.addPodToItsVpa(pod) } + // Tally the number of containers for later when we're averaging the recommendations + cluster.setVPAContainersPerPod(pod) pod.Phase = phase } +// setVPAContainersPerPod sets the number of containers per pod seen for pods connected to this VPA +// so that later, when we're spreading the minimum recommendations, we spread them over the correct +// number and not just the number of all aggregates that have *ever* been in the pod. (We don't want minimum resources +// to erroneously shrink.) +func (cluster *ClusterState) setVPAContainersPerPod(pod *PodState) { + for _, vpa := range cluster.Vpas { + if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) { + // We want the "high water mark" of the most containers in the pod in the event + // that we're rolling out a pod that has an additional container + if len(pod.Containers) > vpa.ContainersPerPod { + vpa.ContainersPerPod = len(pod.Containers) + } + } + } + +} + // addPodToItsVpa increases the count of Pods associated with a VPA object. // Does a scan similar to findOrCreateAggregateContainerState so could be optimized if needed. 
func (cluster *ClusterState) addPodToItsVpa(pod *PodState) { @@ -275,6 +294,11 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto } vpa.PodCount = len(cluster.GetMatchingPods(vpa)) } + + // Default this to the minimum; we will tally the true number when we load the pods later + // TODO(jkyros): This is gross, it depends on the order I know it currently loads things in, but + // that might not be the case someday + vpa.ContainersPerPod = 1 vpa.TargetRef = apiObject.Spec.TargetRef vpa.Annotations = annotationsMap vpa.Conditions = conditionsMap diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go index b4f32e07f8c..600453f4594 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go @@ -110,6 +110,11 @@ type Vpa struct { TargetRef *autoscaling.CrossVersionObjectReference // PodCount contains number of live Pods matching a given VPA object. PodCount int + // ContainersPerPod contains the "high water mark" of the number of containers + // per pod to average the recommendation across. Used to make sure we aren't + // "fractionalizing" minResources erroneously during a redeploy when a pod's + // container is removed or renamed. + ContainersPerPod int } // NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the @@ -209,6 +214,18 @@ func (vpa *Vpa) AggregateStateByContainerName() ContainerNameToAggregateStateMap return containerNameToAggregateStateMap } +// AggregateStateByContainerNameWithoutCheckpoints returns a map from container name to the aggregated state +// of all containers with that name, belonging to pods matched by the VPA, omitting any checkpoints. +func (vpa *Vpa) AggregateStateByContainerNameWithoutCheckpoints() ContainerNameToAggregateStateMap { + containerNameToAggregateStateMap := AggregateStateByContainerName(vpa.aggregateContainerStates) + return containerNameToAggregateStateMap +} + +// AggregateContainerStates returns the underlying internal aggregate state map. 
+func (vpa *Vpa) AggregateContainerStates() aggregateContainerStatesMap { + return vpa.aggregateContainerStates +} + // HasRecommendation returns if the VPA object contains any recommendation func (vpa *Vpa) HasRecommendation() bool { return (vpa.Recommendation != nil) && len(vpa.Recommendation.ContainerRecommendations) > 0 diff --git a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go index 1f456a80c18..3b92bdbcb05 100644 --- a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go +++ b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go @@ -91,7 +91,7 @@ func (r *recommender) UpdateVPAs() { if !found { continue } - resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa)) + resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa), vpa.ContainersPerPod) had := vpa.HasRecommendation() listOfResourceRecommendation := logic.MapToListOfRecommendedContainerResources(resources) @@ -153,6 +153,9 @@ func (r *recommender) RunOnce() { r.clusterStateFeeder.LoadPods() timer.ObserveStep("LoadPods") + r.clusterStateFeeder.PruneContainers() + timer.ObserveStep("PruneContainers") + r.clusterStateFeeder.LoadRealTimeMetrics() timer.ObserveStep("LoadMetrics") klog.V(3).InfoS("ClusterState is tracking", "pods", len(r.clusterState.Pods), "vpas", len(r.clusterState.Vpas)) diff --git a/vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction.go b/vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction.go index e79f79e54c5..e553907b972 100644 --- a/vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction.go +++ b/vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction.go @@ -18,6 +18,7 @@ package eviction import ( "context" + "encoding/json" "fmt" "time" @@ -25,6 +26,9 @@ import ( apiv1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + resource_updates "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" appsinformer "k8s.io/client-go/informers/apps/v1" coreinformer "k8s.io/client-go/informers/core/v1" @@ -47,12 +51,18 @@ type PodsEvictionRestriction interface { Evict(pod *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error // CanEvict checks if pod can be safely evicted CanEvict(pod *apiv1.Pod) bool + + // InPlaceUpdate updates the pod resources in-place + InPlaceUpdate(pod *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error + // CanInPlaceUpdate checks if the pod can be safely updated in place + CanInPlaceUpdate(pod *apiv1.Pod) bool } type podsEvictionRestrictionImpl struct { client kube_client.Interface podToReplicaCreatorMap map[string]podReplicaCreator creatorToSingleGroupStatsMap map[podReplicaCreator]singleGroupStats + patchCalculators []patch.Calculator } type singleGroupStats struct { @@ -61,13 +71,14 @@ type singleGroupStats struct { running int evictionTolerance int evicted int + inPlaceUpdating int } // PodsEvictionRestrictionFactory creates PodsEvictionRestriction type PodsEvictionRestrictionFactory interface { // NewPodsEvictionRestriction creates PodsEvictionRestriction for given set of pods, //
controlled by a single VPA object. - NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) PodsEvictionRestriction + NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, patchCalculators []patch.Calculator) PodsEvictionRestriction } type podsEvictionRestrictionFactoryImpl struct { @@ -98,23 +109,50 @@ type podReplicaCreator struct { // CanEvict checks if pod can be safely evicted func (e *podsEvictionRestrictionImpl) CanEvict(pod *apiv1.Pod) bool { - cr, present := e.podToReplicaCreatorMap[getPodID(pod)] + cr, present := e.podToReplicaCreatorMap[GetPodID(pod)] if present { singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr] if pod.Status.Phase == apiv1.PodPending { return true } if present { + shouldBeAlive := singleGroupStats.configured - singleGroupStats.evictionTolerance - if singleGroupStats.running-singleGroupStats.evicted > shouldBeAlive { + // TODO(jkyros): Come back and think through this better, but for now, take in-place updates into account because + // they might cause disruption. We assume pods will not be both in-place updated and evicted in the same pass, but + // we need eviction to take the numbers into account so we don't violate our disruption dolerances. + // If we're already resizing this pod, don't do anything to it, unless we failed to resize it, then we want to evict it. + if IsInPlaceUpdating(pod) { + klog.V(4).InfoS("Pod disruption tolerance", + "pod", pod.Name, + "running", singleGroupStats.running, + "configured", singleGroupStats.configured, + "tolerance", singleGroupStats.evictionTolerance, + "evicted", singleGroupStats.evicted, + "updating", singleGroupStats.inPlaceUpdating) + + if singleGroupStats.running-(singleGroupStats.evicted+(singleGroupStats.inPlaceUpdating-1)) > shouldBeAlive { + klog.V(4).Infof("Would be able to evict, but already resizing %s", pod.Name) + + if pod.Status.Resize == apiv1.PodResizeStatusInfeasible || pod.Status.Resize == apiv1.PodResizeStatusDeferred { + klog.Warningf("Attempted in-place resize of %s impossible, should now evict", pod.Name) + return true + } + } + return false + } + + if singleGroupStats.running-(singleGroupStats.evicted+singleGroupStats.inPlaceUpdating) > shouldBeAlive { return true } // If all pods are running and eviction tolerance is small evict 1 pod. if singleGroupStats.running == singleGroupStats.configured && singleGroupStats.evictionTolerance == 0 && - singleGroupStats.evicted == 0 { + singleGroupStats.evicted == 0 && + singleGroupStats.inPlaceUpdating == 0 { return true } + } } return false @@ -123,7 +161,7 @@ func (e *podsEvictionRestrictionImpl) CanEvict(pod *apiv1.Pod) bool { // Evict sends eviction instruction to api client. Returns error if pod cannot be evicted or if client returned error // Does not check if pod was actually evicted after eviction grace period. func (e *podsEvictionRestrictionImpl) Evict(podToEvict *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error { - cr, present := e.podToReplicaCreatorMap[getPodID(podToEvict)] + cr, present := e.podToReplicaCreatorMap[GetPodID(podToEvict)] if !present { return fmt.Errorf("pod not suitable for eviction %s/%s: not in replicated pods map", podToEvict.Namespace, podToEvict.Name) } @@ -192,7 +230,7 @@ func NewPodsEvictionRestrictionFactory(client kube_client.Interface, minReplicas // NewPodsEvictionRestriction creates PodsEvictionRestriction for a given set of pods, // controlled by a single VPA object. 
-func (f *podsEvictionRestrictionFactoryImpl) NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) PodsEvictionRestriction { +func (f *podsEvictionRestrictionFactoryImpl) NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, patchCalculators []patch.Calculator) PodsEvictionRestriction { // We can evict pod only if it is a part of replica set // For each replica set we can evict only a fraction of pods. // Evictions may be later limited by pod disruption budget if configured. @@ -224,6 +262,7 @@ func (f *podsEvictionRestrictionFactoryImpl) NewPodsEvictionRestriction(pods []* for creator, replicas := range livePods { actual := len(replicas) + // TODO(maxcao13): Does minReplicas matter if the update is in-place? There should be no downtime if the update is disruptionless if actual < required { klog.V(2).InfoS("Too few replicas", "kind", creator.Kind, "object", klog.KRef(creator.Namespace, creator.Name), "livePods", actual, "requiredPods", required, "globalMinReplicas", f.minReplicas) continue @@ -246,18 +285,24 @@ func (f *podsEvictionRestrictionFactoryImpl) NewPodsEvictionRestriction(pods []* singleGroup.configured = configured singleGroup.evictionTolerance = int(float64(configured) * f.evictionToleranceFraction) for _, pod := range replicas { - podToReplicaCreatorMap[getPodID(pod)] = creator + podToReplicaCreatorMap[GetPodID(pod)] = creator if pod.Status.Phase == apiv1.PodPending { singleGroup.pending = singleGroup.pending + 1 } + if IsInPlaceUpdating(pod) { + singleGroup.inPlaceUpdating = singleGroup.inPlaceUpdating + 1 + } } singleGroup.running = len(replicas) - singleGroup.pending creatorToSingleGroupStatsMap[creator] = singleGroup + } return &podsEvictionRestrictionImpl{ client: f.client, podToReplicaCreatorMap: podToReplicaCreatorMap, - creatorToSingleGroupStatsMap: creatorToSingleGroupStatsMap} + creatorToSingleGroupStatsMap: creatorToSingleGroupStatsMap, + patchCalculators: patchCalculators, + } } func getPodReplicaCreator(pod *apiv1.Pod) (*podReplicaCreator, error) { @@ -273,7 +318,8 @@ func getPodReplicaCreator(pod *apiv1.Pod) (*podReplicaCreator, error) { return podReplicaCreator, nil } -func getPodID(pod *apiv1.Pod) string { +// GetPodID returns a string that uniquely identifies a pod by namespace and name +func GetPodID(pod *apiv1.Pod) string { if pod == nil { return "" } @@ -391,3 +437,198 @@ func setUpInformer(kubeClient kube_client.Interface, kind controllerKind) (cache } return informer, nil } + +// CanInPlaceUpdate performs the same disruption-tolerance checks as CanEvict, but for in-place updates +func (e *podsEvictionRestrictionImpl) CanInPlaceUpdate(pod *apiv1.Pod) bool { + + cr, present := e.podToReplicaCreatorMap[GetPodID(pod)] + if present { + + // If our QoS class is guaranteed, we can't change the resources without a restart + // TODO(maxcao13): kubelet already prevents a resize of a guaranteed pod, so should we still check this early?
+ if pod.Status.QOSClass == apiv1.PodQOSGuaranteed { + klog.Warningf("Can't resize %s in-place, pod QoS is %s", pod.Name, pod.Status.QOSClass) + return false + } + + // If we're already resizing this pod, don't do it again + if IsInPlaceUpdating(pod) { + klog.Warningf("Not resizing %s, already resizing it", pod.Name) + return false + } + + noRestartPoliciesPopulated := true + + for _, container := range pod.Spec.Containers { + // If some of these are populated, we know it at least understands resizing + if len(container.ResizePolicy) > 0 { + noRestartPoliciesPopulated = false + } + + // TODO(maxcao13): Do we have to check the policy resource too? i.e. if only memory is getting scaled, then only check the memory resize policy? + for _, policy := range container.ResizePolicy { + if policy.RestartPolicy != apiv1.NotRequired { + klog.Warningf("in-place resize of %s will cause container disruption, container %s restart policy is %v", pod.Name, container.Name, policy.RestartPolicy) + // TODO(jkyros): is there something that prevents this from happening elsewhere in the API? + if pod.Spec.RestartPolicy == apiv1.RestartPolicyNever { + klog.Warningf("in-place resize of %s not possible, container %s resize policy is %v but pod restartPolicy is %v", pod.Name, container.Name, policy.RestartPolicy, pod.Spec.RestartPolicy) + return false + } + + } + } + } + + // If none of the policies are populated, our feature is probably not enabled, so we can't in-place regardless + if noRestartPoliciesPopulated { + klog.Warningf("impossible to resize %s in-place, container resize policies are not populated", pod.Name) + } + + //TODO(jkyros): Come back and handle sidecar containers at some point since they're weird? + singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr] + // If we're pending, we can't in-place resize + // TODO(jkyros): are we sure we can't? Should I just set this to "if running"? + if pod.Status.Phase == apiv1.PodPending { + klog.V(4).Infof("Can't resize pending pod %s", pod.Name) + return false + } + // This second "present" check is against the creator-to-group-stats map, not the pod-to-replica map + // TODO(maxcao13): Not sure, but do we need disruption tolerance for in-place updates? I guess we do since they are not guaranteed to be disruptionless... 
+ // TODO(maxcao13): If this is okay, I may have to rename evictionTolerance to disruption tolerance + if present { + klog.V(4).InfoS("Checking pod disruption tolerance", + "podName", pod.Name, + "configuredPods", singleGroupStats.configured, + "runningPods", singleGroupStats.running, + "evictedPods", singleGroupStats.evicted, + "inPlaceUpdatingPods", singleGroupStats.inPlaceUpdating, + "evictionTolerance", singleGroupStats.evictionTolerance, + ) + // minimum number of pods that should be running to tolerate disruptions + shouldBeAlive := singleGroupStats.configured - singleGroupStats.evictionTolerance + // number of pods that are actually running + actuallyAlive := singleGroupStats.running - (singleGroupStats.evicted + singleGroupStats.inPlaceUpdating) + if actuallyAlive > shouldBeAlive { + klog.V(4).InfoS("Pod can be resized in-place; more pods are running than required", "podName", pod.Name, "shouldBeAlive", shouldBeAlive, "actuallyAlive", actuallyAlive) + return true + } + + // If all pods are running, no pods are being evicted or updated, and eviction tolerance is small, we can resize in-place + if singleGroupStats.running == singleGroupStats.configured && + singleGroupStats.evictionTolerance == 0 && + singleGroupStats.evicted == 0 && singleGroupStats.inPlaceUpdating == 0 { + klog.V(4).InfoS("Pod can be resized in-place; all pods are running and eviction tolerance is 0", "podName", pod.Name) + return true + } + } + } + return false +} + +// TODO(maxcao13): Maybe we want to move all this updating logic outside of this method receiver or into a different package/file? +// InPlaceUpdate calculates patches and sends a resize request to the API client. Returns an error if the pod cannot be in-place updated or if the client returned an error. +// Does not check if pod was actually in-place updated after grace period. +func (e *podsEvictionRestrictionImpl) InPlaceUpdate(podToUpdate *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error { + cr, present := e.podToReplicaCreatorMap[GetPodID(podToUpdate)] + if !present { + return fmt.Errorf("pod not suitable for in-place update %v : not in replicated pods map", podToUpdate.Name) + } + + // TODO(maxcao13): Not sure if we need to check again here, but commenting it out for now in case we do + // if !e.CanInPlaceUpdate(podToUpdate) { + // return fmt.Errorf("cannot update pod %v in place : number of in-flight updates exceeded", podToUpdate.Name) + // } + + // TODO(maxcao13): There's maybe a more efficient way to do this, but this is what we have for now + + // separate patches since we have to patch resize and spec separately + resourcePatches := []resource_updates.PatchRecord{} + annotationPatches := []resource_updates.PatchRecord{} + if podToUpdate.Annotations == nil { + annotationPatches = append(annotationPatches, patch.GetAddEmptyAnnotationsPatch()) + } + for i, calculator := range e.patchCalculators { + p, err := calculator.CalculatePatches(podToUpdate, vpa) + if err != nil { + return fmt.Errorf("failed to calculate resource patch for pod %s/%s: %v", podToUpdate.Namespace, podToUpdate.Name, err) + } + klog.V(4).InfoS("Calculated patches for pod", "pod", klog.KObj(podToUpdate), "patches", p) + // TODO(maxcao13): change how this works later, this is gross and depends on the resource calculator being first in the slice + // we may not even want the updater to patch pod annotations at all + if i == 0 { + resourcePatches = append(resourcePatches, p...) + } else { + annotationPatches = append(annotationPatches, p...)
+ } + } + if len(resourcePatches) > 0 { + patch, err := json.Marshal(resourcePatches) + if err != nil { + klog.Errorf("Cannot marshal the patch %v: %v", resourcePatches, err) + return err + } + + res, err := e.client.CoreV1().Pods(podToUpdate.Namespace).Patch(context.TODO(), podToUpdate.Name, k8stypes.JSONPatchType, patch, metav1.PatchOptions{}, "resize") + if err != nil { + klog.ErrorS(err, "Failed to patch pod", "pod", klog.KObj(podToUpdate)) + return err + } + klog.V(4).InfoS("In-place patched pod /resize subresource using patches ", "pod", klog.KObj(res), "patches", string(patch)) + + // TODO(maxcao13): whether or not we apply annotation patches should depend on resource patches? + if len(annotationPatches) > 0 { + patch, err := json.Marshal(annotationPatches) + if err != nil { + klog.Errorf("Cannot marshal the patch %v: %v", annotationPatches, err) + return err + } + res, err = e.client.CoreV1().Pods(podToUpdate.Namespace).Patch(context.TODO(), podToUpdate.Name, k8stypes.JSONPatchType, patch, metav1.PatchOptions{}) + if err != nil { + klog.ErrorS(err, "Failed to patch pod", "pod", klog.KObj(podToUpdate)) + return err + } + klog.V(4).InfoS("Patched pod annotations", "pod", klog.KObj(res), "patches", string(patch)) + } + } else { + err := fmt.Errorf("no patches to apply to %s", podToUpdate.Name) + klog.ErrorS(err, "Failed to patch pod", "pod", klog.KObj(podToUpdate)) + return err + } + + // TODO(maxcao13): If this keeps getting called on the same object with the same reason, it is considered a patch request. + // And we fail to have the corresponding rbac for it. So figure out if we need this later. + // TODO(maxcao13): Do we even need to emit an event? The api server might reject the resize request. Should we rename this to InPlaceResizeAttempted? + // eventRecorder.Event(podToUpdate, apiv1.EventTypeNormal, "InPlaceResizedByVPA", "Pod was resized in place by VPA Updater.") + + // TODO(jkyros): You need to do this regardless once you update the pod, if it changes phases here as a result, you still + // need to catalog what you did + if podToUpdate.Status.Phase == apiv1.PodRunning { + singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr] + if !present { + klog.Errorf("Internal error - cannot find stats for replication group %v", cr) + } else { + singleGroupStats.inPlaceUpdating = singleGroupStats.inPlaceUpdating + 1 + e.creatorToSingleGroupStatsMap[cr] = singleGroupStats + } + } else { + klog.Warningf("I updated, but my pod phase was %s", podToUpdate.Status.Phase) + } + + return nil +} + +// IsInPlaceUpdating checks whether or not the given pod is currently in the middle of an in-place update +func IsInPlaceUpdating(podToCheck *apiv1.Pod) (isUpdating bool) { + // If the pod is currently updating we need to tally that + if podToCheck.Status.Resize != "" { + klog.V(4).InfoS("Pod is currently resizing", "pod", klog.KObj(podToCheck), "status", podToCheck.Status.Resize) + // Proposed -> Deferred -> InProgress, but what about Infeasible? 
+ if podToCheck.Status.Resize == apiv1.PodResizeStatusInfeasible { + klog.V(4).InfoS("Resource proposal for pod is Infeasible, we're probably stuck like this until we evict", "pod", klog.KObj(podToCheck)) + } else if podToCheck.Status.Resize == apiv1.PodResizeStatusDeferred { + klog.V(4).InfoS("Resource proposal for pod is Deferred, we're probably stuck like this until we evict", "pod", klog.KObj(podToCheck)) + } + return true + } + return false +} diff --git a/vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction_test.go b/vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction_test.go index 6685c144cc4..6a37ff1c99a 100644 --- a/vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction_test.go +++ b/vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction_test.go @@ -28,6 +28,7 @@ import ( batchv1 "k8s.io/api/batch/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test" appsinformer "k8s.io/client-go/informers/apps/v1" @@ -46,6 +47,17 @@ func getBasicVpa() *vpa_types.VerticalPodAutoscaler { return test.VerticalPodAutoscaler().WithContainer("any").Get() } +func getNoopPatchCalculators() []patch.Calculator { + return []patch.Calculator{} +} + +// func getPatchCalculators() []patch.Calculator { +// return []patch.Calculator{ +// patch.NewResourceUpdatesCalculator(inPlaceRecommendationProvider), +// eviction.NewLastInPlaceUpdateCalculator(), +// } +// } + func TestEvictReplicatedByController(t *testing.T) { rc := apiv1.ReplicationController{ ObjectMeta: metav1.ObjectMeta{ @@ -72,6 +84,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas int32 evictionTolerance float64 vpa *vpa_types.VerticalPodAutoscaler + calculators []patch.Calculator pods []podWithExpectations }{ { @@ -79,6 +92,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 3, evictionTolerance: 0.5, vpa: getBasicVpa(), + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { pod: generatePod().Get(), @@ -102,6 +116,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 4, evictionTolerance: 0.5, vpa: getBasicVpa(), + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { @@ -131,6 +146,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 4, evictionTolerance: 0.5, vpa: getBasicVpa(), + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { pod: generatePod().Get(), @@ -154,6 +170,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 3, evictionTolerance: 0.1, vpa: getBasicVpa(), + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { pod: generatePod().Get(), @@ -177,6 +194,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 3, evictionTolerance: 0.1, vpa: getBasicVpa(), + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { pod: generatePod().Get(), @@ -195,6 +213,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 3, evictionTolerance: 0.5, vpa: getBasicVpa(), + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { pod: generatePod().Get(), @@ -218,6 +237,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 4, evictionTolerance: 0.5, vpa: getBasicVpa(), + calculators: getNoopPatchCalculators(), pods: 
[]podWithExpectations{ { pod: generatePod().Get(), @@ -246,6 +266,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 1, evictionTolerance: 0.5, vpa: getBasicVpa(), + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { pod: generatePod().Get(), @@ -259,6 +280,7 @@ func TestEvictReplicatedByController(t *testing.T) { replicas: 1, evictionTolerance: 0.5, vpa: vpaSingleReplica, + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { pod: generatePod().Get(), @@ -278,7 +300,7 @@ func TestEvictReplicatedByController(t *testing.T) { pods = append(pods, p.pod) } factory, _ := getEvictionRestrictionFactory(&rc, nil, nil, nil, 2, testCase.evictionTolerance) - eviction := factory.NewPodsEvictionRestriction(pods, testCase.vpa) + eviction := factory.NewPodsEvictionRestriction(pods, testCase.vpa, testCase.calculators) for i, p := range testCase.pods { assert.Equalf(t, p.canEvict, eviction.CanEvict(p.pod), "TC %v - unexpected CanEvict result for pod-%v %#v", testCase.name, i, p.pod) } @@ -317,7 +339,7 @@ func TestEvictReplicatedByReplicaSet(t *testing.T) { basicVpa := getBasicVpa() factory, _ := getEvictionRestrictionFactory(nil, &rs, nil, nil, 2, 0.5) - eviction := factory.NewPodsEvictionRestriction(pods, basicVpa) + eviction := factory.NewPodsEvictionRestriction(pods, basicVpa, getNoopPatchCalculators()) for _, pod := range pods { assert.True(t, eviction.CanEvict(pod)) @@ -357,7 +379,7 @@ func TestEvictReplicatedByStatefulSet(t *testing.T) { basicVpa := getBasicVpa() factory, _ := getEvictionRestrictionFactory(nil, nil, &ss, nil, 2, 0.5) - eviction := factory.NewPodsEvictionRestriction(pods, basicVpa) + eviction := factory.NewPodsEvictionRestriction(pods, basicVpa, getNoopPatchCalculators()) for _, pod := range pods { assert.True(t, eviction.CanEvict(pod)) @@ -396,7 +418,7 @@ func TestEvictReplicatedByDaemonSet(t *testing.T) { basicVpa := getBasicVpa() factory, _ := getEvictionRestrictionFactory(nil, nil, nil, &ds, 2, 0.5) - eviction := factory.NewPodsEvictionRestriction(pods, basicVpa) + eviction := factory.NewPodsEvictionRestriction(pods, basicVpa, getNoopPatchCalculators()) for _, pod := range pods { assert.True(t, eviction.CanEvict(pod)) @@ -432,7 +454,7 @@ func TestEvictReplicatedByJob(t *testing.T) { basicVpa := getBasicVpa() factory, _ := getEvictionRestrictionFactory(nil, nil, nil, nil, 2, 0.5) - eviction := factory.NewPodsEvictionRestriction(pods, basicVpa) + eviction := factory.NewPodsEvictionRestriction(pods, basicVpa, getNoopPatchCalculators()) for _, pod := range pods { assert.True(t, eviction.CanEvict(pod)) @@ -472,7 +494,7 @@ func TestEvictTooFewReplicas(t *testing.T) { basicVpa := getBasicVpa() factory, _ := getEvictionRestrictionFactory(&rc, nil, nil, nil, 10, 0.5) - eviction := factory.NewPodsEvictionRestriction(pods, basicVpa) + eviction := factory.NewPodsEvictionRestriction(pods, basicVpa, getNoopPatchCalculators()) for _, pod := range pods { assert.False(t, eviction.CanEvict(pod)) @@ -509,7 +531,7 @@ func TestEvictionTolerance(t *testing.T) { basicVpa := getBasicVpa() factory, _ := getEvictionRestrictionFactory(&rc, nil, nil, nil, 2 /*minReplicas*/, tolerance) - eviction := factory.NewPodsEvictionRestriction(pods, basicVpa) + eviction := factory.NewPodsEvictionRestriction(pods, basicVpa, getNoopPatchCalculators()) for _, pod := range pods { assert.True(t, eviction.CanEvict(pod)) @@ -550,7 +572,7 @@ func TestEvictAtLeastOne(t *testing.T) { basicVpa := getBasicVpa() factory, _ := getEvictionRestrictionFactory(&rc, nil, nil, nil, 2, 
tolerance) - eviction := factory.NewPodsEvictionRestriction(pods, basicVpa) + eviction := factory.NewPodsEvictionRestriction(pods, basicVpa, getNoopPatchCalculators()) for _, pod := range pods { assert.True(t, eviction.CanEvict(pod)) @@ -590,6 +612,7 @@ func TestEvictEmitEvent(t *testing.T) { replicas int32 evictionTolerance float64 vpa *vpa_types.VerticalPodAutoscaler + calculators []patch.Calculator pods []podWithExpectations errorExpected bool }{ @@ -598,6 +621,7 @@ func TestEvictEmitEvent(t *testing.T) { replicas: 4, evictionTolerance: 0.5, vpa: basicVpa, + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { pod: generatePod().WithPhase(apiv1.PodPending).Get(), @@ -617,6 +641,7 @@ func TestEvictEmitEvent(t *testing.T) { replicas: 4, evictionTolerance: 0.5, vpa: basicVpa, + calculators: getNoopPatchCalculators(), pods: []podWithExpectations{ { @@ -638,7 +663,7 @@ func TestEvictEmitEvent(t *testing.T) { pods = append(pods, p.pod) } factory, _ := getEvictionRestrictionFactory(&rc, nil, nil, nil, 2, testCase.evictionTolerance) - eviction := factory.NewPodsEvictionRestriction(pods, testCase.vpa) + eviction := factory.NewPodsEvictionRestriction(pods, testCase.vpa, testCase.calculators) for _, p := range testCase.pods { mockRecorder := test.MockEventRecorder() diff --git a/vertical-pod-autoscaler/pkg/updater/inplace/inplace_recommendation_provider.go b/vertical-pod-autoscaler/pkg/updater/inplace/inplace_recommendation_provider.go new file mode 100644 index 00000000000..f89d2405614 --- /dev/null +++ b/vertical-pod-autoscaler/pkg/updater/inplace/inplace_recommendation_provider.go @@ -0,0 +1,83 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package inplace + +import ( + "fmt" + + core "k8s.io/api/core/v1" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/recommendation" + "k8s.io/klog/v2" + + vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/limitrange" + vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa" +) + +type inPlaceRecommendationProvider struct { + limitsRangeCalculator limitrange.LimitRangeCalculator + recommendationProcessor vpa_api_util.RecommendationProcessor +} + +// NewInPlaceRecommendationProvider constructs the recommendation provider that can be used to determine recommendations for pods. +func NewInPlaceRecommendationProvider(calculator limitrange.LimitRangeCalculator, + recommendationProcessor vpa_api_util.RecommendationProcessor) recommendation.Provider { + return &inPlaceRecommendationProvider{ + limitsRangeCalculator: calculator, + recommendationProcessor: recommendationProcessor, + } +} + +// GetContainersResourcesForPod returns recommended request for a given pod. +// The returned slice corresponds 1-1 to containers in the Pod. 
+func (p *inPlaceRecommendationProvider) GetContainersResourcesForPod(pod *core.Pod, vpa *vpa_types.VerticalPodAutoscaler) ([]vpa_api_util.ContainerResources, vpa_api_util.ContainerToAnnotationsMap, error) { + if vpa == nil || pod == nil { + klog.V(2).InfoS("Can't calculate recommendations, one of VPA or Pod is nil", "vpa", vpa, "pod", pod) + return nil, nil, nil + } + klog.V(2).InfoS("Updating requirements for pod", "pod", pod.Name) + + recommendedPodResources := &vpa_types.RecommendedPodResources{} + + if vpa.Status.Recommendation != nil { + var err error + // ignore annotations as they cannot be used when patching the resize subresource + recommendedPodResources, _, err = p.recommendationProcessor.Apply(vpa, pod) + if err != nil { + klog.V(2).InfoS("Cannot process recommendation for pod", "pod", klog.KObj(pod)) + return nil, nil, err + } + } + containerLimitRange, err := p.limitsRangeCalculator.GetContainerLimitRangeItem(pod.Namespace) + if err != nil { + return nil, nil, fmt.Errorf("error getting containerLimitRange: %s", err) + } + var resourcePolicy *vpa_types.PodResourcePolicy + if vpa.Spec.UpdatePolicy == nil || vpa.Spec.UpdatePolicy.UpdateMode == nil || *vpa.Spec.UpdatePolicy.UpdateMode != vpa_types.UpdateModeOff { + resourcePolicy = vpa.Spec.ResourcePolicy + } + containerResources := recommendation.GetContainersResources(pod, resourcePolicy, *recommendedPodResources, containerLimitRange, false, nil) + + // Ensure that we are not propagating empty resource key if any. + for _, resource := range containerResources { + if resource.RemoveEmptyResourceKeyIfAny() { + klog.InfoS("An empty resource key was found and purged", "pod", klog.KObj(pod), "vpa", klog.KObj(vpa)) + } + } + + return containerResources, nil, nil +} diff --git a/vertical-pod-autoscaler/pkg/updater/inplace/inplace_updated.go b/vertical-pod-autoscaler/pkg/updater/inplace/inplace_updated.go new file mode 100644 index 00000000000..c353325df43 --- /dev/null +++ b/vertical-pod-autoscaler/pkg/updater/inplace/inplace_updated.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package inplace + +import ( + core "k8s.io/api/core/v1" + resource_admission "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" + + vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/annotations" +) + +type inPlaceUpdate struct{} + +func (*inPlaceUpdate) CalculatePatches(pod *core.Pod, _ *vpa_types.VerticalPodAutoscaler) ([]resource_admission.PatchRecord, error) { + vpaInPlaceUpdatedValue := annotations.GetVpaInPlaceUpdatedValue() + return []resource_admission.PatchRecord{patch.GetAddAnnotationPatch(annotations.VpaInPlaceUpdatedLabel, vpaInPlaceUpdatedValue)}, nil +} + +// NewInPlaceUpdatedCalculator returns a calculator for +// the vpa-in-place-updated annotation patch.
+func NewInPlaceUpdatedCalculator() patch.Calculator { + return &inPlaceUpdate{} +} diff --git a/vertical-pod-autoscaler/pkg/updater/inplace/resource_updates.go b/vertical-pod-autoscaler/pkg/updater/inplace/resource_updates.go new file mode 100644 index 00000000000..9d68e78f67d --- /dev/null +++ b/vertical-pod-autoscaler/pkg/updater/inplace/resource_updates.go @@ -0,0 +1,83 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package inplace + +import ( + "fmt" + + core "k8s.io/api/core/v1" + resource_admission "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/recommendation" + vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" + vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa" +) + +type resourcesInplaceUpdatesPatchCalculator struct { + recommendationProvider recommendation.Provider +} + +// NewResourceInPlaceUpdatesCalculator returns a calculator for +// resource update patches. +func NewResourceInPlaceUpdatesCalculator(recommendationProvider recommendation.Provider) patch.Calculator { + return &resourcesInplaceUpdatesPatchCalculator{ + recommendationProvider: recommendationProvider, + } +} + +// TODO(maxcao13): this calculator's patches should only be marshalled as a JSON patch to the pod "resize" subresource. it won't be able to patch pod annotations +// if we DO need to calculate patches to add annotations, we can create a separate calculator to add that +func (c *resourcesInplaceUpdatesPatchCalculator) CalculatePatches(pod *core.Pod, vpa *vpa_types.VerticalPodAutoscaler) ([]resource_admission.PatchRecord, error) { + result := []resource_admission.PatchRecord{} + + containersResources, _, err := c.recommendationProvider.GetContainersResourcesForPod(pod, vpa) + if err != nil { + return []resource_admission.PatchRecord{}, fmt.Errorf("Failed to calculate resource patch for pod %s/%s: %v", pod.Namespace, pod.Name, err) + } + + for i, containerResources := range containersResources { + newPatches := getContainerPatch(pod, i, containerResources) + result = append(result, newPatches...) + } + + return result, nil +} + +func getContainerPatch(pod *core.Pod, i int, containerResources vpa_api_util.ContainerResources) []resource_admission.PatchRecord { + var patches []resource_admission.PatchRecord + // Add empty resources object if missing. 
+ if pod.Spec.Containers[i].Resources.Limits == nil && + pod.Spec.Containers[i].Resources.Requests == nil { + patches = append(patches, patch.GetPatchInitializingEmptyResources(i)) + } + + patches = appendPatches(patches, pod.Spec.Containers[i].Resources.Requests, i, containerResources.Requests, "requests") + patches = appendPatches(patches, pod.Spec.Containers[i].Resources.Limits, i, containerResources.Limits, "limits") + + return patches +} + +func appendPatches(patches []resource_admission.PatchRecord, current core.ResourceList, containerIndex int, resources core.ResourceList, fieldName string) []resource_admission.PatchRecord { + // Add empty object if it's missing and we're about to fill it. + if current == nil && len(resources) > 0 { + patches = append(patches, patch.GetPatchInitializingEmptyResourcesSubfield(containerIndex, fieldName)) + } + for resource, request := range resources { + patches = append(patches, patch.GetAddResourceRequirementValuePatch(containerIndex, fieldName, resource, request)) + } + return patches +} diff --git a/vertical-pod-autoscaler/pkg/updater/logic/updater.go b/vertical-pod-autoscaler/pkg/updater/logic/updater.go index 5d4a300af57..83257c007ac 100644 --- a/vertical-pod-autoscaler/pkg/updater/logic/updater.go +++ b/vertical-pod-autoscaler/pkg/updater/logic/updater.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/scheme" @@ -50,6 +51,11 @@ import ( vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa" ) +const ( + DeferredResizeUpdateTimeout = 1 * time.Minute + InProgressResizeUpdateTimeout = 1 * time.Hour +) + // Updater performs updates on pods if recommended by Vertical Pod Autoscaler type Updater interface { // RunOnce represents single iteration in the main-loop of Updater @@ -57,19 +63,21 @@ type Updater interface { } type updater struct { - vpaLister vpa_lister.VerticalPodAutoscalerLister - podLister v1lister.PodLister - eventRecorder record.EventRecorder - evictionFactory eviction.PodsEvictionRestrictionFactory - recommendationProcessor vpa_api_util.RecommendationProcessor - evictionAdmission priority.PodEvictionAdmission - priorityProcessor priority.PriorityProcessor - evictionRateLimiter *rate.Limiter - selectorFetcher target.VpaTargetSelectorFetcher - useAdmissionControllerStatus bool - statusValidator status.Validator - controllerFetcher controllerfetcher.ControllerFetcher - ignoredNamespaces []string + vpaLister vpa_lister.VerticalPodAutoscalerLister + podLister v1lister.PodLister + eventRecorder record.EventRecorder + evictionFactory eviction.PodsEvictionRestrictionFactory + recommendationProcessor vpa_api_util.RecommendationProcessor + evictionAdmission priority.PodEvictionAdmission + priorityProcessor priority.PriorityProcessor + evictionRateLimiter *rate.Limiter + selectorFetcher target.VpaTargetSelectorFetcher + useAdmissionControllerStatus bool + statusValidator status.Validator + controllerFetcher controllerfetcher.ControllerFetcher + ignoredNamespaces []string + patchCalculators []patch.Calculator + lastInPlaceUpdateAttemptTimeMap map[string]time.Time } // NewUpdater creates Updater with given configuration @@ -89,6 +97,7 @@ func NewUpdater( priorityProcessor 
priority.PriorityProcessor, namespace string, ignoredNamespaces []string, + patchCalculators []patch.Calculator, ) (Updater, error) { evictionRateLimiter := getRateLimiter(evictionRateLimit, evictionRateBurst) factory, err := eviction.NewPodsEvictionRestrictionFactory(kubeClient, minReplicasForEvicition, evictionToleranceFraction) @@ -112,7 +121,9 @@ func NewUpdater( status.AdmissionControllerStatusName, statusNamespace, ), - ignoredNamespaces: ignoredNamespaces, + ignoredNamespaces: ignoredNamespaces, + patchCalculators: patchCalculators, + lastInPlaceUpdateAttemptTimeMap: make(map[string]time.Time), }, nil } @@ -147,8 +158,8 @@ func (u *updater) RunOnce(ctx context.Context) { continue } if vpa_api_util.GetUpdateMode(vpa) != vpa_types.UpdateModeRecreate && - vpa_api_util.GetUpdateMode(vpa) != vpa_types.UpdateModeAuto { - klog.V(3).InfoS("Skipping VPA object because its mode is not \"Recreate\" or \"Auto\"", "vpa", klog.KObj(vpa)) + vpa_api_util.GetUpdateMode(vpa) != vpa_types.UpdateModeAuto && vpa_api_util.GetUpdateMode(vpa) != vpa_types.UpdateModeInPlaceOrRecreate { + klog.V(3).InfoS("Skipping VPA object because its mode is not \"InPlaceOrRecreate\", \"Recreate\" or \"Auto\"", "vpa", klog.KObj(vpa)) continue } selector, err := u.selectorFetcher.Fetch(ctx, vpa) @@ -199,29 +210,63 @@ func (u *updater) RunOnce(ctx context.Context) { vpasWithEvictablePodsCounter := metrics_updater.NewVpasWithEvictablePodsCounter() vpasWithEvictedPodsCounter := metrics_updater.NewVpasWithEvictedPodsCounter() + vpasWithInPlaceUpdatablePodsCounter := metrics_updater.NewVpasWithInPlaceUpdtateablePodsCounter() + vpasWithInPlaceUpdatedPodsCounter := metrics_updater.NewVpasWithInPlaceUpdtatedPodsCounter() + // using defer to protect against 'return' after evictionRateLimiter.Wait defer controlledPodsCounter.Observe() defer evictablePodsCounter.Observe() defer vpasWithEvictablePodsCounter.Observe() defer vpasWithEvictedPodsCounter.Observe() + // separate counters for in-place + defer vpasWithInPlaceUpdatablePodsCounter.Observe() + defer vpasWithInPlaceUpdatedPodsCounter.Observe() // NOTE: this loop assumes that controlledPods are filtered - // to contain only Pods controlled by a VPA in auto or recreate mode + // to contain only Pods controlled by a VPA in auto, recreate, or inPlaceOrRecreate mode + klog.V(2).InfoS("Processing controlled pods", "count", len(allLivePods)) for vpa, livePods := range controlledPods { vpaSize := len(livePods) controlledPodsCounter.Add(vpaSize, vpaSize) - evictionLimiter := u.evictionFactory.NewPodsEvictionRestriction(livePods, vpa) + evictionLimiter := u.evictionFactory.NewPodsEvictionRestriction(livePods, vpa, u.patchCalculators) + // TODO(jkyros): I need to know the priority details here so I can use them to determine what we want to do to the pod + // previously it was just "evict" but now we have to make decisions, so we need to know + // TODO(macao): Does the below filter mean that we may not be able to inplace scale pods that could be inplaced scaled, but non-evictable? + // Seems so, and if so, we should rework this filter to allow inplace scaling for non-evictable pods (is that what John was commenting above me? 
:p) podsForUpdate := u.getPodsUpdateOrder(filterNonEvictablePods(livePods, evictionLimiter), vpa) evictablePodsCounter.Add(vpaSize, len(podsForUpdate)) withEvictable := false withEvicted := false - for _, pod := range podsForUpdate { + + for _, prioritizedPod := range podsForUpdate { + + pod := prioritizedPod.Pod() + + fallBackToEviction, err := u.AttemptInPlaceScalingIfPossible(ctx, vpaSize, vpa, pod, evictionLimiter, vpasWithInPlaceUpdatablePodsCounter, vpasWithInPlaceUpdatedPodsCounter) + if err != nil { + klog.Warningf("error attempting to scale pod %v in-place: %v", pod.Name, err) + return + } + // Only fall back to eviction when the in-place attempt asks us to; if the in-place update was applied or is still making progress, skip eviction + if fallBackToEviction { + // TODO(jkyros): this needs to be cleaner, but we absolutely need to make sure a disruptionless update doesn't "sneak through" + if prioritizedPod.IsDisruptionless() { + klog.Infof("Not falling back to eviction, %v was supposed to be disruptionless", pod.Name) + continue + } else { + klog.V(4).Infof("Falling back to eviction for %s", pod.Name) + } + } else { + klog.Infof("Not falling back to eviction, because we don't have a recommendation yet? %v", pod.Name) + continue + } + + withEvictable = true if !evictionLimiter.CanEvict(pod) { continue } - err := u.evictionRateLimiter.Wait(ctx) + err = u.evictionRateLimiter.Wait(ctx) if err != nil { klog.V(0).InfoS("Eviction rate limiter wait failed", "error", err) return @@ -246,6 +291,20 @@ func (u *updater) RunOnce(ctx context.Context) { timer.ObserveStep("EvictPods") } +// VpaRecommendationProvided checks the VPA status to see if it has provided a recommendation yet. Used +// to make sure we don't get bogus values for in-place scaling +// TODO(jkyros): take this out when you find the proper place to gate this +func VpaRecommendationProvided(vpa *vpa_types.VerticalPodAutoscaler) bool { + // for _, condition := range vpa.Status.Conditions { + // if condition.Type == vpa_types.RecommendationProvided && condition.Status == apiv1.ConditionTrue { + // return true + // } + // } + // TODO(maxcao13): The above condition doesn't work in tests because sometimes there is no recommender to set this status + // so we should check the recommendation field directly. Or we can set the above condition manually in tests.
+ return vpa.Status.Recommendation != nil +} + func getRateLimiter(evictionRateLimit float64, evictionRateLimitBurst int) *rate.Limiter { var evictionRateLimiter *rate.Limiter if evictionRateLimit <= 0 { @@ -260,7 +319,7 @@ func getRateLimiter(evictionRateLimit float64, evictionRateLimitBurst int) *rate } // getPodsUpdateOrder returns list of pods that should be updated ordered by update priority -func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod { +func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*priority.PrioritizedPod { priorityCalculator := priority.NewUpdatePriorityCalculator( vpa, nil, @@ -271,7 +330,7 @@ func (u *updater) getPodsUpdateOrder(pods []*apiv1.Pod, vpa *vpa_types.VerticalP priorityCalculator.AddPod(pod, time.Now()) } - return priorityCalculator.GetSortedPods(u.evictionAdmission) + return priorityCalculator.GetSortedPrioritizedPods(u.evictionAdmission) } func filterNonEvictablePods(pods []*apiv1.Pod, evictionRestriction eviction.PodsEvictionRestriction) []*apiv1.Pod { @@ -323,3 +382,108 @@ func newEventRecorder(kubeClient kube_client.Interface) record.EventRecorder { return eventBroadcaster.NewRecorder(vpascheme, apiv1.EventSource{Component: "vpa-updater"}) } + +func (u *updater) AttemptInPlaceScalingIfPossible(ctx context.Context, vpaSize int, vpa *vpa_types.VerticalPodAutoscaler, pod *apiv1.Pod, evictionLimiter eviction.PodsEvictionRestriction, vpasWithInPlaceUpdatablePodsCounter *metrics_updater.SizeBasedGauge, vpasWithInPlaceUpdatedPodsCounter *metrics_updater.SizeBasedGauge) (fallBackToEviction bool, err error) { + // TODO(jkyros): We're somehow jumping the gun here, I'm not sure if it's a race condition or what but evictions + // don't hit it (maybe they take too long?). We end up with 0's for resource recommendations because we + // queue this for in-place before the VPA has made a recommendation. + + if !VpaRecommendationProvided(vpa) { + klog.V(4).Infof("VPA hasn't made a recommendation yet, we're early on %s for some reason", pod.Name) + // TODO(jkyros): so we must have had some erroneous evictions before, but we were passing the test suite? But for now if I want to test + // in-place I need it to not evict immediately if I can't in-place (because then it will never in-place) + fallBackToEviction = false + return + } + + if vpa_api_util.GetUpdateMode(vpa) == vpa_types.UpdateModeInPlaceOrRecreate { + + // separate counters/stats for in-place updates + withInPlaceUpdatable := false + withInPlaceUpdated := false + + klog.V(4).Infof("Looks like we might be able to in-place update %s..", pod.Name) + withInPlaceUpdatable = true + // If I can't update + if !evictionLimiter.CanInPlaceUpdate(pod) { + // If we are already updating, wait for the next loop to progress + if !eviction.IsInPlaceUpdating(pod) { + // But it's not in-place updating, something went wrong (e.g. 
the operation would change pods QoS) + // fall back to eviction + klog.V(4).Infof("Can't in-place update pod %s, falling back to eviction, it might say no", pod.Name) + fallBackToEviction = true + return + } + + lastUpdateTime, exists := u.lastInPlaceUpdateAttemptTimeMap[eviction.GetPodID(pod)] + if !exists { + klog.V(4).Infof("In-place update in progress for %s/%s, but no lastInPlaceUpdateTime found, setting it to now", pod.Namespace, pod.Name) + lastUpdateTime = time.Now() + u.lastInPlaceUpdateAttemptTimeMap[eviction.GetPodID(pod)] = lastUpdateTime + } + + // if currently inPlaceUpdating, we should only fallback to eviction if the update has failed. i.e: one of the following conditions: + // 1. .status.resize: Infeasible + // 2. .status.resize: Deferred + more than 1 minute has elapsed since the lastInPlaceUpdateTime + // 3. .status.resize: InProgress + more than 1 hour has elapsed since the lastInPlaceUpdateTime + switch pod.Status.Resize { + case apiv1.PodResizeStatusDeferred: + if time.Since(lastUpdateTime) > DeferredResizeUpdateTimeout { + klog.V(4).InfoS(fmt.Sprintf("In-place update deferred for more than %v, falling back to eviction", DeferredResizeUpdateTimeout), "pod", pod.Name) + fallBackToEviction = true + } else { + klog.V(4).Infof("In-place update deferred for %s, NOT falling back to eviction yet", pod.Name) + } + case apiv1.PodResizeStatusInProgress: + if time.Since(lastUpdateTime) > InProgressResizeUpdateTimeout { + klog.V(4).InfoS(fmt.Sprintf("In-place update in progress for more than %v, falling back to eviction", InProgressResizeUpdateTimeout), "pod", pod.Name) + fallBackToEviction = true + } else { + klog.V(4).InfoS("In-place update in progress, NOT falling back to eviction yet", "pod", pod.Name) + } + case apiv1.PodResizeStatusInfeasible: + klog.V(4).InfoS("In-place update infeasible, falling back to eviction", "pod", pod.Name) + fallBackToEviction = true + default: + klog.V(4).InfoS("In-place update status unknown, falling back to eviction", "pod", pod.Name) + fallBackToEviction = true + } + return + } + + // TODO(jkyros): need our own rate limiter or can we freeload off the eviction one? 
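One possible answer to the rate-limiter TODO above, sketched here for illustration only: a dedicated in-place limiter could mirror the existing getRateLimiter helper so in-place resizes are throttled independently of evictions, using the same golang.org/x/time/rate package the updater already imports. The function and flag names below are hypothetical and not part of this diff.

// Hypothetical sketch: a separate limiter for in-place resizes, mirroring getRateLimiter.
func getInPlaceRateLimiter(inPlaceRateLimit float64, inPlaceRateLimitBurst int) *rate.Limiter {
	if inPlaceRateLimit <= 0 {
		// A limit of rate.Inf disables rate limiting of in-place updates entirely.
		return rate.NewLimiter(rate.Inf, 0)
	}
	return rate.NewLimiter(rate.Limit(inPlaceRateLimit), inPlaceRateLimitBurst)
}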
+ err = u.evictionRateLimiter.Wait(ctx) + if err != nil { + // TODO(jkyros): whether or not we fall back to eviction here probably depends on *why* we failed + klog.Warningf("updating pod %v failed: %v", pod.Name, err) + return + } + + klog.V(2).Infof("attempting to in-place update pod %v", pod.Name) + u.lastInPlaceUpdateAttemptTimeMap[eviction.GetPodID(pod)] = time.Now() + evictErr := evictionLimiter.InPlaceUpdate(pod, vpa, u.eventRecorder) + if evictErr != nil { + klog.Warningf("updating pod %v failed: %v", pod.Name, evictErr) + } else { + // TODO(jkyros): come back later for stats + withInPlaceUpdated = false + metrics_updater.AddInPlaceUpdatedPod(vpaSize) + } + + if withInPlaceUpdatable { + vpasWithInPlaceUpdatablePodsCounter.Add(vpaSize, 1) + } + if withInPlaceUpdated { + vpasWithInPlaceUpdatedPodsCounter.Add(vpaSize, 1) + } + + } else { + // If our update mode doesn't support in-place, then evict + fallBackToEviction = true + return + } + + // counters for in-place update + + return +} diff --git a/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go b/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go index 64357d4924e..a6299d8f703 100644 --- a/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go +++ b/vertical-pod-autoscaler/pkg/updater/logic/updater_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher" target_mock "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/mock" @@ -159,6 +160,10 @@ func testRunOnceBase( Get() pods[i].Labels = labels + // We will test in-place separately, but we do need to account for these calls + eviction.On("CanInPlaceUpdate", pods[i]).Return(false) + eviction.On("IsInPlaceUpdating", pods[i]).Return(false) + eviction.On("CanEvict", pods[i]).Return(true) eviction.On("Evict", pods[i], nil).Return(nil) } @@ -173,12 +178,16 @@ func testRunOnceBase( Name: rc.Name, APIVersion: rc.APIVersion, } + // TOD0(jkyros): I added the recommendationProvided condition here because in-place needs to wait for a + // recommendation to scale, causing this test to fail (because in-place checks before eviction, and in-place will + // wait to scale -- and not fall back to eviction -- until the VPA has made a recommendation) vpaObj := test.VerticalPodAutoscaler(). WithContainer(containerName). WithTarget("2", "200M"). WithMinAllowed(containerName, "1", "100M"). WithMaxAllowed(containerName, "3", "1G"). - WithTargetRef(targetRef).Get() + WithTargetRef(targetRef). 
+ AppendCondition(vpa_types.RecommendationProvided, apiv1.ConditionTrue, "reason", "msg", time.Unix(0, 0)).Get() vpaObj.Spec.UpdatePolicy = &vpa_types.PodUpdatePolicy{UpdateMode: &updateMode} vpaLister.On("List").Return([]*vpa_types.VerticalPodAutoscaler{vpaObj}, nil).Once() @@ -247,7 +256,7 @@ type fakeEvictFactory struct { evict eviction.PodsEvictionRestriction } -func (f fakeEvictFactory) NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) eviction.PodsEvictionRestriction { +func (f fakeEvictFactory) NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, patchCalculators []patch.Calculator) eviction.PodsEvictionRestriction { return f.evict } @@ -310,13 +319,20 @@ func TestRunOnceIgnoreNamespaceMatchingPods(t *testing.T) { Name: rc.Name, APIVersion: rc.APIVersion, } + + // TOD0(jkyros): I added the recommendationProvided condition here because in-place needs to wait for a + // recommendation to scale, causing this test to fail (because in-place checks before eviction, and in-place will + // wait to scale -- and not fall back to eviction -- until the VPA has made a recommendation) + // TODO(maxcao13): We can either just add these conditions on every test, or we can change the VpaRecommendationProvided condition check in the code + // which I have already did. Either way should be fine unless vpa.Status.Recommendation != nil does not imply RecommendationProvided condition vpaObj := test.VerticalPodAutoscaler(). WithNamespace("default"). WithContainer(containerName). WithTarget("2", "200M"). WithMinAllowed(containerName, "1", "100M"). WithMaxAllowed(containerName, "3", "1G"). - WithTargetRef(targetRef).Get() + WithTargetRef(targetRef). + AppendCondition(vpa_types.RecommendationProvided, apiv1.ConditionTrue, "reason", "msg", time.Unix(0, 0)).Get() vpaLister.On("List").Return([]*vpa_types.VerticalPodAutoscaler{vpaObj}, nil).Once() diff --git a/vertical-pod-autoscaler/pkg/updater/main.go b/vertical-pod-autoscaler/pkg/updater/main.go index 762a301c1f8..4b7f6a805bc 100644 --- a/vertical-pod-autoscaler/pkg/updater/main.go +++ b/vertical-pod-autoscaler/pkg/updater/main.go @@ -26,19 +26,12 @@ import ( "github.com/spf13/pflag" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/client-go/informers" - kube_client "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - kube_flag "k8s.io/component-base/cli/flag" - componentbaseconfig "k8s.io/component-base/config" - componentbaseoptions "k8s.io/component-base/config/options" - "k8s.io/klog/v2" - "k8s.io/autoscaler/vertical-pod-autoscaler/common" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch" vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target" controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher" + "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/inplace" updater "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/logic" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/priority" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/limitrange" @@ -47,6 +40,14 @@ import ( "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/server" "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/status" vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa" + "k8s.io/client-go/informers" + 
kube_client "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + kube_flag "k8s.io/component-base/cli/flag" + componentbaseconfig "k8s.io/component-base/config" + componentbaseoptions "k8s.io/component-base/config/options" + "k8s.io/klog/v2" ) var ( @@ -56,6 +57,7 @@ var ( minReplicas = flag.Int("min-replicas", 2, `Minimum number of replicas to perform update`) + // TODO(maxcao13): Should this be combined into disruption tolerance, or should we have a separate flag for that, or we just don't rename? evictionToleranceFraction = flag.Float64("eviction-tolerance", 0.5, `Fraction of replica count that can be evicted for update, if more than one pod can be evicted.`) @@ -182,6 +184,12 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) { ignoredNamespaces := strings.Split(commonFlag.IgnoredVpaObjectNamespaces, ",") + inPlaceRecommendationProvider := inplace.NewInPlaceRecommendationProvider(limitRangeCalculator, vpa_api_util.NewCappingRecommendationProcessor(limitRangeCalculator)) + + // TODO(maxcao13): figure out if we need to use NewInPlaceUpdatedCalculator; does it help the user to know if their pod was updated in-place as an annotation? + // calculators := []patch.Calculator{patch.NewResourceUpdatesCalculator(inPlaceRecommendationProvider), eviction.NewInPlaceUpdatedCalculator()} + calculators := []patch.Calculator{inplace.NewResourceInPlaceUpdatesCalculator(inPlaceRecommendationProvider), inplace.NewInPlaceUpdatedCalculator()} + // TODO: use SharedInformerFactory in updater updater, err := updater.NewUpdater( kubeClient, @@ -199,6 +207,7 @@ func run(healthCheck *metrics.HealthCheck, commonFlag *common.CommonFlags) { priority.NewProcessor(), commonFlag.VpaObjectNamespace, ignoredNamespaces, + calculators, ) if err != nil { klog.Fatalf("Failed to create updater: %v", err) diff --git a/vertical-pod-autoscaler/pkg/updater/priority/priority_processor.go b/vertical-pod-autoscaler/pkg/updater/priority/priority_processor.go index 53c07fa7163..de9cc267f4f 100644 --- a/vertical-pod-autoscaler/pkg/updater/priority/priority_processor.go +++ b/vertical-pod-autoscaler/pkg/updater/priority/priority_processor.go @@ -51,7 +51,7 @@ func (*defaultPriorityProcessor) GetUpdatePriority(pod *apiv1.Pod, vpa *vpa_type hasObservedContainers, vpaContainerSet := parseVpaObservedContainers(pod) - for _, podContainer := range pod.Spec.Containers { + for num, podContainer := range pod.Spec.Containers { if hasObservedContainers && !vpaContainerSet.Has(podContainer.Name) { klog.V(4).InfoS("Not listed in VPA observed containers label. 
 continue
@@ -73,6 +73,35 @@ func (*defaultPriorityProcessor) GetUpdatePriority(pod *apiv1.Pod, vpa *vpa_type
 (hasUpperBound && request.Cmp(upperBound) > 0) {
 outsideRecommendedRange = true
 }
+
+ // TODO(jkyros): I think we're picking up early zeroes here when the VPA has no recommendation yet; that's why we
+ // wait for the recommendation later before trying to scale in-place
+ // TODO(jkyros): For in-place VPA this might be gross, but we need this pod to be in the eviction list because it doesn't actually have
+ // the resources it asked for even if the spec is right, and we might need to fall back to evicting it
+ // TODO(jkyros): Can we have an empty container status at this point for real? It's at least failing the tests if we don't check, but
+ // we could just populate the status in the tests
+ // Statuses can be missing, or status resources can be nil
+ if len(pod.Status.ContainerStatuses) > num && pod.Status.ContainerStatuses[num].Resources != nil {
+ if statusRequest, hasStatusRequest := pod.Status.ContainerStatuses[num].Resources.Requests[resourceName]; hasStatusRequest {
+ // If we're updating but still don't have what we asked for, we may still need to act on this pod
+ if request.MilliValue() > statusRequest.MilliValue() {
+ scaleUp = true
+ // It's fine if the resize is still in progress, but if it's deferred or infeasible, keep the pod
+ // in the list so we can fall back to evicting it onto a node with more capacity
+ if pod.Status.Resize == apiv1.PodResizeStatusDeferred || pod.Status.Resize == apiv1.PodResizeStatusInfeasible {
+ klog.V(4).Infof("Pod %s looks like it's stuck scaling up (%v state), leaving it in for eviction", pod.Name, pod.Status.Resize)
+ } else {
+ klog.V(4).Infof("Pod %s is in the process of scaling up (%v state), leaving it in so we can see if it's taking too long", pod.Name, pod.Status.Resize)
+ }
+ }
+ // Even if the pod is stuck mid-resize, only flag it if the request it is actually running with is outside the recommended bounds
+ if (hasLowerBound && statusRequest.Cmp(lowerBound) < 0) ||
+ (hasUpperBound && statusRequest.Cmp(upperBound) > 0) {
+ outsideRecommendedRange = true
+ }
+ }
+ }
+
 } else {
 // Note: if the request is not specified, the container will use the
 // namespace default request. Currently we ignore it and treat such
@@ -83,6 +112,10 @@ func (*defaultPriorityProcessor) GetUpdatePriority(pod *apiv1.Pod, vpa *vpa_type
 }
 }
 }
+
+ // TODO(jkyros): This gets hairy here: if the status (rather than the spec) is what let this pod into the list,
+ // we probably need to do these calculations against the status rather than the spec, because the spec
+ // doesn't reflect what the pod is actually running with
 resourceDiff := 0.0
 for resource, totalRecommended := range totalRecommendedPerResource {
 totalRequest := math.Max(float64(totalRequestPerResource[resource]), 1.0)
diff --git a/vertical-pod-autoscaler/pkg/updater/priority/update_priority_calculator.go b/vertical-pod-autoscaler/pkg/updater/priority/update_priority_calculator.go
index c6c9127f784..aafd33d98cd 100644
--- a/vertical-pod-autoscaler/pkg/updater/priority/update_priority_calculator.go
+++ b/vertical-pod-autoscaler/pkg/updater/priority/update_priority_calculator.go
@@ -33,7 +33,7 @@ import (
 )
 var (
- defaultUpdateThreshold = flag.Float64("pod-update-threshold", 0.1, "Ignore updates that have priority lower than the value of this flag")
+ defaultUpdateThreshold = flag.Float64("pod-update-threshold", 0.1, "Ignore disruptive updates that have priority lower than the value of this flag")
 podLifetimeUpdateThreshold = flag.Duration("in-recommendation-bounds-eviction-lifetime-threshold", time.Hour*12, "Pods that live for at least that long can be evicted even if their request is within the [MinRecommended...MaxRecommended] range")
@@ -48,7 +48,7 @@ var (
 // than pod with 100M current memory and 150M recommendation (100% increase vs 50% increase)
 type UpdatePriorityCalculator struct {
 vpa *vpa_types.VerticalPodAutoscaler
- pods []prioritizedPod
+ pods []PrioritizedPod
 config *UpdateConfig
 recommendationProcessor vpa_api_util.RecommendationProcessor
 priorityProcessor PriorityProcessor
@@ -116,6 +116,8 @@ func (calc *UpdatePriorityCalculator) AddPod(pod *apiv1.Pod, now time.Time) {
 }
 }
+
+ disruptionlessRecommendation := calc.CalculateDisruptionFreeActions(pod, processedRecommendation)
+
 // The update is allowed in following cases:
 // - the request is outside the recommended range for some container.
 // - the pod lives for at least 24h and the resource diff is >= MinChangePriority.
@@ -126,14 +128,29 @@ func (calc *UpdatePriorityCalculator) AddPod(pod *apiv1.Pod, now time.Time) {
 klog.V(4).InfoS("Not updating pod, missing field pod.Status.StartTime", "pod", klog.KObj(pod))
 return
 }
- if now.Before(pod.Status.StartTime.Add(*podLifetimeUpdateThreshold)) {
- klog.V(4).InfoS("Not updating a short-lived pod, request within recommended range", "pod", klog.KObj(pod))
- return
- }
+ // TODO(maxcao13): The order of these checks is switched so that the significant-change check runs before the lifetime check;
+ // otherwise an insignificant change could be marked disruptionless and still trigger an in-place update
 if updatePriority.ResourceDiff < calc.config.MinChangePriority {
 klog.V(4).InfoS("Not updating pod, resource diff too low", "pod", klog.KObj(pod), "updatePriority", updatePriority)
 return
 }
+ if now.Before(pod.Status.StartTime.Add(*podLifetimeUpdateThreshold)) {
+ // TODO(jkyros): do we need an in-place update threshold arg?
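+ // For example, a CPU-only increase on a container whose CPU resize policy is NotRequired can normally be
+ // applied in place without a restart, so there is no need to wait out the lifetime threshold for it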
+ // If our recommendations are disruptionless, we can bypass the threshold limit
+ if len(disruptionlessRecommendation.ContainerRecommendations) > 0 {
+ klog.V(2).Infof("Pod is short-lived but accepted for a disruption-free (%d/%d) in-place update %v/%v with priority %v", len(disruptionlessRecommendation.ContainerRecommendations), len(processedRecommendation.ContainerRecommendations), pod.Namespace, pod.Name, updatePriority.ResourceDiff)
+ updatePriority.Disruptionless = true
+ calc.pods = append(calc.pods, PrioritizedPod{
+ pod: pod,
+ priority: updatePriority,
+ recommendation: disruptionlessRecommendation})
+ } else {
+ // if it's not disruptionless, we fall back to the Recreate conditions, which already failed
+ // (quick OOM, outside recommended range, long-lived + significant change), so don't update this pod
+ klog.V(4).InfoS("Not updating a short-lived pod, request within recommended range", "pod", klog.KObj(pod))
+ }
+ return
+ }
 }
 // If the pod has quick OOMed then evict only if the resources will change
@@ -142,12 +159,30 @@ func (calc *UpdatePriorityCalculator) AddPod(pod *apiv1.Pod, now time.Time) {
 return
 }
 klog.V(2).InfoS("Pod accepted for update", "pod", klog.KObj(pod), "updatePriority", updatePriority.ResourceDiff, "processedRecommendations", calc.GetProcessedRecommendationTargets(processedRecommendation))
- calc.pods = append(calc.pods, prioritizedPod{
+ calc.pods = append(calc.pods, PrioritizedPod{
 pod: pod,
 priority: updatePriority,
 recommendation: processedRecommendation})
 }
+
+// GetSortedPrioritizedPods returns a list of prioritized pods ordered by update priority (highest update priority first). Used instead
+// of GetSortedPods when we need access to the priority information
+func (calc *UpdatePriorityCalculator) GetSortedPrioritizedPods(admission PodEvictionAdmission) []*PrioritizedPod {
+ sort.Sort(byPriorityDesc(calc.pods))
+
+ result := []*PrioritizedPod{}
+ for num, podPrio := range calc.pods {
+ if admission.Admit(podPrio.pod, podPrio.recommendation) {
+ result = append(result, &calc.pods[num])
+ } else {
+ klog.V(2).Infof("pod removed from update queue by PodEvictionAdmission: %v", podPrio.pod.Name)
+ }
+ }
+
+ return result
+}
+
 // GetSortedPods returns a list of pods ordered by update priority (highest update priority first)
 func (calc *UpdatePriorityCalculator) GetSortedPods(admission PodEvictionAdmission) []*apiv1.Pod {
 sort.Sort(byPriorityDesc(calc.pods))
@@ -206,12 +241,25 @@ func parseVpaObservedContainers(pod *apiv1.Pod) (bool, sets.Set[string]) {
 return hasObservedContainers, vpaContainerSet
 }
-type prioritizedPod struct {
+// PrioritizedPod contains the priority and recommendation details for a pod.
+// TODO(jkyros): I made this public, but there may be a cleaner way
+type PrioritizedPod struct {
 pod *apiv1.Pod
 priority PodPriority
 recommendation *vpa_types.RecommendedPodResources
 }
+
+// IsDisruptionless returns the disruptionless status of the underlying pod priority
+// TODO(jkyros): scope issues, maybe not the best place to put Disruptionless
+func (p PrioritizedPod) IsDisruptionless() bool {
+ return p.priority.Disruptionless
+}
+
+// Pod returns the pod wrapped by this PrioritizedPod
+func (p PrioritizedPod) Pod() *apiv1.Pod {
+ return p.pod
+}
+
 // PodPriority contains data for a pod update that can be used to prioritize between updates.
 type PodPriority struct {
 // Is any container outside of the recommended range.
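A minimal sketch of how GetSortedPrioritizedPods and the Disruptionless flag are intended to be consumed. The real consumer is the updater loop in pkg/updater/logic, which is not part of this hunk, so the function below, its parameters, and the fallback handling are illustrative assumptions; CanInPlaceUpdate and InPlaceUpdate are the methods this PR adds to PodsEvictionRestriction.

package logic // illustrative placement only

import (
	"k8s.io/client-go/tools/record"

	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/eviction"
	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/priority"
)

// applyPrioritizedUpdates branches on the Disruptionless flag: pods whose recommendation
// can be applied without a restart are resized in place; everything else (including a
// failed in-place attempt) is left to the existing eviction path.
func applyPrioritizedUpdates(calc *priority.UpdatePriorityCalculator, admission priority.PodEvictionAdmission,
	restriction eviction.PodsEvictionRestriction, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) {
	for _, prioritized := range calc.GetSortedPrioritizedPods(admission) {
		pod := prioritized.Pod()
		if prioritized.IsDisruptionless() && restriction.CanInPlaceUpdate(pod) {
			if err := restriction.InPlaceUpdate(pod, vpa, eventRecorder); err == nil {
				continue // resized in place, nothing to evict
			}
		}
		// Fall back to the eviction flow (CanEvict / Evict), elided here.
	}
}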
@@ -220,9 +268,11 @@ type PodPriority struct {
 ScaleUp bool
 // Relative difference between the total requested and total recommended resources.
 ResourceDiff float64
+ // Disruptionless is true if this update can be applied without restarting the pod's containers
+ Disruptionless bool
 }
-type byPriorityDesc []prioritizedPod
+type byPriorityDesc []PrioritizedPod
 func (list byPriorityDesc) Len() int {
 return len(list)
@@ -250,3 +300,67 @@ func (p PodPriority) Less(other PodPriority) bool {
 // 2. A pod with larger value of resourceDiff takes precedence.
 return p.ResourceDiff < other.ResourceDiff
 }
+
+// CalculateDisruptionFreeActions returns the subset of the recommendation that can be applied without disruption,
+// based on the containers' resize/restart policies.
+func (calc *UpdatePriorityCalculator) CalculateDisruptionFreeActions(pod *apiv1.Pod, recommendation *vpa_types.RecommendedPodResources) *vpa_types.RecommendedPodResources {
+ var disruptionlessRecommendation = &vpa_types.RecommendedPodResources{}
+
+ for _, container := range pod.Spec.Containers {
+ // If the container doesn't declare a resize policy, we can't prove the resize is disruption-free, so skip it
+ if len(container.ResizePolicy) == 0 {
+ continue
+ }
+
+ // So we get whatever the recommendation was for this container
+ resourceRec := getRecommendationForContainerName(container.Name, recommendation)
+ // If we didn't find a recommendation for this container, we don't have anything to do
+ if resourceRec == nil {
+ continue
+ }
+ // Then we go through all the resource recommendations it has and make sure every one of them
+ // can be applied without a restart
+ disruptive := false
+ for resource := range resourceRec.Target {
+ // And we look up what the restart policy is for that resource
+ resourceRestartPolicy := getRestartPolicyForResource(resource, container.ResizePolicy)
+ // If there is no explicit policy for this resource, be conservative and treat the resize as disruptive
+ if resourceRestartPolicy == nil {
+ disruptive = true
+ break
+ }
+ // If the policy requires a restart, this container can't be resized without disruption
+ if *resourceRestartPolicy != apiv1.NotRequired {
+ disruptive = true
+ break
+ }
+ }
+ if disruptive {
+ continue
+ }
+
+ // Every recommended resource for this container can be resized in place without disruption
+ disruptionlessRecommendation.ContainerRecommendations = append(disruptionlessRecommendation.ContainerRecommendations, *resourceRec)
+ }
+
+ return disruptionlessRecommendation
+}
+
+// getRecommendationForContainerName searches through the list of ContainerRecommendations until it finds one matching the named container. Used
+// to match up containers with their recommendations (we have the container, we want its resource recommendation)
+func getRecommendationForContainerName(name string, recommendation *vpa_types.RecommendedPodResources) *vpa_types.RecommendedContainerResources {
+ for _, recommendationContainer := range recommendation.ContainerRecommendations {
+ if recommendationContainer.ContainerName == name {
+ return &recommendationContainer
+ }
+ }
+ return nil
+}
+
+// getRestartPolicyForResource searches through the list of resources in the resize policy until it finds the one matching the named resource. Used
+// to match up restart policies with our resource recommendations (we have the resource, we want its policy).
+func getRestartPolicyForResource(resourceName apiv1.ResourceName, policy []apiv1.ContainerResizePolicy) *apiv1.ResourceResizeRestartPolicy {
+ // TODO(jkyros): can there be duplicate policies for a resource? We just take the first match for now
+ for _, resizePolicy := range policy {
+ if resizePolicy.ResourceName == resourceName {
+ return &resizePolicy.RestartPolicy
+ }
+ }
+ return nil
+}
diff --git a/vertical-pod-autoscaler/pkg/utils/annotations/vpa_inplace_update.go b/vertical-pod-autoscaler/pkg/utils/annotations/vpa_inplace_update.go
new file mode 100644
index 00000000000..2cb64ed1e64
--- /dev/null
+++ b/vertical-pod-autoscaler/pkg/utils/annotations/vpa_inplace_update.go
@@ -0,0 +1,29 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package annotations
+
+// TODO(maxcao13): This annotation currently doesn't do anything. Do we want an annotation that shows a pod was resized in place by VPA, purely for visibility?
+
+const (
+ // VpaInPlaceUpdatedLabel is the annotation key used to mark a pod that has been resized in place by VPA.
+ VpaInPlaceUpdatedLabel = "vpaInPlaceUpdated"
+)
+
+// GetVpaInPlaceUpdatedValue returns the annotation value used to mark a pod as in-place updated.
+func GetVpaInPlaceUpdatedValue() string {
+ return "vpaInPlaceUpdated"
+}
diff --git a/vertical-pod-autoscaler/pkg/utils/metrics/updater/updater.go b/vertical-pod-autoscaler/pkg/utils/metrics/updater/updater.go
index 9a068b735ef..a3034350da7 100644
--- a/vertical-pod-autoscaler/pkg/utils/metrics/updater/updater.go
+++ b/vertical-pod-autoscaler/pkg/utils/metrics/updater/updater.go
@@ -75,6 +75,38 @@ var (
 }, []string{"vpa_size_log2"},
 )
+ inPlaceUpdatableCount = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: metricsNamespace,
+ Name: "in_place_updatable_pods_total",
+ Help: "Number of Pods matching in-place update criteria.",
+ }, []string{"vpa_size_log2"},
+ )
+
+ inPlaceUpdatedCount = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: metricsNamespace,
+ Name: "in_place_updated_pods_total",
+ Help: "Number of Pods updated in-place by Updater to apply a new recommendation.",
+ }, []string{"vpa_size_log2"},
+ )
+
+ vpasWithInPlaceUpdatablePodsCount = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: metricsNamespace,
+ Name: "vpas_with_in_place_updatable_pods_total",
+ Help: "Number of VPA objects with at least one Pod matching in-place update criteria.",
+ }, []string{"vpa_size_log2"},
+ )
+
+ vpasWithInPlaceUpdatedPodsCount = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: metricsNamespace,
+ Name: "vpas_with_in_place_updated_pods_total",
+ Help: "Number of VPA objects with at least one in-place updated Pod.",
+ }, []string{"vpa_size_log2"},
+ )
+
 functionLatency = metrics.CreateExecutionTimeMetric(metricsNamespace, "Time spent in various parts of VPA Updater main loop.")
 )
@@ -123,6 +155,27 @@ func AddEvictedPod(vpaSize int) {
 evictedCount.WithLabelValues(strconv.Itoa(log2)).Inc()
 }
+// NewInPlaceUpdatablePodsCounter returns a wrapper for counting Pods which match the in-place update criteria
+func NewInPlaceUpdatablePodsCounter() *SizeBasedGauge {
+ return newSizeBasedGauge(inPlaceUpdatableCount)
+}
+
+// NewVpasWithInPlaceUpdatablePodsCounter returns a wrapper for counting VPA objects with Pods matching the in-place update criteria
+func NewVpasWithInPlaceUpdatablePodsCounter() *SizeBasedGauge {
+ return newSizeBasedGauge(vpasWithInPlaceUpdatablePodsCount)
+}
+
+// NewVpasWithInPlaceUpdatedPodsCounter returns a wrapper for counting VPA objects with in-place updated Pods
+func NewVpasWithInPlaceUpdatedPodsCounter() *SizeBasedGauge {
+ return newSizeBasedGauge(vpasWithInPlaceUpdatedPodsCount)
+}
+
+// AddInPlaceUpdatedPod increases the counter of pods updated in place by Updater, by given VPA size
+func AddInPlaceUpdatedPod(vpaSize int) {
+ log2 := metrics.GetVpaSizeLog2(vpaSize)
+ inPlaceUpdatedCount.WithLabelValues(strconv.Itoa(log2)).Inc()
+}
+
 // Add increases the counter for the given VPA size
 func (g *SizeBasedGauge) Add(vpaSize int, value int) {
 log2 := metrics.GetVpaSizeLog2(vpaSize)
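A brief sketch of how these wrappers are meant to be fed from the updater's main loop. The real wiring lives in pkg/updater/logic and is not part of this hunk, so the function below, its parameters, and its placement are illustrative assumptions only.

package logic // illustrative placement; the actual consumer is the updater's run loop

import (
	apiv1 "k8s.io/api/core/v1"

	metrics_updater "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/updater"
)

// recordInPlaceMetrics sketches the intended usage: "updatable" pods are counted per VPA
// size before any action is taken, and AddInPlaceUpdatedPod is only bumped once a resize
// has actually been attempted successfully.
func recordInPlaceMetrics(vpaSize int, candidates []*apiv1.Pod, resized func(*apiv1.Pod) bool) {
	inPlaceUpdatablePods := metrics_updater.NewInPlaceUpdatablePodsCounter()
	vpasWithInPlaceUpdatablePods := metrics_updater.NewVpasWithInPlaceUpdatablePodsCounter()

	if len(candidates) > 0 {
		inPlaceUpdatablePods.Add(vpaSize, len(candidates))
		vpasWithInPlaceUpdatablePods.Add(vpaSize, 1)
	}
	for _, pod := range candidates {
		if resized(pod) {
			metrics_updater.AddInPlaceUpdatedPod(vpaSize)
		}
	}
}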
diff --git a/vertical-pod-autoscaler/pkg/utils/test/test_utils.go b/vertical-pod-autoscaler/pkg/utils/test/test_utils.go
index 257f613504f..3d282f5a073 100644
--- a/vertical-pod-autoscaler/pkg/utils/test/test_utils.go
+++ b/vertical-pod-autoscaler/pkg/utils/test/test_utils.go
@@ -121,6 +121,24 @@ func (m *PodsEvictionRestrictionMock) CanEvict(pod *apiv1.Pod) bool {
 return args.Bool(0)
 }
+// InPlaceUpdate is a mock implementation of PodsEvictionRestriction.InPlaceUpdate
+func (m *PodsEvictionRestrictionMock) InPlaceUpdate(pod *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error {
+ args := m.Called(pod, eventRecorder)
+ return args.Error(0)
+}
+
+// CanInPlaceUpdate is a mock implementation of PodsEvictionRestriction.CanInPlaceUpdate
+func (m *PodsEvictionRestrictionMock) CanInPlaceUpdate(pod *apiv1.Pod) bool {
+ args := m.Called(pod)
+ return args.Bool(0)
+}
+
+// IsInPlaceUpdating is a mock implementation of PodsEvictionRestriction.IsInPlaceUpdating
+func (m *PodsEvictionRestrictionMock) IsInPlaceUpdating(pod *apiv1.Pod) bool {
+ args := m.Called(pod)
+ return args.Bool(0)
+}
+
 // PodListerMock is a mock of PodLister
 type PodListerMock struct {
 mock.Mock