Skip to content

feat: add fallback support for value metric type #6655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- **General**: Add Fallback option `behavior` for dynamic fallback calculation ([#6450](https://github.com/kedacore/keda/issues/6450))
- **General**: Add Fallback support for triggers of `Value` metric type ([#6655](https://github.com/kedacore/keda/pull/6655))
- **General**: Add support for time-bound Kubernetes ServiceAccount tokens as a source for TriggerAuthentication ([#6136](https://github.com/kedacore/keda/issues/6136))
- **General**: Enable OpenSSF Scorecard to enhance security practices across the project ([#5913](https://github.com/kedacore/keda/issues/5913))
- **General**: Introduce new NSQ scaler ([#3281](https://github.com/kedacore/keda/issues/3281))
Expand Down
13 changes: 3 additions & 10 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,21 +294,14 @@ func CheckFallbackValid(scaledObject *ScaledObject) error {
scaledObject.Spec.Fallback.FailureThreshold, scaledObject.Spec.Fallback.Replicas)
}

if scaledObject.IsUsingModifiers() {
if scaledObject.Spec.Advanced.ScalingModifiers.MetricType != autoscalingv2.AverageValueMetricType {
return fmt.Errorf("when using ScalingModifiers, ScaledObject.Spec.Advanced.ScalingModifiers.MetricType must be AverageValue to have fallback enabled")
}
} else {
if !scaledObject.IsUsingModifiers() {
fallbackValid := false
for _, trigger := range scaledObject.Spec.Triggers {
if trigger.Type == cpuString || trigger.Type == memoryString {
continue
}
// If at least one trigger is of the type `AverageValue`, then having fallback is valid.
if trigger.MetricType == autoscalingv2.AverageValueMetricType {
fallbackValid = true
break
}
fallbackValid = true
break
}

if !fallbackValid {
Expand Down
93 changes: 4 additions & 89 deletions apis/keda/v1alpha1/scaledobject_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ var _ = It("shouldn't validate the so creation when the fallback is configured a
}).Should(HaveOccurred())
})

var _ = It("should validate the so creation when the fallback is configured, and at least one trigger (besides cpu/memory) has metricType == AverageValue.", func() {
var _ = It("should validate the so creation when the fallback is configured, and at least one trigger (besides cpu/memory) is configured.", func() {
namespaceName := "right-fallback-at-least-one-averagevalue"
namespace := createNamespace(namespaceName)
workload := createDeployment(namespaceName, true, true)
Expand Down Expand Up @@ -244,8 +244,8 @@ var _ = It("should validate the so creation when the fallback is configured, and
}).ShouldNot(HaveOccurred())
})

var _ = It("shouldn't validate the so creation when the fallback is configured, and NO trigger (besides cpu/memory) has metricType == AverageValue.", func() {
namespaceName := "wrong-fallback-none-averagevalue"
var _ = It("should validate the so creation when the fallback is configured, and so uses ScalingModifiers.", func() {
namespaceName := "right-fallback-scalingmodifier"
namespace := createNamespace(namespaceName)
workload := createDeployment(namespaceName, true, true)
// Create ScaledObject with cpu and memory triggers.
Expand Down Expand Up @@ -277,6 +277,7 @@ var _ = It("shouldn't validate the so creation when the fallback is configured,
FailureThreshold: 3,
Replicas: 6,
}
so.Spec.Advanced.ScalingModifiers = ScalingModifiers{Target: "2", Formula: "workload_trig_1 + workload_trig_2", MetricType: v2.ValueMetricType}

err := k8sClient.Create(context.Background(), namespace)
Expect(err).ToNot(HaveOccurred())
Expand All @@ -289,92 +290,6 @@ var _ = It("shouldn't validate the so creation when the fallback is configured,
}).Should(HaveOccurred())
})

var _ = It("shouldn't validate the so creation when the fallback is configured, and the so uses ScalingModifiers with its metricType != AverageValue.", func() {
namespaceName := "wrong-fallback-scalingmodifier"
namespace := createNamespace(namespaceName)
workload := createDeployment(namespaceName, true, true)

triggers := []ScaleTriggers{
{
Type: "cron",
Name: "cron_trig",
Metadata: map[string]string{
"timezone": "UTC",
"start": "0 * * * *",
"end": "1 * * * *",
"desiredReplicas": "1",
},
},
{
Type: "kubernetes-workload",
Name: "workload_trig",
Metadata: map[string]string{
"podSelector": "pod=workload-test",
"value": "1",
},
},
}
sm := ScalingModifiers{Target: "2", Formula: "workload_trig + cron_trig", MetricType: v2.ValueMetricType}
so := createScaledObjectScalingModifiers(namespaceName, sm, triggers)
so.Spec.Fallback = &Fallback{
FailureThreshold: 3,
Replicas: 6,
}

err := k8sClient.Create(context.Background(), namespace)
Expect(err).ToNot(HaveOccurred())

err = k8sClient.Create(context.Background(), workload)
Expect(err).ToNot(HaveOccurred())

Eventually(func() error {
return k8sClient.Create(context.Background(), so)
}).Should(HaveOccurred())
})

var _ = It("should validate the so creation when the fallback is configured, and the so uses ScalingModifiers with its metricType == AverageValue.", func() {
namespaceName := "right-fallback-scalingmodifier"
namespace := createNamespace(namespaceName)
workload := createDeployment(namespaceName, true, true)

triggers := []ScaleTriggers{
{
Type: "cron",
Name: "cron_trig",
Metadata: map[string]string{
"timezone": "UTC",
"start": "0 * * * *",
"end": "1 * * * *",
"desiredReplicas": "1",
},
},
{
Type: "kubernetes-workload",
Name: "workload_trig",
Metadata: map[string]string{
"podSelector": "pod=workload-test",
"value": "1",
},
},
}
sm := ScalingModifiers{Target: "2", Formula: "workload_trig + cron_trig", MetricType: v2.AverageValueMetricType}
so := createScaledObjectScalingModifiers(namespaceName, sm, triggers)
so.Spec.Fallback = &Fallback{
FailureThreshold: 3,
Replicas: 6,
}

err := k8sClient.Create(context.Background(), namespace)
Expect(err).ToNot(HaveOccurred())

err = k8sClient.Create(context.Background(), workload)
Expect(err).ToNot(HaveOccurred())

Eventually(func() error {
return k8sClient.Create(context.Background(), so)
}).ShouldNot(HaveOccurred())
})

var _ = It("shouldn't validate the so creation when there is another unmanaged hpa and so has transfer-hpa-ownership activated", func() {

hpaName := "test-unmanaged-hpa-ownership"
Expand Down
128 changes: 93 additions & 35 deletions pkg/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package fallback

import (
"context"
"fmt"
"reflect"
"strconv"

v2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/scale"
"k8s.io/metrics/pkg/apis/external_metrics"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -35,22 +37,8 @@ import (

var log = logf.Log.WithName("fallback")

func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec) bool {
if scaledObject.Spec.Fallback == nil {
return false
}

// If we are using ScalingModifiers, we only care whether its metric type is AverageValue (or not not set -> default, which is AverageValue).
// If not, test the type of metricSpec passed.
if scaledObject.IsUsingModifiers() && (scaledObject.Spec.Advanced.ScalingModifiers.MetricType != v2.AverageValueMetricType && scaledObject.Spec.Advanced.ScalingModifiers.MetricType != "") {
log.V(0).Info("Fallback can only be enabled for scalingModifiers with metric of type AverageValue", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scalingModifiers.MetricType", scaledObject.Spec.Advanced.ScalingModifiers.MetricType)
return false
} else if !scaledObject.IsUsingModifiers() && metricSpec.External.Target.Type != v2.AverageValueMetricType {
log.V(0).Info("Fallback can only be enabled for triggers with metric of type AverageValue", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "metricSpec.External.Target.Type", metricSpec.External.Target.Type)
return false
}

return true
func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject) bool {
return scaledObject.Spec.Fallback != nil
}

func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, scaleClient scale.ScalesGetter, metrics []external_metrics.ExternalMetricValue, suppressedError error, metricName string, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec) ([]external_metrics.ExternalMetricValue, bool, error) {
Expand All @@ -65,7 +53,7 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, sc
healthStatus.Status = kedav1alpha1.HealthStatusHappy
status.Health[metricName] = *healthStatus

updateStatus(ctx, client, scaledObject, status, metricSpec)
updateStatus(ctx, client, scaledObject, status)

return metrics, false, nil
}
Expand All @@ -74,10 +62,10 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, sc
*healthStatus.NumberOfFailures++
status.Health[metricName] = *healthStatus

updateStatus(ctx, client, scaledObject, status, metricSpec)
updateStatus(ctx, client, scaledObject, status)

switch {
case !isFallbackEnabled(scaledObject, metricSpec):
case !isFallbackEnabled(scaledObject):
return nil, false, suppressedError
case !HasValidFallback(scaledObject):
log.Info("Failed to validate ScaledObject Spec. Please check that parameters are positive integers", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name)
Expand All @@ -92,7 +80,13 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, sc
return nil, false, suppressedError
}
}
return doFallback(scaledObject, metricSpec, metricName, currentReplicas, suppressedError), true, nil

l := doFallback(ctx, client, scaledObject, metricSpec, metricName, currentReplicas, suppressedError)
if l == nil {
return l, false, fmt.Errorf("error performing fallback")
}
return l, true, nil

default:
return nil, false, suppressedError
}
Expand All @@ -119,46 +113,110 @@ func HasValidFallback(scaledObject *kedav1alpha1.ScaledObject) bool {
modifierChecking
}

func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec, metricName string, currentReplicas int32, suppressedError error) []external_metrics.ExternalMetricValue {
// This depends on the fact that Deployment, StatefulSet, ReplicaSet (and Argo Rollouts CRD) all have the status.readyReplicas
// field that we can use directly instead of getting their selector and listing all pods with that selector, the way HPA actually
// does it. Thus, we offset that overhead to the respective controller of the CRD.
// Any other CRD that doesn't have `.status.readyReplicas` won't support scaling on Value metrics.
func getReadyReplicasCount(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject) (int32, error) {
// Fetching the scaleTargetRef as an unstructured.
u := &unstructured.Unstructured{}
if scaledObject.Status.ScaleTargetGVKR == nil {
return 0, fmt.Errorf("scaledObject.Status.ScaleTargetGVKR is empty")
}
u.SetGroupVersionKind(scaledObject.Status.ScaleTargetGVKR.GroupVersionKind())

if err := client.Get(ctx, runtimeclient.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, u); err != nil {
return 0, fmt.Errorf("error getting scaleTargetRef: %w", err)
}

readyReplicasField, found, err := unstructured.NestedFieldCopy(u.Object, "status", "readyReplicas")
if !found {
return 0, fmt.Errorf("error accessing status.readyReplicas in scaleTarget object: no such field exists")
}
if err != nil {
return 0, fmt.Errorf("error accessing status.readyReplicas in scaleTarget object: %w", err)
}

v := reflect.ValueOf(readyReplicasField)
// This is probably impossible if the field is found, but just for extra guard.
if v.IsZero() {
return 0, fmt.Errorf("error accessing status.readyReplicas in scaleTarget object: field is nil")
}

var readyReplicas int32
// readyReplicas can be a signed or unsigned integer, otherwise return an error.
switch {
case v.CanInt():
readyReplicas = int32(v.Int())
case v.CanUint():
readyReplicas = int32(v.Uint())
default:
return 0, fmt.Errorf("unexpected type of status.readyReplicas in scaleTarget object, expected integer or unsigned integer, got: %v", reflect.TypeOf(readyReplicas))
}

// Guard against the case where readyReplicas=0, because we'll be dividing by it later.
if readyReplicas == 0 {
return 0, fmt.Errorf("status.readyReplicas is 0 in scaleTargetRef")
}

return readyReplicas, nil
}

func doFallback(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec, metricName string, currentReplicas int32, suppressedError error) []external_metrics.ExternalMetricValue {
fallbackBehavior := scaledObject.Spec.Fallback.Behavior
fallbackReplicas := int64(scaledObject.Spec.Fallback.Replicas)
var replicas int64
var replicas float64

switch fallbackBehavior {
case kedav1alpha1.FallbackBehaviorStatic:
replicas = fallbackReplicas
replicas = float64(fallbackReplicas)
case kedav1alpha1.FallbackBehaviorCurrentReplicas:
replicas = int64(currentReplicas)
replicas = float64(currentReplicas)
case kedav1alpha1.FallbackBehaviorCurrentReplicasIfHigher:
currentReplicasCount := int64(currentReplicas)
if currentReplicasCount > fallbackReplicas {
replicas = currentReplicasCount
replicas = float64(currentReplicasCount)
} else {
replicas = fallbackReplicas
replicas = float64(fallbackReplicas)
}
case kedav1alpha1.FallbackBehaviorCurrentReplicasIfLower:
currentReplicasCount := int64(currentReplicas)
if currentReplicasCount < fallbackReplicas {
replicas = currentReplicasCount
replicas = float64(currentReplicasCount)
} else {
replicas = fallbackReplicas
replicas = float64(fallbackReplicas)
}
default:
replicas = fallbackReplicas
replicas = float64(fallbackReplicas)
}

var normalisationValue int64
// If the metricType is Value, we get the number of readyReplicas, and divide replicas by it.
if (!scaledObject.IsUsingModifiers() && metricSpec.External.Target.Type == v2.ValueMetricType) ||
(scaledObject.IsUsingModifiers() && scaledObject.Spec.Advanced.ScalingModifiers.MetricType == v2.ValueMetricType) {
readyReplicas, err := getReadyReplicasCount(ctx, client, scaledObject)
if err != nil {
log.Error(err, "failed to do fallback for metric of type Value", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "metricName", metricName)
return nil
}
replicas /= float64(readyReplicas)
}

var normalisationValue float64
if !scaledObject.IsUsingModifiers() {
normalisationValue = int64(metricSpec.External.Target.AverageValue.AsApproximateFloat64())
if metricSpec.External.Target.Type == v2.AverageValueMetricType {
normalisationValue = metricSpec.External.Target.AverageValue.AsApproximateFloat64()
} else if metricSpec.External.Target.Type == v2.ValueMetricType {
normalisationValue = metricSpec.External.Target.Value.AsApproximateFloat64()
}
} else {
value, _ := strconv.ParseInt(scaledObject.Spec.Advanced.ScalingModifiers.Target, 10, 64)
value, _ := strconv.ParseFloat(scaledObject.Spec.Advanced.ScalingModifiers.Target, 64)
normalisationValue = value
metricName = kedav1alpha1.CompositeMetricName
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewMilliQuantity(normalisationValue*1000*replicas, resource.DecimalSI),
Value: *resource.NewMilliQuantity(int64(normalisationValue*1000*replicas), resource.DecimalSI),
Timestamp: metav1.Now(),
}
fallbackMetrics := []external_metrics.ExternalMetricValue{metric}
Expand All @@ -173,10 +231,10 @@ func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpe
return fallbackMetrics
}

func updateStatus(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus, metricSpec v2.MetricSpec) {
func updateStatus(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) {
patch := runtimeclient.MergeFrom(scaledObject.DeepCopy())

if !isFallbackEnabled(scaledObject, metricSpec) || !HasValidFallback(scaledObject) {
if !isFallbackEnabled(scaledObject) || !HasValidFallback(scaledObject) {
log.V(1).Info("Fallback is not enabled, hence skipping the health update to the scaledobject", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name)
return
}
Expand Down
22 changes: 0 additions & 22 deletions pkg/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,28 +218,6 @@ var _ = Describe("fallback", func() {
Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(4, kedav1alpha1.HealthStatusFailing))
})

It("should behave as if fallback is disabled when the metrics spec target type is not average value metric", func() {
so := buildScaledObject(
&kedav1alpha1.Fallback{
FailureThreshold: int32(3),
Replicas: int32(10),
}, nil,
)

qty := resource.NewQuantity(int64(3), resource.DecimalSI)
metricsSpec := v2.MetricSpec{
External: &v2.ExternalMetricSource{
Target: v2.MetricTarget{
Type: v2.UtilizationMetricType,
Value: qty,
},
},
}

isEnabled := isFallbackEnabled(so, metricsSpec)
Expect(isEnabled).Should(BeFalse())
})

It("should ignore error if we fail to update kubernetes status", func() {
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("some error"))
startingNumberOfFailures := int32(3)
Expand Down
Loading
Loading