From 334992497ecf4fe8b20b62c110c78ed1bead76ea Mon Sep 17 00:00:00 2001 From: y-rabie Date: Thu, 27 Mar 2025 05:08:41 +0200 Subject: [PATCH] feat: add fallback support for value metric type Signed-off-by: y-rabie --- CHANGELOG.md | 1 + apis/keda/v1alpha1/scaledobject_types.go | 13 +- .../v1alpha1/scaledobject_webhook_test.go | 93 +------------ pkg/fallback/fallback.go | 128 +++++++++++++----- pkg/fallback/fallback_test.go | 22 --- tests/internals/fallback/fallback_test.go | 60 ++++++++ 6 files changed, 161 insertions(+), 156 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bbe430cbfcf..1095124fe2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - **General**: Add Fallback option `behavior` for dynamic fallback calculation ([#6450](https://github.com/kedacore/keda/issues/6450)) +- **General**: Add Fallback support for triggers of `Value` metric type ([#6655](https://github.com/kedacore/keda/pull/6655)) - **General**: Add support for time-bound Kubernetes ServiceAccount tokens as a source for TriggerAuthentication ([#6136](https://github.com/kedacore/keda/issues/6136)) - **General**: Enable OpenSSF Scorecard to enhance security practices across the project ([#5913](https://github.com/kedacore/keda/issues/5913)) - **General**: Introduce new NSQ scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) diff --git a/apis/keda/v1alpha1/scaledobject_types.go b/apis/keda/v1alpha1/scaledobject_types.go index 9374a9f1c25..817b8d48d0e 100644 --- a/apis/keda/v1alpha1/scaledobject_types.go +++ b/apis/keda/v1alpha1/scaledobject_types.go @@ -294,21 +294,14 @@ func CheckFallbackValid(scaledObject *ScaledObject) error { scaledObject.Spec.Fallback.FailureThreshold, scaledObject.Spec.Fallback.Replicas) } - if scaledObject.IsUsingModifiers() { - if scaledObject.Spec.Advanced.ScalingModifiers.MetricType != autoscalingv2.AverageValueMetricType { - return fmt.Errorf("when using ScalingModifiers, ScaledObject.Spec.Advanced.ScalingModifiers.MetricType must be AverageValue to have fallback enabled") - } - } else { + if !scaledObject.IsUsingModifiers() { fallbackValid := false for _, trigger := range scaledObject.Spec.Triggers { if trigger.Type == cpuString || trigger.Type == memoryString { continue } - // If at least one trigger is of the type `AverageValue`, then having fallback is valid. - if trigger.MetricType == autoscalingv2.AverageValueMetricType { - fallbackValid = true - break - } + fallbackValid = true + break } if !fallbackValid { diff --git a/apis/keda/v1alpha1/scaledobject_webhook_test.go b/apis/keda/v1alpha1/scaledobject_webhook_test.go index 22550d6ae90..eac10231fc5 100644 --- a/apis/keda/v1alpha1/scaledobject_webhook_test.go +++ b/apis/keda/v1alpha1/scaledobject_webhook_test.go @@ -199,7 +199,7 @@ var _ = It("shouldn't validate the so creation when the fallback is configured a }).Should(HaveOccurred()) }) -var _ = It("should validate the so creation when the fallback is configured, and at least one trigger (besides cpu/memory) has metricType == AverageValue.", func() { +var _ = It("should validate the so creation when the fallback is configured, and at least one trigger (besides cpu/memory) is configured.", func() { namespaceName := "right-fallback-at-least-one-averagevalue" namespace := createNamespace(namespaceName) workload := createDeployment(namespaceName, true, true) @@ -244,8 +244,8 @@ var _ = It("should validate the so creation when the fallback is configured, and }).ShouldNot(HaveOccurred()) }) -var _ = It("shouldn't validate the so creation when the fallback is configured, and NO trigger (besides cpu/memory) has metricType == AverageValue.", func() { - namespaceName := "wrong-fallback-none-averagevalue" +var _ = It("should validate the so creation when the fallback is configured, and so uses ScalingModifiers.", func() { + namespaceName := "right-fallback-scalingmodifier" namespace := createNamespace(namespaceName) workload := createDeployment(namespaceName, true, true) // Create ScaledObject with cpu and memory triggers. @@ -277,6 +277,7 @@ var _ = It("shouldn't validate the so creation when the fallback is configured, FailureThreshold: 3, Replicas: 6, } + so.Spec.Advanced.ScalingModifiers = ScalingModifiers{Target: "2", Formula: "workload_trig_1 + workload_trig_2", MetricType: v2.ValueMetricType} err := k8sClient.Create(context.Background(), namespace) Expect(err).ToNot(HaveOccurred()) @@ -289,92 +290,6 @@ var _ = It("shouldn't validate the so creation when the fallback is configured, }).Should(HaveOccurred()) }) -var _ = It("shouldn't validate the so creation when the fallback is configured, and the so uses ScalingModifiers with its metricType != AverageValue.", func() { - namespaceName := "wrong-fallback-scalingmodifier" - namespace := createNamespace(namespaceName) - workload := createDeployment(namespaceName, true, true) - - triggers := []ScaleTriggers{ - { - Type: "cron", - Name: "cron_trig", - Metadata: map[string]string{ - "timezone": "UTC", - "start": "0 * * * *", - "end": "1 * * * *", - "desiredReplicas": "1", - }, - }, - { - Type: "kubernetes-workload", - Name: "workload_trig", - Metadata: map[string]string{ - "podSelector": "pod=workload-test", - "value": "1", - }, - }, - } - sm := ScalingModifiers{Target: "2", Formula: "workload_trig + cron_trig", MetricType: v2.ValueMetricType} - so := createScaledObjectScalingModifiers(namespaceName, sm, triggers) - so.Spec.Fallback = &Fallback{ - FailureThreshold: 3, - Replicas: 6, - } - - err := k8sClient.Create(context.Background(), namespace) - Expect(err).ToNot(HaveOccurred()) - - err = k8sClient.Create(context.Background(), workload) - Expect(err).ToNot(HaveOccurred()) - - Eventually(func() error { - return k8sClient.Create(context.Background(), so) - }).Should(HaveOccurred()) -}) - -var _ = It("should validate the so creation when the fallback is configured, and the so uses ScalingModifiers with its metricType == AverageValue.", func() { - namespaceName := "right-fallback-scalingmodifier" - namespace := createNamespace(namespaceName) - workload := createDeployment(namespaceName, true, true) - - triggers := []ScaleTriggers{ - { - Type: "cron", - Name: "cron_trig", - Metadata: map[string]string{ - "timezone": "UTC", - "start": "0 * * * *", - "end": "1 * * * *", - "desiredReplicas": "1", - }, - }, - { - Type: "kubernetes-workload", - Name: "workload_trig", - Metadata: map[string]string{ - "podSelector": "pod=workload-test", - "value": "1", - }, - }, - } - sm := ScalingModifiers{Target: "2", Formula: "workload_trig + cron_trig", MetricType: v2.AverageValueMetricType} - so := createScaledObjectScalingModifiers(namespaceName, sm, triggers) - so.Spec.Fallback = &Fallback{ - FailureThreshold: 3, - Replicas: 6, - } - - err := k8sClient.Create(context.Background(), namespace) - Expect(err).ToNot(HaveOccurred()) - - err = k8sClient.Create(context.Background(), workload) - Expect(err).ToNot(HaveOccurred()) - - Eventually(func() error { - return k8sClient.Create(context.Background(), so) - }).ShouldNot(HaveOccurred()) -}) - var _ = It("shouldn't validate the so creation when there is another unmanaged hpa and so has transfer-hpa-ownership activated", func() { hpaName := "test-unmanaged-hpa-ownership" diff --git a/pkg/fallback/fallback.go b/pkg/fallback/fallback.go index ab2791e21f3..64915a069ad 100644 --- a/pkg/fallback/fallback.go +++ b/pkg/fallback/fallback.go @@ -18,12 +18,14 @@ package fallback import ( "context" + "fmt" "reflect" "strconv" v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/scale" "k8s.io/metrics/pkg/apis/external_metrics" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -35,22 +37,8 @@ import ( var log = logf.Log.WithName("fallback") -func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec) bool { - if scaledObject.Spec.Fallback == nil { - return false - } - - // If we are using ScalingModifiers, we only care whether its metric type is AverageValue (or not not set -> default, which is AverageValue). - // If not, test the type of metricSpec passed. - if scaledObject.IsUsingModifiers() && (scaledObject.Spec.Advanced.ScalingModifiers.MetricType != v2.AverageValueMetricType && scaledObject.Spec.Advanced.ScalingModifiers.MetricType != "") { - log.V(0).Info("Fallback can only be enabled for scalingModifiers with metric of type AverageValue", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scalingModifiers.MetricType", scaledObject.Spec.Advanced.ScalingModifiers.MetricType) - return false - } else if !scaledObject.IsUsingModifiers() && metricSpec.External.Target.Type != v2.AverageValueMetricType { - log.V(0).Info("Fallback can only be enabled for triggers with metric of type AverageValue", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "metricSpec.External.Target.Type", metricSpec.External.Target.Type) - return false - } - - return true +func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject) bool { + return scaledObject.Spec.Fallback != nil } func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, scaleClient scale.ScalesGetter, metrics []external_metrics.ExternalMetricValue, suppressedError error, metricName string, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec) ([]external_metrics.ExternalMetricValue, bool, error) { @@ -65,7 +53,7 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, sc healthStatus.Status = kedav1alpha1.HealthStatusHappy status.Health[metricName] = *healthStatus - updateStatus(ctx, client, scaledObject, status, metricSpec) + updateStatus(ctx, client, scaledObject, status) return metrics, false, nil } @@ -74,10 +62,10 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, sc *healthStatus.NumberOfFailures++ status.Health[metricName] = *healthStatus - updateStatus(ctx, client, scaledObject, status, metricSpec) + updateStatus(ctx, client, scaledObject, status) switch { - case !isFallbackEnabled(scaledObject, metricSpec): + case !isFallbackEnabled(scaledObject): return nil, false, suppressedError case !HasValidFallback(scaledObject): log.Info("Failed to validate ScaledObject Spec. Please check that parameters are positive integers", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) @@ -92,7 +80,13 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, sc return nil, false, suppressedError } } - return doFallback(scaledObject, metricSpec, metricName, currentReplicas, suppressedError), true, nil + + l := doFallback(ctx, client, scaledObject, metricSpec, metricName, currentReplicas, suppressedError) + if l == nil { + return l, false, fmt.Errorf("error performing fallback") + } + return l, true, nil + default: return nil, false, suppressedError } @@ -119,46 +113,110 @@ func HasValidFallback(scaledObject *kedav1alpha1.ScaledObject) bool { modifierChecking } -func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec, metricName string, currentReplicas int32, suppressedError error) []external_metrics.ExternalMetricValue { +// This depends on the fact that Deployment, StatefulSet, ReplicaSet (and Argo Rollouts CRD) all have the status.readyReplicas +// field that we can use directly instead of getting their selector and listing all pods with that selector, the way HPA actually +// does it. Thus, we offset that overhead to the respective controller of the CRD. +// Any other CRD that doesn't have `.status.readyReplicas` won't support scaling on Value metrics. +func getReadyReplicasCount(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject) (int32, error) { + // Fetching the scaleTargetRef as an unstructured. + u := &unstructured.Unstructured{} + if scaledObject.Status.ScaleTargetGVKR == nil { + return 0, fmt.Errorf("scaledObject.Status.ScaleTargetGVKR is empty") + } + u.SetGroupVersionKind(scaledObject.Status.ScaleTargetGVKR.GroupVersionKind()) + + if err := client.Get(ctx, runtimeclient.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, u); err != nil { + return 0, fmt.Errorf("error getting scaleTargetRef: %w", err) + } + + readyReplicasField, found, err := unstructured.NestedFieldCopy(u.Object, "status", "readyReplicas") + if !found { + return 0, fmt.Errorf("error accessing status.readyReplicas in scaleTarget object: no such field exists") + } + if err != nil { + return 0, fmt.Errorf("error accessing status.readyReplicas in scaleTarget object: %w", err) + } + + v := reflect.ValueOf(readyReplicasField) + // This is probably impossible if the field is found, but just for extra guard. + if v.IsZero() { + return 0, fmt.Errorf("error accessing status.readyReplicas in scaleTarget object: field is nil") + } + + var readyReplicas int32 + // readyReplicas can be a signed or unsigned integer, otherwise return an error. + switch { + case v.CanInt(): + readyReplicas = int32(v.Int()) + case v.CanUint(): + readyReplicas = int32(v.Uint()) + default: + return 0, fmt.Errorf("unexpected type of status.readyReplicas in scaleTarget object, expected integer or unsigned integer, got: %v", reflect.TypeOf(readyReplicas)) + } + + // Guard against the case where readyReplicas=0, because we'll be dividing by it later. + if readyReplicas == 0 { + return 0, fmt.Errorf("status.readyReplicas is 0 in scaleTargetRef") + } + + return readyReplicas, nil +} + +func doFallback(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec, metricName string, currentReplicas int32, suppressedError error) []external_metrics.ExternalMetricValue { fallbackBehavior := scaledObject.Spec.Fallback.Behavior fallbackReplicas := int64(scaledObject.Spec.Fallback.Replicas) - var replicas int64 + var replicas float64 switch fallbackBehavior { case kedav1alpha1.FallbackBehaviorStatic: - replicas = fallbackReplicas + replicas = float64(fallbackReplicas) case kedav1alpha1.FallbackBehaviorCurrentReplicas: - replicas = int64(currentReplicas) + replicas = float64(currentReplicas) case kedav1alpha1.FallbackBehaviorCurrentReplicasIfHigher: currentReplicasCount := int64(currentReplicas) if currentReplicasCount > fallbackReplicas { - replicas = currentReplicasCount + replicas = float64(currentReplicasCount) } else { - replicas = fallbackReplicas + replicas = float64(fallbackReplicas) } case kedav1alpha1.FallbackBehaviorCurrentReplicasIfLower: currentReplicasCount := int64(currentReplicas) if currentReplicasCount < fallbackReplicas { - replicas = currentReplicasCount + replicas = float64(currentReplicasCount) } else { - replicas = fallbackReplicas + replicas = float64(fallbackReplicas) } default: - replicas = fallbackReplicas + replicas = float64(fallbackReplicas) } - var normalisationValue int64 + // If the metricType is Value, we get the number of readyReplicas, and divide replicas by it. + if (!scaledObject.IsUsingModifiers() && metricSpec.External.Target.Type == v2.ValueMetricType) || + (scaledObject.IsUsingModifiers() && scaledObject.Spec.Advanced.ScalingModifiers.MetricType == v2.ValueMetricType) { + readyReplicas, err := getReadyReplicasCount(ctx, client, scaledObject) + if err != nil { + log.Error(err, "failed to do fallback for metric of type Value", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "metricName", metricName) + return nil + } + replicas /= float64(readyReplicas) + } + + var normalisationValue float64 if !scaledObject.IsUsingModifiers() { - normalisationValue = int64(metricSpec.External.Target.AverageValue.AsApproximateFloat64()) + if metricSpec.External.Target.Type == v2.AverageValueMetricType { + normalisationValue = metricSpec.External.Target.AverageValue.AsApproximateFloat64() + } else if metricSpec.External.Target.Type == v2.ValueMetricType { + normalisationValue = metricSpec.External.Target.Value.AsApproximateFloat64() + } } else { - value, _ := strconv.ParseInt(scaledObject.Spec.Advanced.ScalingModifiers.Target, 10, 64) + value, _ := strconv.ParseFloat(scaledObject.Spec.Advanced.ScalingModifiers.Target, 64) normalisationValue = value metricName = kedav1alpha1.CompositeMetricName } metric := external_metrics.ExternalMetricValue{ MetricName: metricName, - Value: *resource.NewMilliQuantity(normalisationValue*1000*replicas, resource.DecimalSI), + Value: *resource.NewMilliQuantity(int64(normalisationValue*1000*replicas), resource.DecimalSI), Timestamp: metav1.Now(), } fallbackMetrics := []external_metrics.ExternalMetricValue{metric} @@ -173,10 +231,10 @@ func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpe return fallbackMetrics } -func updateStatus(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus, metricSpec v2.MetricSpec) { +func updateStatus(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) { patch := runtimeclient.MergeFrom(scaledObject.DeepCopy()) - if !isFallbackEnabled(scaledObject, metricSpec) || !HasValidFallback(scaledObject) { + if !isFallbackEnabled(scaledObject) || !HasValidFallback(scaledObject) { log.V(1).Info("Fallback is not enabled, hence skipping the health update to the scaledobject", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) return } diff --git a/pkg/fallback/fallback_test.go b/pkg/fallback/fallback_test.go index a8558d74e28..2485fa56dc7 100644 --- a/pkg/fallback/fallback_test.go +++ b/pkg/fallback/fallback_test.go @@ -218,28 +218,6 @@ var _ = Describe("fallback", func() { Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(4, kedav1alpha1.HealthStatusFailing)) }) - It("should behave as if fallback is disabled when the metrics spec target type is not average value metric", func() { - so := buildScaledObject( - &kedav1alpha1.Fallback{ - FailureThreshold: int32(3), - Replicas: int32(10), - }, nil, - ) - - qty := resource.NewQuantity(int64(3), resource.DecimalSI) - metricsSpec := v2.MetricSpec{ - External: &v2.ExternalMetricSource{ - Target: v2.MetricTarget{ - Type: v2.UtilizationMetricType, - Value: qty, - }, - }, - } - - isEnabled := isFallbackEnabled(so, metricsSpec) - Expect(isEnabled).Should(BeFalse()) - }) - It("should ignore error if we fail to update kubernetes status", func() { scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("some error")) startingNumberOfFailures := int32(3) diff --git a/tests/internals/fallback/fallback_test.go b/tests/internals/fallback/fallback_test.go index 7c78e2f11a5..0424d50dbb6 100644 --- a/tests/internals/fallback/fallback_test.go +++ b/tests/internals/fallback/fallback_test.go @@ -196,6 +196,41 @@ spec: name: {{.TriggerAuthName}} ` + scaledObjectTemplateWithTriggersOfValueType = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObject}} + namespace: {{.Namespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + minReplicaCount: {{.MinReplicas}} + maxReplicaCount: {{.MaxReplicas}} + fallback: + failureThreshold: 3 + replicas: {{.DefaultFallback}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 1 + cooldownPeriod: 1 + pollingInterval: 5 + triggers: + - type: metrics-api + metricType: Value + metadata: + targetValue: "5" + url: "{{.MetricsServerEndpoint}}" + valueLocation: 'value' + method: "query" + authenticationRef: + name: {{.TriggerAuthName}} +` + scaledObjectTemplateWithCurrentReplicasIfHigher = ` apiVersion: keda.sh/v1alpha1 kind: ScaledObject @@ -394,6 +429,31 @@ func TestFallback(t *testing.T) { DeleteKubernetesResources(t, namespace, data, templates) } +func TestFallbackForValueMetrics(t *testing.T) { + // setup + t.Log("--- setting up ---") + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + + // Replace the default scaledObject template + for i, tmpl := range templates { + if tmpl.Name == "scaledObjectTemplate" { + templates[i].Config = scaledObjectTemplateWithTriggersOfValueType + break + } + } + + CreateKubernetesResources(t, kc, namespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, minReplicas, 180, 3), + "replica count should be %d after 3 minutes", minReplicas) + testScaleOut(t, kc, data) + testFallback(t, kc, data) + testRestoreAfterFallback(t, kc, data) + + DeleteKubernetesResources(t, namespace, data, templates) +} + func TestFallbackWithCurrentReplicasIfHigher(t *testing.T) { // setup t.Log("--- setting up CurrentReplicasIfHigher test ---")