4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -116,8 +116,8 @@ New deprecation(s):

- **General**: Add error and event for mismatching input property ([#6721](https://github.com/kedacore/keda/issues/6721))
- **General**: Allow excluding labels from being propagated from ScaledObject and ScaledJob to generated HPA and Job objects ([#6849](https://github.com/kedacore/keda/issues/6849))
- **General**: Only add webhook DNS names when webhook patching is enabled ([#7002](https://github.com/kedacore/keda/issues/7002))
- **General**: Improve Events emitted from ScaledObject controller ([#6802](https://github.com/kedacore/keda/issues/6802))
- **General**: Only add webhook DNS names when webhook patching is enabled ([#7002](https://github.com/kedacore/keda/issues/7002))
- **Apache Kafka Scaler**: Add support for even distribution of partitions to consumers ([#2581](https://github.com/kedacore/keda/issues/2581))
- **Artemis Scaler**: Add TLS support with client certificates for secure HTTPS connections ([#6448](https://github.com/kedacore/keda/issues/6448))
- **AWS CloudWatch Scaler**: Add support for CloudWatch extended statistics (e.g. P99, TM90, etc.) ([#7109](https://github.com/kedacore/keda/issues/7109))
@@ -128,13 +128,15 @@ New deprecation(s):
- **Github Scaler**: Add support to control unlabeled job/runner matching ([#6900](https://github.com/kedacore/keda/issues/6900))
- **InfluxDB Scaler**: Add support for InfluxDB v3 ([#6981](https://github.com/kedacore/keda/issues/6981))
- **Kafka Scaler**: Add support for even distribution of partitions to consumers ([#2581](https://github.com/kedacore/keda/issues/2581))
- **Metrics API Scaler**: Introduce new `aggregateFromKubeServiceEndpoints` and `aggregationType` metadata fields to `metrics-api` so that it can fetch metrics from all endpoints behind a Kubernetes service and aggregate them (see the example sketch after this list) ([#6565](https://github.com/kedacore/keda/pull/6565))
- **Metrics API Scaler**: Support AuthParams for authMode ([#6939](https://github.com/kedacore/keda/issues/6939))
- **Metrics API Scaler**: Support multiple auth methods simultaneously ([#6642](https://github.com/kedacore/keda/issues/6642))
- **RabbitMQ Scaler**: Add `DeliverGetRate`, `PublishedToDeliveredRatio` and `ExpectedQueueConsumptionTime` trigger modes ([#7071](https://github.com/kedacore/keda/issues/7071))
- **Solace Scaler**: Add hostlist support for Solace brokers ([#7090](https://github.com/kedacore/keda/issues/7090))
- **Temporal Scaler**: Always set `temporal-namespace` header on requests ([#7079](https://github.com/kedacore/keda/issues/7079))
- **Temporal Scaler**: Support custom `tlsServerName` ([#6820](https://github.com/kedacore/keda/pull/6820))
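
For illustration only — not part of this diff: a minimal, hypothetical `metrics-api` trigger exercising the two new fields from [#6565](https://github.com/kedacore/keda/pull/6565). Every name, URL, and value below is a placeholder.

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: example-scaledobject        # hypothetical
spec:
  scaleTargetRef:
    name: example-deployment        # hypothetical
  triggers:
    - type: metrics-api
      metadata:
        # host must look like <service>.<namespace>[...] so KEDA can resolve the endpoints
        url: "http://example-svc.example-ns.svc.cluster.local:8080/api/v1/metrics"
        valueLocation: "queue.size"
        targetValue: "10"
        # new: fetch from every ready endpoint behind the service...
        aggregateFromKubeServiceEndpoints: "true"
        # ...and combine the per-endpoint values (average, sum, max or min)
        aggregationType: "sum"
```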


### Fixes

- **General**: Add missing `omitempty` json tags in the AuthPodIdentity struct ([#6779](https://github.com/kedacore/keda/issues/6779))
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
@@ -100,6 +100,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- get
- list
- watch
- apiGroups:
- eventing.keda.sh
resources:
1 change: 1 addition & 0 deletions controllers/keda/scaledobject_controller.go
@@ -57,6 +57,7 @@ import (
// +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects;scaledobjects/finalizers;scaledobjects/status,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;update;patch;create;delete
// +kubebuilder:rbac:groups="",resources=configmaps;configmaps/status,verbs=get;list;watch
// +kubebuilder:rbac:groups="discovery.k8s.io",resources=endpointslices,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// +kubebuilder:rbac:groups="",resources=pods;services;services;secrets;external,verbs=get;list;watch
// +kubebuilder:rbac:groups="*",resources="*/scale",verbs=get;list;watch;update;patch
198 changes: 185 additions & 13 deletions pkg/scalers/metrics_api_scaler.go
@@ -8,16 +8,23 @@ import (
"io"
"net/http"
neturl "net/url"
"slices"
"strconv"
"strings"
"sync"

"github.com/go-logr/logr"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/prometheus/promql/parser"
"github.com/tidwall/gjson"
"golang.org/x/sync/semaphore"
"gopkg.in/yaml.v3"
v2 "k8s.io/api/autoscaling/v2"
discoveryV1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kedacore/keda/v2/pkg/scalers/authentication"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
@@ -29,16 +36,18 @@ type metricsAPIScaler struct {
metadata *metricsAPIScalerMetadata
httpClient *http.Client
logger logr.Logger
kubeClient client.Client
}

type metricsAPIScalerMetadata struct {
TargetValue float64 `keda:"name=targetValue,order=triggerMetadata,optional"`
ActivationTargetValue float64 `keda:"name=activationTargetValue,order=triggerMetadata,default=0"`
URL string `keda:"name=url,order=triggerMetadata"`
Format APIFormat `keda:"name=format,order=triggerMetadata,default=json,enum=prometheus;json;xml;yaml"`
ValueLocation string `keda:"name=valueLocation,order=triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl,order=triggerMetadata,default=false"`

TargetValue float64 `keda:"name=targetValue,order=triggerMetadata,optional"`
ActivationTargetValue float64 `keda:"name=activationTargetValue,order=triggerMetadata,default=0"`
URL string `keda:"name=url,order=triggerMetadata"`
Format APIFormat `keda:"name=format,order=triggerMetadata,default=json,enum=prometheus;json;xml;yaml"`
ValueLocation string `keda:"name=valueLocation,order=triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl,order=triggerMetadata,default=false"`
AggregateFromKubeServiceEndpoints bool `keda:"name=aggregateFromKubeServiceEndpoints,order=triggerMetadata,default=false"`
AggregationType AggregationType `keda:"name=aggregationType,order=triggerMetadata,default=average,enum=average;sum;max;min"`
// Authentication parameters for connecting to the metrics API
MetricsAPIAuth *authentication.Config `keda:"optional"`

@@ -50,6 +59,8 @@ const (
valueLocationWrongErrorMsg = "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
)

const secureHTTPScheme = "https"

type APIFormat string

// Options for APIFormat:
@@ -60,8 +71,18 @@ const (
YAMLFormat APIFormat = "yaml"
)

type AggregationType string

// Options for AggregationType:
const (
AverageAggregationType AggregationType = "average"
SumAggregationType AggregationType = "sum"
MaxAggregationType AggregationType = "max"
MinAggregationType AggregationType = "min"
)

// NewMetricsAPIScaler creates a new Metrics API scaler
func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig, kubeClient client.Client) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
@@ -87,6 +108,7 @@ func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType: metricType,
metadata: meta,
httpClient: httpClient,
kubeClient: kubeClient,
logger: InitializeLogger(config, "metrics_api_scaler"),
}, nil
}
@@ -278,8 +300,156 @@ func getValueFromYAMLResponse(body []byte, valueLocation string) (float64, error
}
}

func (s *metricsAPIScaler) getEndpointsUrlsFromServiceURL(ctx context.Context, serviceURL string) (endpointUrls []string, err error) {
// parse the service name and namespace from the service URL
url, err := neturl.Parse(serviceURL)
if err != nil {
s.logger.Error(err, "Failed parsing url for metrics API")
return nil, err
}

splittedHost := strings.Split(url.Host, ".")
if len(splittedHost) < 2 {
return nil, fmt.Errorf("invalid hostname %s : expected at least 2 elements, first being service name and second being the namespace", url.Host)
}
serviceName := splittedHost[0]
namespace := splittedHost[1]
podPort := url.Port()
// infer port from service scheme when not set explicitly
if podPort == "" {
if url.Scheme == secureHTTPScheme {
podPort = "443"
} else {
podPort = "80"
}
}
// list the EndpointSlices that belong to the target service
serviceEndpointsSlices := &discoveryV1.EndpointSliceList{}
// build a selector matching the kubernetes.io/service-name label set by the EndpointSlice controller
serviceNameSelector := labels.SelectorFromSet(labels.Set{
discoveryV1.LabelServiceName: serviceName,
})
err = s.kubeClient.List(ctx, serviceEndpointsSlices, &client.ListOptions{
LabelSelector: serviceNameSelector,
Namespace: namespace,
})
if err != nil {
return nil, err
}
var uniqueAddresses []string
for _, endpointSlice := range serviceEndpointsSlices.Items {
for _, eps := range endpointSlice.Endpoints {
// as suggested in https://github.com/kedacore/keda/pull/6565#discussion_r2395073047, only take an endpoint into account
// when it is ready
if eps.Conditions.Ready != nil && !*eps.Conditions.Ready {
continue
}
for _, address := range eps.Addresses {
// deduplicate addresses as suggested in https://github.com/kedacore/keda/pull/6565#discussion_r2395073047
// because it's not guaranteed by Kubernetes that an endpoint IP address will have a unique representation
// see https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#duplicate-endpoints
if slices.Contains(uniqueAddresses, address) {
continue
}
uniqueAddresses = append(uniqueAddresses, address)

foundPort := ""
for _, port := range endpointSlice.Ports {
if port.Port != nil && strconv.Itoa(int(*port.Port)) == podPort {
foundPort = fmt.Sprintf(":%d", *port.Port)
break
}
}
if foundPort == "" {
s.logger.V(1).Info(fmt.Sprintf("Warning : could not find port %s in endpoint slice for service %s.%s definition. Will infer port from %s scheme", podPort, serviceName, namespace, url.Scheme))
}
endpointUrls = append(endpointUrls, fmt.Sprintf("%s://%s%s%s", url.Scheme, address, foundPort, url.Path))
}
}
}
return endpointUrls, err
}
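
A standalone sketch (not from the PR) of the hostname convention the parser above assumes: the first DNS label is the service name and the second the namespace, so a bare `my-svc:8080` host would be rejected by the length check. All names here are hypothetical.

```go
package main

import (
	"fmt"
	neturl "net/url"
	"strings"
)

func main() {
	// hypothetical in-cluster service URL
	u, _ := neturl.Parse("http://example-svc.example-ns.svc.cluster.local:8080/api/v1/metrics")
	// note: u.Host keeps the port ("...cluster.local:8080"), so only the
	// leading labels are safe to read positionally
	parts := strings.Split(u.Host, ".")
	fmt.Println(parts[0]) // service name: "example-svc"
	fmt.Println(parts[1]) // namespace:    "example-ns"
	fmt.Println(u.Port()) // pod port:     "8080"
}
```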

func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error) {
request, err := getMetricAPIServerRequest(ctx, s.metadata)
// if we wish to aggregate metrics from a Kubernetes service, we need to query each endpoint behind the service
if s.metadata.AggregateFromKubeServiceEndpoints {
endpointsUrls, err := s.getEndpointsUrlsFromServiceURL(ctx, s.metadata.URL)
if err != nil {
return 0, fmt.Errorf("failed to get kubernetes endpoints urls from configured service URL")
}
if len(endpointsUrls) == 0 {
return 0, fmt.Errorf("no endpoints URLs were given for the service name")
}
return s.aggregateMetricsFromMultipleEndpoints(ctx, endpointsUrls)
}
// get single/unaggregated metric
metric, err := s.getMetricValueFromURL(ctx, nil)
if err == nil {
s.logger.V(1).Info(fmt.Sprintf("fetched single metric from metrics API url : %s. Value is %v\n", s.metadata.URL, metric))
}
return metric, err
}

func (s *metricsAPIScaler) aggregateMetricsFromMultipleEndpoints(ctx context.Context, endpointsUrls []string) (float64, error) {
// call s.getMetricValueFromURL() for each endpoint URL in parallel goroutines (maximum 5 at a time) and aggregate the results
const maxGoroutines = 5
var mu sync.Mutex
sem := semaphore.NewWeighted(maxGoroutines)
expectedNbMetrics := len(endpointsUrls)
nbErrors := 0
var err error
var firstMetricEncountered bool
var aggregation float64
for _, endpointURL := range endpointsUrls {
if err := sem.Acquire(ctx, 1); err != nil {
s.logger.Error(err, "Failed to acquire semaphore")
continue
}
go func(url string) {
defer sem.Release(1)
metric, err := s.getMetricValueFromURL(ctx, &endpointURL)

if err != nil {
s.logger.V(1).Info(fmt.Sprintf("Error fetching metric for %s: %v\n", url, err))
// we will ignore metric for computing aggregation when encountering error : decrease expectedNbMetrics
mu.Lock()
expectedNbMetrics--
nbErrors++
mu.Unlock()
} else {
mu.Lock()
switch s.metadata.AggregationType {
case MinAggregationType:
if !firstMetricEncountered || metric < aggregation {
firstMetricEncountered = true
aggregation = metric
}
case MaxAggregationType:
if !firstMetricEncountered || metric > aggregation {
firstMetricEncountered = true
aggregation = metric
}
default:
// sum metrics if we are not looking for min or max value
aggregation += metric
}
mu.Unlock()
}
}(endpointURL)
}

// wait for all in-flight goroutines to finish before reading the shared aggregation state
if waitErr := sem.Acquire(ctx, maxGoroutines); waitErr != nil {
return 0, fmt.Errorf("failed waiting for metric fetches to complete: %w", waitErr)
}

if nbErrors > 0 && nbErrors == len(endpointsUrls) {
err = fmt.Errorf("could not get any metric successfully from the %d provided endpoints", len(endpointsUrls))
}
// guard against dividing by zero when every endpoint failed
if s.metadata.AggregationType == AverageAggregationType && expectedNbMetrics > 0 {
aggregation /= float64(expectedNbMetrics)
}
s.logger.V(1).Info(fmt.Sprintf("fetched %d metrics out of %d endpoints from kubernetes service : %s is %v\n", expectedNbMetrics, len(endpointsUrls), s.metadata.AggregationType, aggregation))
return aggregation, err
}
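
A self-contained sketch (an assumption-laden illustration, not PR code) of the aggregation semantics above: failed endpoints are excluded from the average's divisor, and min/max use a first-value flag instead of seeding with zero.

```go
package main

import "fmt"

// aggregate mirrors the switch above for the values that were fetched successfully.
func aggregate(values []float64, aggType string) float64 {
	var agg float64
	first := false
	for _, v := range values {
		switch aggType {
		case "min":
			if !first || v < agg {
				first, agg = true, v
			}
		case "max":
			if !first || v > agg {
				first, agg = true, v
			}
		default: // "sum" and "average" both accumulate first
			agg += v
		}
	}
	// average divides by the number of successful fetches only
	if aggType == "average" && len(values) > 0 {
		agg /= float64(len(values))
	}
	return agg
}

func main() {
	vals := []float64{2, 4, 6} // e.g. 3 of 4 endpoints responded
	fmt.Println(aggregate(vals, "average")) // 4
	fmt.Println(aggregate(vals, "sum"))     // 12
	fmt.Println(aggregate(vals, "min"))     // 2
	fmt.Println(aggregate(vals, "max"))     // 6
}
```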

func (s *metricsAPIScaler) getMetricValueFromURL(ctx context.Context, url *string) (float64, error) {
request, err := getMetricAPIServerRequest(ctx, s.metadata, url)
if err != nil {
return 0, err
}
@@ -340,12 +510,14 @@ func (s *metricsAPIScaler) GetMetricsAndActivity(ctx context.Context, metricName
return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationTargetValue, nil
}

func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetadata) (*http.Request, error) {
func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetadata, url *string) (*http.Request, error) {
var requestURL string

if url == nil {
url = &meta.URL
}
// Handle API Key as query parameter if needed
if meta.MetricsAPIAuth != nil && meta.MetricsAPIAuth.EnabledAPIKeyAuth() && meta.MetricsAPIAuth.Method == methodValueQuery {
url, _ := neturl.Parse(meta.URL)
url, _ := neturl.Parse(*url)
queryString := url.Query()
if meta.MetricsAPIAuth.KeyParamName == "" {
queryString.Set("api_key", meta.MetricsAPIAuth.APIKey)
@@ -355,7 +527,7 @@ func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetada
url.RawQuery = queryString.Encode()
requestURL = url.String()
} else {
requestURL = meta.URL
requestURL = *url
}

// Create the request
30 changes: 13 additions & 17 deletions pkg/scalers/metrics_api_scaler_test.go
@@ -103,15 +103,13 @@ var metricsAPIMetricIdentifiers = []metricsAPIMetricIdentifier{

func TestMetricsAPIGetMetricSpecForScaling(t *testing.T) {
for _, testData := range metricsAPIMetricIdentifiers {
s, err := NewMetricsAPIScaler(
&scalersconfig.ScalerConfig{
ResolvedEnv: map[string]string{},
TriggerMetadata: testData.metadataTestData.metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 3000 * time.Millisecond,
TriggerIndex: testData.triggerIndex,
},
)
s, err := NewMetricsAPIScaler(&scalersconfig.ScalerConfig{
ResolvedEnv: map[string]string{},
TriggerMetadata: testData.metadataTestData.metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 3000 * time.Millisecond,
TriggerIndex: testData.triggerIndex,
}, nil)
if err != nil {
t.Errorf("Error creating the Scaler")
}
@@ -215,14 +213,12 @@ func TestBearerAuth(t *testing.T) {
"authMode": "bearer",
}

s, err := NewMetricsAPIScaler(
&scalersconfig.ScalerConfig{
ResolvedEnv: map[string]string{},
TriggerMetadata: metadata,
AuthParams: authentication,
GlobalHTTPTimeout: 3000 * time.Millisecond,
},
)
s, err := NewMetricsAPIScaler(&scalersconfig.ScalerConfig{
ResolvedEnv: map[string]string{},
TriggerMetadata: metadata,
AuthParams: authentication,
GlobalHTTPTimeout: 3000 * time.Millisecond,
}, nil)
if err != nil {
t.Errorf("Error creating the Scaler")
}
2 changes: 1 addition & 1 deletion pkg/scaling/scalers_builder.go
@@ -215,7 +215,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
case "memory":
return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config)
case "metrics-api":
return scalers.NewMetricsAPIScaler(config)
return scalers.NewMetricsAPIScaler(config, client)
case "mongodb":
return scalers.NewMongoDBScaler(ctx, config)
case "mssql":
18 changes: 18 additions & 0 deletions schema/generated/scalers-metadata-schema.json
@@ -2401,6 +2401,24 @@
"type": "string",
"default": "false",
"metadataVariableReadable": true
},
{
"name": "aggregateFromKubeServiceEndpoints",
"type": "string",
"default": "false",
"metadataVariableReadable": true
},
{
"name": "aggregationType",
"type": "string",
"default": "average",
"allowedValue": [
"average",
"sum",
"max",
"min"
],
"metadataVariableReadable": true
}
]
},