
Commit 30daf4b

VPA: Allow updater to actuate InPlaceOrRecreate updates
Introduces large changes in the updater component to allow the InPlaceOrRecreate mode. If the feature gate is enabled and the VPA update mode is InPlaceOrRecreate, the updater attempts an in-place update by first checking a number of preconditions before actuation (e.g. whether the pod's qosClass would change, whether the pod is already being resized in place, whether an in-place update could violate the disruption (previously eviction) tolerance, etc.). Once the preconditions are validated, the updater sends an update to the InPlacePodVerticalScaling API with the recommendation, which may or may not fail; failures are handled in subsequent updater loops.

As for implementation details, the patchCalculators from the admission-controller code are re-used so the updater can calculate the recommendations it actuates. The in-place logic mostly lives in the eviction package for now because of the similarities and for ease (user-initiated API calls: eviction vs. in-place; both cause disruption). It may or may not be useful to refactor this later.

Signed-off-by: Max Cao <[email protected]>
1 parent 6efcfdd commit 30daf4b
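
To illustrate the flow described in the commit message, here is a minimal sketch of how an updater loop might drive the extended interface. The package name, the actuate helper, and its wiring are hypothetical and not part of this commit; only the four interface methods come from the diff below.

package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"

	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/eviction"
)

// actuate is a hypothetical helper: try an in-place update first and fall back
// to eviction when the in-place preconditions are not met.
func actuate(restriction eviction.PodsEvictionRestriction, pod *apiv1.Pod,
	vpa *vpa_types.VerticalPodAutoscaler, recorder record.EventRecorder) error {
	if restriction.CanInPlaceUpdate(pod) {
		// Preconditions passed: patch the pod's /resize subresource in place.
		return restriction.InPlaceUpdate(pod, vpa, recorder)
	}
	if restriction.CanEvict(pod) {
		// CanEvict also returns true when a previous in-place resize was reported
		// Infeasible or Deferred, so eviction is the fallback for failed resizes.
		return restriction.Evict(pod, vpa, recorder)
	}
	// Neither action fits within the disruption tolerance; a later updater loop retries.
	return nil
}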

File tree

8 files changed: +591 -37 lines changed

vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch/resource_updates.go

Lines changed: 10 additions & 6 deletions
@@ -78,7 +78,7 @@ func getContainerPatch(pod *core.Pod, i int, annotationsPerContainer vpa_api_uti
 	// Add empty resources object if missing.
 	if pod.Spec.Containers[i].Resources.Limits == nil &&
 		pod.Spec.Containers[i].Resources.Requests == nil {
-		patches = append(patches, getPatchInitializingEmptyResources(i))
+		patches = append(patches, GetPatchInitializingEmptyResources(i))
 	}
 
 	annotations, found := annotationsPerContainer[pod.Spec.Containers[i].Name]
@@ -96,31 +96,35 @@ func getContainerPatch(pod *core.Pod, i int, annotationsPerContainer vpa_api_uti
 func appendPatchesAndAnnotations(patches []resource_admission.PatchRecord, annotations []string, current core.ResourceList, containerIndex int, resources core.ResourceList, fieldName, resourceName string) ([]resource_admission.PatchRecord, []string) {
 	// Add empty object if it's missing and we're about to fill it.
 	if current == nil && len(resources) > 0 {
-		patches = append(patches, getPatchInitializingEmptyResourcesSubfield(containerIndex, fieldName))
+		patches = append(patches, GetPatchInitializingEmptyResourcesSubfield(containerIndex, fieldName))
 	}
 	for resource, request := range resources {
-		patches = append(patches, getAddResourceRequirementValuePatch(containerIndex, fieldName, resource, request))
+		patches = append(patches, GetAddResourceRequirementValuePatch(containerIndex, fieldName, resource, request))
 		annotations = append(annotations, fmt.Sprintf("%s %s", resource, resourceName))
 	}
 	return patches, annotations
 }
 
-func getAddResourceRequirementValuePatch(i int, kind string, resource core.ResourceName, quantity resource.Quantity) resource_admission.PatchRecord {
+// GetAddResourceRequirementValuePatch returns a patch record to add resource requirements to a container.
+func GetAddResourceRequirementValuePatch(i int, kind string, resource core.ResourceName, quantity resource.Quantity) resource_admission.PatchRecord {
 	return resource_admission.PatchRecord{
 		Op:    "add",
 		Path:  fmt.Sprintf("/spec/containers/%d/resources/%s/%s", i, kind, resource),
 		Value: quantity.String()}
 }
 
-func getPatchInitializingEmptyResources(i int) resource_admission.PatchRecord {
+// GetPatchInitializingEmptyResources returns a patch record to initialize an empty resources object for a container.
+func GetPatchInitializingEmptyResources(i int) resource_admission.PatchRecord {
 	return resource_admission.PatchRecord{
 		Op:    "add",
 		Path:  fmt.Sprintf("/spec/containers/%d/resources", i),
 		Value: core.ResourceRequirements{},
 	}
 }
 
-func getPatchInitializingEmptyResourcesSubfield(i int, kind string) resource_admission.PatchRecord {
+// GetPatchInitializingEmptyResourcesSubfield returns a patch record to initialize an empty subfield
+// (e.g., "requests" or "limits") within a container's resources object.
+func GetPatchInitializingEmptyResourcesSubfield(i int, kind string) resource_admission.PatchRecord {
 	return resource_admission.PatchRecord{
 		Op:   "add",
 		Path: fmt.Sprintf("/spec/containers/%d/resources/%s", i, kind),
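
As a side note on what these now-exported helpers produce: the PatchRecord values marshal to standard JSON Patch operations, which the updater later sends to the pod's /resize subresource. A minimal sketch, where the package name, container index, and 200m quantity are illustrative values rather than anything taken from the diff:

package example

import (
	"encoding/json"
	"fmt"

	core "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
)

// PrintCPURequestPatch builds one "add" patch for container 0's cpu request
// and prints its JSON form. The index and quantity are made-up example values.
func PrintCPURequestPatch() {
	p := patch.GetAddResourceRequirementValuePatch(0, "requests", core.ResourceCPU, resource.MustParse("200m"))
	out, _ := json.Marshal([]interface{}{p})
	// Prints a JSON Patch along the lines of:
	// [{"op":"add","path":"/spec/containers/0/resources/requests/cpu","value":"200m"}]
	fmt.Println(string(out))
}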

vertical-pod-autoscaler/pkg/updater/eviction/pods_eviction_restriction.go

Lines changed: 194 additions & 9 deletions
@@ -18,20 +18,26 @@ package eviction
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"time"
 
 	appsv1 "k8s.io/api/apps/v1"
 	apiv1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8stypes "k8s.io/apimachinery/pkg/types"
 	appsinformer "k8s.io/client-go/informers/apps/v1"
 	coreinformer "k8s.io/client-go/informers/core/v1"
 	kube_client "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 
+	resource_updates "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource"
+	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
+	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/features"
+
	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
 )
 
@@ -48,12 +54,18 @@ type PodsEvictionRestriction interface {
 	Evict(pod *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error
 	// CanEvict checks if pod can be safely evicted
 	CanEvict(pod *apiv1.Pod) bool
+
+	// InPlaceUpdate updates the pod resources in-place
+	InPlaceUpdate(pod *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error
+	// CanInPlaceUpdate checks if the pod can be safely updated in-place
+	CanInPlaceUpdate(pod *apiv1.Pod) bool
 }
 
 type podsEvictionRestrictionImpl struct {
 	client                       kube_client.Interface
 	podToReplicaCreatorMap       map[string]podReplicaCreator
 	creatorToSingleGroupStatsMap map[podReplicaCreator]singleGroupStats
+	patchCalculators             []patch.Calculator
 }
 
 type singleGroupStats struct {
@@ -62,13 +74,14 @@ type singleGroupStats struct {
 	running           int
 	evictionTolerance int
 	evicted           int
+	inPlaceUpdating   int
 }
 
 // PodsEvictionRestrictionFactory creates PodsEvictionRestriction
 type PodsEvictionRestrictionFactory interface {
 	// NewPodsEvictionRestriction creates PodsEvictionRestriction for given set of pods,
 	// controlled by a single VPA object.
-	NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) PodsEvictionRestriction
+	NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, patchCalculators []patch.Calculator) PodsEvictionRestriction
 }
 
 type podsEvictionRestrictionFactoryImpl struct {
@@ -99,21 +112,40 @@ type podReplicaCreator struct {
 
 // CanEvict checks if pod can be safely evicted
 func (e *podsEvictionRestrictionImpl) CanEvict(pod *apiv1.Pod) bool {
-	cr, present := e.podToReplicaCreatorMap[getPodID(pod)]
+	cr, present := e.podToReplicaCreatorMap[GetPodID(pod)]
 	if present {
 		singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr]
 		if pod.Status.Phase == apiv1.PodPending {
 			return true
 		}
 		if present {
 			shouldBeAlive := singleGroupStats.configured - singleGroupStats.evictionTolerance
-			if singleGroupStats.running-singleGroupStats.evicted > shouldBeAlive {
+			klog.V(4).InfoS("Pod disruption tolerance",
+				"pod", klog.KObj(pod),
+				"running", singleGroupStats.running,
+				"configured", singleGroupStats.configured,
+				"tolerance", singleGroupStats.evictionTolerance,
+				"evicted", singleGroupStats.evicted,
+				"updating", singleGroupStats.inPlaceUpdating)
+			if IsInPlaceUpdating(pod) {
+				if singleGroupStats.running-(singleGroupStats.evicted+(singleGroupStats.inPlaceUpdating-1)) > shouldBeAlive {
+					klog.V(4).InfoS("Would be able to evict, but already resizing", "pod", klog.KObj(pod))
+					if pod.Status.Resize == apiv1.PodResizeStatusInfeasible || pod.Status.Resize == apiv1.PodResizeStatusDeferred {
+						klog.InfoS("Attempted in-place resize was impossible, should now evict", "pod", klog.KObj(pod), "resizePolicy", pod.Status.Resize)
+						return true
+					}
+				}
+				return false
+			}
+
+			if singleGroupStats.running-(singleGroupStats.evicted+singleGroupStats.inPlaceUpdating) > shouldBeAlive {
 				return true
 			}
 			// If all pods are running and eviction tolerance is small evict 1 pod.
 			if singleGroupStats.running == singleGroupStats.configured &&
 				singleGroupStats.evictionTolerance == 0 &&
-				singleGroupStats.evicted == 0 {
+				singleGroupStats.evicted == 0 &&
+				singleGroupStats.inPlaceUpdating == 0 {
 				return true
 			}
 		}
@@ -124,7 +156,7 @@ func (e *podsEvictionRestrictionImpl) CanEvict(pod *apiv1.Pod) bool {
 // Evict sends eviction instruction to api client. Returns error if pod cannot be evicted or if client returned error
 // Does not check if pod was actually evicted after eviction grace period.
 func (e *podsEvictionRestrictionImpl) Evict(podToEvict *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error {
-	cr, present := e.podToReplicaCreatorMap[getPodID(podToEvict)]
+	cr, present := e.podToReplicaCreatorMap[GetPodID(podToEvict)]
 	if !present {
 		return fmt.Errorf("pod not suitable for eviction %s/%s: not in replicated pods map", podToEvict.Namespace, podToEvict.Name)
 	}
@@ -193,7 +225,7 @@ func NewPodsEvictionRestrictionFactory(client kube_client.Interface, minReplicas
 
 // NewPodsEvictionRestriction creates PodsEvictionRestriction for a given set of pods,
 // controlled by a single VPA object.
-func (f *podsEvictionRestrictionFactoryImpl) NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) PodsEvictionRestriction {
+func (f *podsEvictionRestrictionFactoryImpl) NewPodsEvictionRestriction(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, patchCalculators []patch.Calculator) PodsEvictionRestriction {
 	// We can evict pod only if it is a part of replica set
 	// For each replica set we can evict only a fraction of pods.
 	// Evictions may be later limited by pod disruption budget if configured.
@@ -247,18 +279,24 @@ func (f *podsEvictionRestrictionFactoryImpl) NewPodsEvictionRestriction(pods []*
 		singleGroup.configured = configured
 		singleGroup.evictionTolerance = int(float64(configured) * f.evictionToleranceFraction)
 		for _, pod := range replicas {
-			podToReplicaCreatorMap[getPodID(pod)] = creator
+			podToReplicaCreatorMap[GetPodID(pod)] = creator
 			if pod.Status.Phase == apiv1.PodPending {
 				singleGroup.pending = singleGroup.pending + 1
 			}
+			if IsInPlaceUpdating(pod) {
+				singleGroup.inPlaceUpdating = singleGroup.inPlaceUpdating + 1
+			}
 		}
 		singleGroup.running = len(replicas) - singleGroup.pending
 		creatorToSingleGroupStatsMap[creator] = singleGroup
+
 	}
 	return &podsEvictionRestrictionImpl{
 		client:                       f.client,
 		podToReplicaCreatorMap:       podToReplicaCreatorMap,
-		creatorToSingleGroupStatsMap: creatorToSingleGroupStatsMap}
+		creatorToSingleGroupStatsMap: creatorToSingleGroupStatsMap,
+		patchCalculators:             patchCalculators,
+	}
 }
 
 func getPodReplicaCreator(pod *apiv1.Pod) (*podReplicaCreator, error) {
@@ -274,7 +312,8 @@ func getPodReplicaCreator(pod *apiv1.Pod) (*podReplicaCreator, error) {
 	return podReplicaCreator, nil
 }
 
-func getPodID(pod *apiv1.Pod) string {
+// GetPodID returns a string that uniquely identifies a pod by namespace and name
+func GetPodID(pod *apiv1.Pod) string {
 	if pod == nil {
 		return ""
 	}
@@ -392,3 +431,149 @@ func setUpInformer(kubeClient kube_client.Interface, kind controllerKind) (cache
 	}
 	return informer, nil
 }
+
+// CanInPlaceUpdate performs the same disruption-tolerance checks as CanEvict to decide whether the pod can be resized in-place
+func (e *podsEvictionRestrictionImpl) CanInPlaceUpdate(pod *apiv1.Pod) bool {
+	if !features.Enabled(features.InPlaceOrRecreate) {
+		return false
+	}
+	cr, present := e.podToReplicaCreatorMap[GetPodID(pod)]
+	if present {
+		// If our QoS class is guaranteed, we can't change the resources without a restart
+		if pod.Status.QOSClass == apiv1.PodQOSGuaranteed {
+			return false
+		}
+
+		if IsInPlaceUpdating(pod) {
+			return false
+		}
+
+		for _, container := range pod.Spec.Containers {
+			// If some of these are populated, we know it at least understands resizing
+			if container.ResizePolicy == nil {
+				klog.InfoS("Can't resize pod, container resize policy does not exist; is InPlacePodVerticalScaling enabled?", "pod", klog.KObj(pod))
+				return false
+			}
+		}
+
+		singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr]
+		// If we're pending, we can't in-place resize
+		// TODO(jkyros): are we sure we can't? Should I just set this to "if running"?
+		if pod.Status.Phase == apiv1.PodPending {
+			klog.V(4).InfoS("Can't resize pending pod", "pod", klog.KObj(pod))
+			return false
+		}
+		// TODO(maxcao13): May need to rename evictionTolerance to disruptionTolerance
+		if present {
+			// minimum number of pods that should be running to tolerate disruptions
+			shouldBeAlive := singleGroupStats.configured - singleGroupStats.evictionTolerance
+			// number of pods that are actually running
+			actuallyAlive := singleGroupStats.running - (singleGroupStats.evicted + singleGroupStats.inPlaceUpdating)
+			klog.V(4).InfoS("Checking pod disruption tolerance",
+				"pod", klog.KObj(pod),
+				"configuredPods", singleGroupStats.configured,
+				"runningPods", singleGroupStats.running,
+				"evictedPods", singleGroupStats.evicted,
+				"inPlaceUpdatingPods", singleGroupStats.inPlaceUpdating,
+				"evictionTolerance", singleGroupStats.evictionTolerance,
+				"shouldBeAlive", shouldBeAlive,
+				"actuallyAlive", actuallyAlive,
+			)
+			if actuallyAlive > shouldBeAlive {
+				klog.V(4).InfoS("Pod can be resized in-place; more pods are running than required", "pod", klog.KObj(pod), "shouldBeAlive", shouldBeAlive, "actuallyAlive", actuallyAlive)
+				return true
+			}
+
+			// If all pods are running, no pods are being evicted or updated, and eviction tolerance is small, we can resize in-place
+			if singleGroupStats.running == singleGroupStats.configured &&
+				singleGroupStats.evictionTolerance == 0 &&
+				singleGroupStats.evicted == 0 && singleGroupStats.inPlaceUpdating == 0 {
+				klog.V(4).InfoS("Pod can be resized in-place; all pods are running and eviction tolerance is 0", "pod", klog.KObj(pod))
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// InPlaceUpdate calculates patches and sends a resize request to the api client. Returns an error if the pod cannot be in-place updated or if the client returned an error.
+// Does not check if pod was actually in-place updated after grace period.
+func (e *podsEvictionRestrictionImpl) InPlaceUpdate(podToUpdate *apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler, eventRecorder record.EventRecorder) error {
+	cr, present := e.podToReplicaCreatorMap[GetPodID(podToUpdate)]
+	if !present {
+		return fmt.Errorf("pod not suitable for eviction %v: not in replicated pods map", podToUpdate.Name)
+	}
+
+	// separate patches since we have to patch resize and spec separately
+	resourcePatches := []resource_updates.PatchRecord{}
+	annotationPatches := []resource_updates.PatchRecord{}
+	if podToUpdate.Annotations == nil {
+		annotationPatches = append(annotationPatches, patch.GetAddEmptyAnnotationsPatch())
+	}
+	for i, calculator := range e.patchCalculators {
+		p, err := calculator.CalculatePatches(podToUpdate, vpa)
+		if err != nil {
+			return err
+		}
+		klog.V(4).InfoS("Calculated patches for pod", "pod", klog.KObj(podToUpdate), "patches", p)
+		// TODO(maxcao13): change how this works later, this is gross and depends on the resource calculator being first in the slice
+		// we may not even want the updater to patch pod annotations at all
+		if i == 0 {
+			resourcePatches = append(resourcePatches, p...)
+		} else {
+			annotationPatches = append(annotationPatches, p...)
+		}
+	}
+	if len(resourcePatches) > 0 {
+		patch, err := json.Marshal(resourcePatches)
+		if err != nil {
+			return err
+		}
+
+		res, err := e.client.CoreV1().Pods(podToUpdate.Namespace).Patch(context.TODO(), podToUpdate.Name, k8stypes.JSONPatchType, patch, metav1.PatchOptions{}, "resize")
+		if err != nil {
+			return err
+		}
+		klog.V(4).InfoS("In-place patched pod /resize subresource using patches", "pod", klog.KObj(res), "patches", string(patch))
+
+		if len(annotationPatches) > 0 {
+			patch, err := json.Marshal(annotationPatches)
+			if err != nil {
+				return err
+			}
+			res, err = e.client.CoreV1().Pods(podToUpdate.Namespace).Patch(context.TODO(), podToUpdate.Name, k8stypes.JSONPatchType, patch, metav1.PatchOptions{})
+			if err != nil {
+				return err
+			}
+			klog.V(4).InfoS("Patched pod annotations", "pod", klog.KObj(res), "patches", string(patch))
+		}
+	} else {
+		return fmt.Errorf("no resource patches were calculated to apply")
+	}
+
+	// TODO(maxcao13): If this keeps getting called on the same object with the same reason, it is considered a patch request.
+	// And we fail to have the corresponding rbac for it. So figure out if we need this later.
+	// Do we even need to emit an event? The node might reject the resize request. If so, should we rename this to InPlaceResizeAttempted?
+	// eventRecorder.Event(podToUpdate, apiv1.EventTypeNormal, "InPlaceResizedByVPA", "Pod was resized in place by VPA Updater.")
+
+	if podToUpdate.Status.Phase == apiv1.PodRunning {
+		singleGroupStats, present := e.creatorToSingleGroupStatsMap[cr]
+		if !present {
+			klog.InfoS("Internal error - cannot find stats for replication group", "pod", klog.KObj(podToUpdate), "podReplicaCreator", cr)
+		} else {
+			singleGroupStats.inPlaceUpdating = singleGroupStats.inPlaceUpdating + 1
+			e.creatorToSingleGroupStatsMap[cr] = singleGroupStats
+		}
+	} else {
+		klog.InfoS("Attempted to in-place update, but pod was not running", "pod", klog.KObj(podToUpdate), "phase", podToUpdate.Status.Phase)
+	}
+
+	return nil
+}
+
+// TODO(maxcao13): Switch to conditions after 1.33 is released: https://github.com/kubernetes/enhancements/pull/5089
+
+// IsInPlaceUpdating checks whether or not the given pod is currently in the middle of an in-place update
+func IsInPlaceUpdating(podToCheck *apiv1.Pod) (isUpdating bool) {
+	return podToCheck.Status.Resize != ""
+}
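
One usage note, hedged: callers of the factory now pass the patch calculators through when building the restriction. The sketch below relies only on the NewPodsEvictionRestriction signature from this diff; the package name and wrapper are hypothetical, and how the calculator slice is assembled is left open. Per the TODO in InPlaceUpdate, the resource-updates calculator is assumed to sit at index 0.

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/updater/eviction"
)

// BuildRestriction is a hypothetical wrapper around the updated factory method.
// calculators should list the resource-updates calculator first: InPlaceUpdate
// routes index 0 to the /resize patch and the remainder to annotation patches.
func BuildRestriction(factory eviction.PodsEvictionRestrictionFactory, pods []*apiv1.Pod,
	vpa *vpa_types.VerticalPodAutoscaler, calculators []patch.Calculator) eviction.PodsEvictionRestriction {
	return factory.NewPodsEvictionRestriction(pods, vpa, calculators)
}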
