Commit ce3641b

Feat : Add aggregation from kube service endpoints feature in metrics API scaler (kedacore#6565)
* - Add aggregation from kube service endpoints feature in metrics API scaler
  - make generate-scalers-schema

  Signed-off-by: julian GUINARD <[email protected]>

* apply suggestions :
  - kedacore#6565 (comment)
  - kedacore#6565 (comment)

  Signed-off-by: julian GUINARD <[email protected]>

* Update pkg/scalers/metrics_api_scaler.go

  Co-authored-by: Jan Wozniak <[email protected]>
  Signed-off-by: julianguinard <[email protected]>

---------

Signed-off-by: julian GUINARD <[email protected]>
Signed-off-by: julianguinard <[email protected]>
Co-authored-by: Jan Wozniak <[email protected]>
1 parent 57ca8ca commit ce3641b
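For context, a `metrics-api` trigger using the two metadata fields introduced here might look like the sketch below. Only the field names, defaults, and the host format (`<service-name>.<namespace>...`, as parsed by `getEndpointsUrlsFromServiceURL`) come from this commit; the service name, namespace, port, path, `valueLocation`, and `targetValue` are illustrative placeholders.

```yaml
# Hypothetical trigger metadata (all concrete values are placeholders).
# The scaler splits the URL host on "." and treats the first label as the
# Service name and the second as its namespace, then queries every ready
# endpoint behind that Service and aggregates the results.
triggers:
  - type: metrics-api
    metadata:
      url: "http://my-metrics-app.my-namespace.svc.cluster.local:8080/api/v1/queue-length"
      valueLocation: "components.worker.tasks"
      targetValue: "10"
      aggregateFromKubeServiceEndpoints: "true"
      # one of: average (default), sum, max, min
      aggregationType: "average"
```

Note that the commit also grants the operator role read access to `discovery.k8s.io/endpointslices` (see `config/rbac/role.yaml` below), so installations that manage RBAC manually need the equivalent rule.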

9 files changed, +399 -74 lines changed


CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -128,13 +128,15 @@ New deprecation(s):
 - **Github Scaler**: Add support to control unlabeled job/runner matching ([#6900](https://github.com/kedacore/keda/issues/6900))
 - **InfluxDB Scaler**: Add support for InfluxDB v3 ([#6981](https://github.com/kedacore/keda/issues/6981))
 - **Kafka Scaler**: Add support for even distribution of partitions to consumers ([#2581](https://github.com/kedacore/keda/issues/2581))
+- **Metrics API scaler**: Introduce new `aggregateFromKubeServiceEndpoints` and `aggregationType` metadata fields to `metrics-api` so it is able to fetch metrics from all endpoints behind a kubernetes service and aggregate them ([#6565](https://github.com/kedacore/keda/pull/6565))
 - **Metrics API Scaler**: Support AuthParams for authMode ([#6939](https://github.com/kedacore/keda/issues/6939))
 - **Metrics API Scaler**: Support multiple auth methods simultaneously ([#6642](https://github.com/kedacore/keda/issues/6642))
 - **RabbitMQ Scaler**: add `DeliverGetRate`, `PublishedToDeliveredRatio` and `ExpectedQueueConsumptionTime` trigger modes to RabbitMQ scaler ([#7071](https://github.com/kedacore/keda/issues/7071))
 - **Solace Scaler**: Add hostlist support for Solace brokers ([#7090](https://github.com/kedacore/keda/issues/7090))
 - **Temporal Scaler**: Always set `temporal-namespace` header on requests([#7079](https://github.com/kedacore/keda/issues/7079))
 - **Temporal Scaler**: Support custom `tlsServerName` ([#6820](https://github.com/kedacore/keda/pull/6820))
 
+
 ### Fixes
 
 - **General**: Add missing `omitempty` json tags in the AuthPodIdentity struct ([#6779](https://github.com/kedacore/keda/issues/6779))

config/rbac/role.yaml

Lines changed: 8 additions & 0 deletions
@@ -100,6 +100,14 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - discovery.k8s.io
+  resources:
+  - endpointslices
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - eventing.keda.sh
   resources:

controllers/keda/scaledobject_controller.go

Lines changed: 1 addition & 0 deletions
@@ -57,6 +57,7 @@ import (
 // +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects;scaledobjects/finalizers;scaledobjects/status,verbs=get;list;watch;update;patch
 // +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;update;patch;create;delete
 // +kubebuilder:rbac:groups="",resources=configmaps;configmaps/status,verbs=get;list;watch
+// +kubebuilder:rbac:groups="discovery.k8s.io",resources=endpointslices,verbs=get;list;watch
 // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
 // +kubebuilder:rbac:groups="",resources=pods;services;services;secrets;external,verbs=get;list;watch
 // +kubebuilder:rbac:groups="*",resources="*/scale",verbs=get;list;watch;update;patch

pkg/scalers/metrics_api_scaler.go

Lines changed: 185 additions & 13 deletions
@@ -8,16 +8,23 @@ import (
 	"io"
 	"net/http"
 	neturl "net/url"
+	"slices"
+	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/go-logr/logr"
 	"github.com/prometheus/common/expfmt"
 	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/tidwall/gjson"
+	"golang.org/x/sync/semaphore"
 	"gopkg.in/yaml.v3"
 	v2 "k8s.io/api/autoscaling/v2"
+	discoveryV1 "k8s.io/api/discovery/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/metrics/pkg/apis/external_metrics"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/kedacore/keda/v2/pkg/scalers/authentication"
 	"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
@@ -29,16 +36,18 @@ type metricsAPIScaler struct {
 	metadata   *metricsAPIScalerMetadata
 	httpClient *http.Client
 	logger     logr.Logger
+	kubeClient client.Client
 }
 
 type metricsAPIScalerMetadata struct {
-	TargetValue           float64   `keda:"name=targetValue,order=triggerMetadata,optional"`
-	ActivationTargetValue float64   `keda:"name=activationTargetValue,order=triggerMetadata,default=0"`
-	URL                   string    `keda:"name=url,order=triggerMetadata"`
-	Format                APIFormat `keda:"name=format,order=triggerMetadata,default=json,enum=prometheus;json;xml;yaml"`
-	ValueLocation         string    `keda:"name=valueLocation,order=triggerMetadata"`
-	UnsafeSsl             bool      `keda:"name=unsafeSsl,order=triggerMetadata,default=false"`
-
+	TargetValue                       float64         `keda:"name=targetValue,order=triggerMetadata,optional"`
+	ActivationTargetValue             float64         `keda:"name=activationTargetValue,order=triggerMetadata,default=0"`
+	URL                               string          `keda:"name=url,order=triggerMetadata"`
+	Format                            APIFormat       `keda:"name=format,order=triggerMetadata,default=json,enum=prometheus;json;xml;yaml"`
+	ValueLocation                     string          `keda:"name=valueLocation,order=triggerMetadata"`
+	UnsafeSsl                         bool            `keda:"name=unsafeSsl,order=triggerMetadata,default=false"`
+	AggregateFromKubeServiceEndpoints bool            `keda:"name=aggregateFromKubeServiceEndpoints,order=triggerMetadata,default=false"`
+	AggregationType                   AggregationType `keda:"name=aggregationType,order=triggerMetadata,default=average,enum=average;sum;max;min"`
 	// Authentication parameters for connecting to the metrics API
 	MetricsAPIAuth *authentication.Config `keda:"optional"`
 
@@ -50,6 +59,8 @@ const (
 	valueLocationWrongErrorMsg = "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
 )
 
+const secureHTTPScheme = "https"
+
 type APIFormat string
 
 // Options for APIFormat:
@@ -60,8 +71,18 @@
 	YAMLFormat       APIFormat = "yaml"
 )
 
+type AggregationType string
+
+// Options for APIFormat:
+const (
+	AverageAggregationType AggregationType = "average"
+	SumAggregationType     AggregationType = "sum"
+	MaxAggregationType     AggregationType = "max"
+	MinAggregationType     AggregationType = "min"
+)
+
 // NewMetricsAPIScaler creates a new HTTP scaler
-func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
+func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig, kubeClient client.Client) (Scaler, error) {
 	metricType, err := GetMetricTargetType(config)
 	if err != nil {
 		return nil, fmt.Errorf("error getting scaler metric type: %w", err)
@@ -87,6 +108,7 @@ func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
 		metricType: metricType,
 		metadata:   meta,
 		httpClient: httpClient,
+		kubeClient: kubeClient,
 		logger:     InitializeLogger(config, "metrics_api_scaler"),
 	}, nil
 }
@@ -278,8 +300,156 @@ func getValueFromYAMLResponse(body []byte, valueLocation string) (float64, error
 	}
 }
 
+func (s *metricsAPIScaler) getEndpointsUrlsFromServiceURL(ctx context.Context, serviceURL string) (endpointUrls []string, err error) {
+	// parse service name from s.meta.url
+	url, err := neturl.Parse(serviceURL)
+	if err != nil {
+		s.logger.Error(err, "Failed parsing url for metrics API")
+		return nil, err
+	}
+
+	splittedHost := strings.Split(url.Host, ".")
+	if len(splittedHost) < 2 {
+		return nil, fmt.Errorf("invalid hostname %s : expected at least 2 elements, first being service name and second being the namespace", url.Host)
+	}
+	serviceName := splittedHost[0]
+	namespace := splittedHost[1]
+	podPort := url.Port()
+	// infer port from service scheme when not set explicitly
+	if podPort == "" {
+		if url.Scheme == secureHTTPScheme {
+			podPort = "443"
+		} else {
+			podPort = "80"
+		}
+	}
+	// get service serviceEndpointsSlices
+	serviceEndpointsSlices := &discoveryV1.EndpointSliceList{}
+	serviceNameSelector := labels.NewSelector()
+	serviceNameSelector.Matches(labels.Set(map[string]string{
+		discoveryV1.LabelServiceName: serviceName,
+	}))
+	err = s.kubeClient.List(ctx, serviceEndpointsSlices, &client.ListOptions{
+		LabelSelector: serviceNameSelector,
+		Namespace:     namespace,
+	})
+	if err != nil {
+		return nil, err
+	}
+	var uniqueAddresses []string
+	for _, endpointSlice := range serviceEndpointsSlices.Items {
+		for _, eps := range endpointSlice.Endpoints {
+			// as suggested in https://github.com/kedacore/keda/pull/6565#discussion_r2395073047, make sure we take endpoint into account
+			// only when it's ready
+			if eps.Conditions.Ready != nil && !*eps.Conditions.Ready {
+				continue
+			}
+			for _, address := range eps.Addresses {
+				// deduplicate addresses as suggested in https://github.com/kedacore/keda/pull/6565#discussion_r2395073047
+				// because it's not guaranteed by Kubernetes that an endpoint IP address will have a unique representation
+				// see https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#duplicate-endpoints
+				if slices.Contains(uniqueAddresses, address) {
+					continue
+				}
+				uniqueAddresses = append(uniqueAddresses, address)
+
+				foundPort := ""
+				for _, port := range endpointSlice.Ports {
+					if port.Port != nil && strconv.Itoa(int(*port.Port)) == podPort {
+						foundPort = fmt.Sprintf(":%d", *port.Port)
+						break
+					}
+				}
+				if foundPort == "" {
+					s.logger.V(1).Info(fmt.Sprintf("Warning : could not find port %s in endpoint slice for service %s.%s definition. Will infer port from %s scheme", podPort, serviceName, namespace, url.Scheme))
+				}
+				endpointUrls = append(endpointUrls, fmt.Sprintf("%s://%s%s%s", url.Scheme, address, foundPort, url.Path))
+			}
+		}
+	}
+	return endpointUrls, err
+}
+
 func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error) {
-	request, err := getMetricAPIServerRequest(ctx, s.metadata)
+	// if we wish to aggregate metric from a kubernetes service then we need to query each endpoint behind the service
+	if s.metadata.AggregateFromKubeServiceEndpoints {
+		endpointsUrls, err := s.getEndpointsUrlsFromServiceURL(ctx, s.metadata.URL)
+		if err != nil {
+			return 0, fmt.Errorf("failed to get kubernetes endpoints urls from configured service URL")
+		}
+		if len(endpointsUrls) == 0 {
+			return 0, fmt.Errorf("no endpoints URLs were given for the service name")
+		}
+		return s.aggregateMetricsFromMultipleEndpoints(ctx, endpointsUrls)
+	}
+	// get single/unaggregated metric
+	metric, err := s.getMetricValueFromURL(ctx, nil)
+	if err == nil {
+		s.logger.V(1).Info(fmt.Sprintf("fetched single metric from metrics API url : %s. Value is %v\n", s.metadata.URL, metric))
+	}
+	return metric, err
+}
+
+func (s *metricsAPIScaler) aggregateMetricsFromMultipleEndpoints(ctx context.Context, endpointsUrls []string) (float64, error) {
+	// call s.getMetricValueFromURL() for each endpointsUrls in parallel goroutines (maximum 5 at a time) and sum them up
+	const maxGoroutines = 5
+	var mu sync.Mutex
+	sem := semaphore.NewWeighted(maxGoroutines)
+	expectedNbMetrics := len(endpointsUrls)
+	nbErrors := 0
+	var err error
+	var firstMetricEncountered bool
+	var aggregation float64
+	for _, endpointURL := range endpointsUrls {
+		if err := sem.Acquire(ctx, 1); err != nil {
+			s.logger.Error(err, "Failed to acquire semaphore")
+			continue
+		}
+		go func(url string) {
+			defer sem.Release(1)
+			metric, err := s.getMetricValueFromURL(ctx, &endpointURL)
+
+			if err != nil {
+				s.logger.V(1).Info(fmt.Sprintf("Error fetching metric for %s: %v\n", url, err))
+				// we will ignore metric for computing aggregation when encountering error : decrease expectedNbMetrics
+				mu.Lock()
+				expectedNbMetrics--
+				nbErrors++
+				mu.Unlock()
+			} else {
+				mu.Lock()
+				switch s.metadata.AggregationType {
+				case MinAggregationType:
+					if !firstMetricEncountered || metric < aggregation {
+						firstMetricEncountered = true
+						aggregation = metric
+					}
+				case MaxAggregationType:
+					if !firstMetricEncountered || metric > aggregation {
+						firstMetricEncountered = true
+						aggregation = metric
+					}
+				default:
+					// sum metrics if we are not looking for min or max value
+					aggregation += metric
+				}
+				mu.Unlock()
+			}
+		}(endpointURL)
+	}
+
+	if nbErrors > 0 && nbErrors == len(endpointsUrls) {
+		err = fmt.Errorf("could not get any metric successfully from the %d provided endpoints", len(endpointsUrls))
+	}
+	if s.metadata.AggregationType == AverageAggregationType {
+		aggregation /= float64(expectedNbMetrics)
+	}
+	s.logger.V(1).Info(fmt.Sprintf("fetched %d metrics out of %d endpoints from kubernetes service : %s is %v\n", expectedNbMetrics, len(endpointsUrls), s.metadata.AggregationType, aggregation))
+	return aggregation, err
+}
+
+func (s *metricsAPIScaler) getMetricValueFromURL(ctx context.Context, url *string) (float64, error) {
+	request, err := getMetricAPIServerRequest(ctx, s.metadata, url)
 	if err != nil {
 		return 0, err
 	}
@@ -340,12 +510,14 @@ func (s *metricsAPIScaler) GetMetricsAndActivity(ctx context.Context, metricName
 	return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationTargetValue, nil
 }
 
-func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetadata) (*http.Request, error) {
+func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetadata, url *string) (*http.Request, error) {
 	var requestURL string
-
+	if url == nil {
+		url = &meta.URL
+	}
 	// Handle API Key as query parameter if needed
 	if meta.MetricsAPIAuth != nil && meta.MetricsAPIAuth.EnabledAPIKeyAuth() && meta.MetricsAPIAuth.Method == methodValueQuery {
-		url, _ := neturl.Parse(meta.URL)
+		url, _ := neturl.Parse(*url)
 		queryString := url.Query()
 		if meta.MetricsAPIAuth.KeyParamName == "" {
 			queryString.Set("api_key", meta.MetricsAPIAuth.APIKey)
@@ -355,7 +527,7 @@ func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetada
 		url.RawQuery = queryString.Encode()
 		requestURL = url.String()
 	} else {
-		requestURL = meta.URL
+		requestURL = *url
 	}
 
 	// Create the request

pkg/scalers/metrics_api_scaler_test.go

Lines changed: 13 additions & 17 deletions
@@ -103,15 +103,13 @@ var metricsAPIMetricIdentifiers = []metricsAPIMetricIdentifier{
 
 func TestMetricsAPIGetMetricSpecForScaling(t *testing.T) {
 	for _, testData := range metricsAPIMetricIdentifiers {
-		s, err := NewMetricsAPIScaler(
-			&scalersconfig.ScalerConfig{
-				ResolvedEnv:       map[string]string{},
-				TriggerMetadata:   testData.metadataTestData.metadata,
-				AuthParams:        map[string]string{},
-				GlobalHTTPTimeout: 3000 * time.Millisecond,
-				TriggerIndex:      testData.triggerIndex,
-			},
-		)
+		s, err := NewMetricsAPIScaler(&scalersconfig.ScalerConfig{
+			ResolvedEnv:       map[string]string{},
+			TriggerMetadata:   testData.metadataTestData.metadata,
+			AuthParams:        map[string]string{},
+			GlobalHTTPTimeout: 3000 * time.Millisecond,
+			TriggerIndex:      testData.triggerIndex,
+		}, nil)
 		if err != nil {
 			t.Errorf("Error creating the Scaler")
 		}
@@ -215,14 +213,12 @@ func TestBearerAuth(t *testing.T) {
 		"authMode": "bearer",
 	}
 
-	s, err := NewMetricsAPIScaler(
-		&scalersconfig.ScalerConfig{
-			ResolvedEnv:       map[string]string{},
-			TriggerMetadata:   metadata,
-			AuthParams:        authentication,
-			GlobalHTTPTimeout: 3000 * time.Millisecond,
-		},
-	)
+	s, err := NewMetricsAPIScaler(&scalersconfig.ScalerConfig{
+		ResolvedEnv:       map[string]string{},
+		TriggerMetadata:   metadata,
+		AuthParams:        authentication,
+		GlobalHTTPTimeout: 3000 * time.Millisecond,
+	}, nil)
 	if err != nil {
 		t.Errorf("Error creating the Scaler")
 	}

pkg/scaling/scalers_builder.go

Lines changed: 1 addition & 1 deletion
@@ -215,7 +215,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
 	case "memory":
 		return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config)
 	case "metrics-api":
-		return scalers.NewMetricsAPIScaler(config)
+		return scalers.NewMetricsAPIScaler(config, client)
 	case "mongodb":
 		return scalers.NewMongoDBScaler(ctx, config)
 	case "mssql":

schema/generated/scalers-metadata-schema.json

Lines changed: 18 additions & 0 deletions
@@ -2413,6 +2413,24 @@
         "type": "string",
         "default": "false",
         "metadataVariableReadable": true
+      },
+      {
+        "name": "aggregateFromKubeServiceEndpoints",
+        "type": "string",
+        "default": "false",
+        "metadataVariableReadable": true
+      },
+      {
+        "name": "aggregationType",
+        "type": "string",
+        "default": "average",
+        "allowedValue": [
+          "average",
+          "sum",
+          "max",
+          "min"
+        ],
+        "metadataVariableReadable": true
       }
     ]
   },
