@@ -8,16 +8,23 @@ import (
88 "io"
99 "net/http"
1010 neturl "net/url"
11+ "slices"
12+ "strconv"
1113 "strings"
14+ "sync"
1215
1316 "github.com/go-logr/logr"
1417 "github.com/prometheus/common/expfmt"
1518 "github.com/prometheus/prometheus/promql/parser"
1619 "github.com/tidwall/gjson"
20+ "golang.org/x/sync/semaphore"
1721 "gopkg.in/yaml.v3"
1822 v2 "k8s.io/api/autoscaling/v2"
23+ discoveryV1 "k8s.io/api/discovery/v1"
1924 "k8s.io/apimachinery/pkg/api/resource"
25+ "k8s.io/apimachinery/pkg/labels"
2026 "k8s.io/metrics/pkg/apis/external_metrics"
27+ "sigs.k8s.io/controller-runtime/pkg/client"
2128
2229 "github.com/kedacore/keda/v2/pkg/scalers/authentication"
2330 "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
@@ -29,16 +36,18 @@ type metricsAPIScaler struct {
2936 metadata * metricsAPIScalerMetadata
3037 httpClient * http.Client
3138 logger logr.Logger
39+ kubeClient client.Client
3240}
3341
3442type metricsAPIScalerMetadata struct {
35- TargetValue float64 `keda:"name=targetValue,order=triggerMetadata,optional"`
36- ActivationTargetValue float64 `keda:"name=activationTargetValue,order=triggerMetadata,default=0"`
37- URL string `keda:"name=url,order=triggerMetadata"`
38- Format APIFormat `keda:"name=format,order=triggerMetadata,default=json,enum=prometheus;json;xml;yaml"`
39- ValueLocation string `keda:"name=valueLocation,order=triggerMetadata"`
40- UnsafeSsl bool `keda:"name=unsafeSsl,order=triggerMetadata,default=false"`
41-
43+ TargetValue float64 `keda:"name=targetValue,order=triggerMetadata,optional"`
44+ ActivationTargetValue float64 `keda:"name=activationTargetValue,order=triggerMetadata,default=0"`
45+ URL string `keda:"name=url,order=triggerMetadata"`
46+ Format APIFormat `keda:"name=format,order=triggerMetadata,default=json,enum=prometheus;json;xml;yaml"`
47+ ValueLocation string `keda:"name=valueLocation,order=triggerMetadata"`
48+ UnsafeSsl bool `keda:"name=unsafeSsl,order=triggerMetadata,default=false"`
49+ AggregateFromKubeServiceEndpoints bool `keda:"name=aggregateFromKubeServiceEndpoints,order=triggerMetadata,default=false"`
50+ AggregationType AggregationType `keda:"name=aggregationType,order=triggerMetadata,default=average,enum=average;sum;max;min"`
4251 // Authentication parameters for connecting to the metrics API
4352 MetricsAPIAuth * authentication.Config `keda:"optional"`
4453
@@ -50,6 +59,8 @@ const (
5059 valueLocationWrongErrorMsg = "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
5160)
5261
62+ const secureHTTPScheme = "https"
63+
5364type APIFormat string
5465
5566// Options for APIFormat:
@@ -60,8 +71,18 @@ const (
6071 YAMLFormat APIFormat = "yaml"
6172)
6273
74+ type AggregationType string
75+
76+ // Options for APIFormat:
77+ const (
78+ AverageAggregationType AggregationType = "average"
79+ SumAggregationType AggregationType = "sum"
80+ MaxAggregationType AggregationType = "max"
81+ MinAggregationType AggregationType = "min"
82+ )
83+
6384// NewMetricsAPIScaler creates a new HTTP scaler
64- func NewMetricsAPIScaler (config * scalersconfig.ScalerConfig ) (Scaler , error ) {
85+ func NewMetricsAPIScaler (config * scalersconfig.ScalerConfig , kubeClient client. Client ) (Scaler , error ) {
6586 metricType , err := GetMetricTargetType (config )
6687 if err != nil {
6788 return nil , fmt .Errorf ("error getting scaler metric type: %w" , err )
@@ -87,6 +108,7 @@ func NewMetricsAPIScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
87108 metricType : metricType ,
88109 metadata : meta ,
89110 httpClient : httpClient ,
111+ kubeClient : kubeClient ,
90112 logger : InitializeLogger (config , "metrics_api_scaler" ),
91113 }, nil
92114}
@@ -278,8 +300,156 @@ func getValueFromYAMLResponse(body []byte, valueLocation string) (float64, error
278300 }
279301}
280302
303+ func (s * metricsAPIScaler ) getEndpointsUrlsFromServiceURL (ctx context.Context , serviceURL string ) (endpointUrls []string , err error ) {
304+ // parse service name from s.meta.url
305+ url , err := neturl .Parse (serviceURL )
306+ if err != nil {
307+ s .logger .Error (err , "Failed parsing url for metrics API" )
308+ return nil , err
309+ }
310+
311+ splittedHost := strings .Split (url .Host , "." )
312+ if len (splittedHost ) < 2 {
313+ return nil , fmt .Errorf ("invalid hostname %s : expected at least 2 elements, first being service name and second being the namespace" , url .Host )
314+ }
315+ serviceName := splittedHost [0 ]
316+ namespace := splittedHost [1 ]
317+ podPort := url .Port ()
318+ // infer port from service scheme when not set explicitly
319+ if podPort == "" {
320+ if url .Scheme == secureHTTPScheme {
321+ podPort = "443"
322+ } else {
323+ podPort = "80"
324+ }
325+ }
326+ // get service serviceEndpointsSlices
327+ serviceEndpointsSlices := & discoveryV1.EndpointSliceList {}
328+ serviceNameSelector := labels .NewSelector ()
329+ serviceNameSelector .Matches (labels .Set (map [string ]string {
330+ discoveryV1 .LabelServiceName : serviceName ,
331+ }))
332+ err = s .kubeClient .List (ctx , serviceEndpointsSlices , & client.ListOptions {
333+ LabelSelector : serviceNameSelector ,
334+ Namespace : namespace ,
335+ })
336+ if err != nil {
337+ return nil , err
338+ }
339+ var uniqueAddresses []string
340+ for _ , endpointSlice := range serviceEndpointsSlices .Items {
341+ for _ , eps := range endpointSlice .Endpoints {
342+ // as suggested in https://github.com/kedacore/keda/pull/6565#discussion_r2395073047, make sure we take endpoint into account
343+ // only when it's ready
344+ if eps .Conditions .Ready != nil && ! * eps .Conditions .Ready {
345+ continue
346+ }
347+ for _ , address := range eps .Addresses {
348+ // deduplicate addresses as suggested in https://github.com/kedacore/keda/pull/6565#discussion_r2395073047
349+ // because it's not guaranteed by Kubernetes that an endpoint IP address will have a unique representation
350+ // see https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#duplicate-endpoints
351+ if slices .Contains (uniqueAddresses , address ) {
352+ continue
353+ }
354+ uniqueAddresses = append (uniqueAddresses , address )
355+
356+ foundPort := ""
357+ for _ , port := range endpointSlice .Ports {
358+ if port .Port != nil && strconv .Itoa (int (* port .Port )) == podPort {
359+ foundPort = fmt .Sprintf (":%d" , * port .Port )
360+ break
361+ }
362+ }
363+ if foundPort == "" {
364+ s .logger .V (1 ).Info (fmt .Sprintf ("Warning : could not find port %s in endpoint slice for service %s.%s definition. Will infer port from %s scheme" , podPort , serviceName , namespace , url .Scheme ))
365+ }
366+ endpointUrls = append (endpointUrls , fmt .Sprintf ("%s://%s%s%s" , url .Scheme , address , foundPort , url .Path ))
367+ }
368+ }
369+ }
370+ return endpointUrls , err
371+ }
372+
281373func (s * metricsAPIScaler ) getMetricValue (ctx context.Context ) (float64 , error ) {
282- request , err := getMetricAPIServerRequest (ctx , s .metadata )
374+ // if we wish to aggregate metric from a kubernetes service then we need to query each endpoint behind the service
375+ if s .metadata .AggregateFromKubeServiceEndpoints {
376+ endpointsUrls , err := s .getEndpointsUrlsFromServiceURL (ctx , s .metadata .URL )
377+ if err != nil {
378+ return 0 , fmt .Errorf ("failed to get kubernetes endpoints urls from configured service URL" )
379+ }
380+ if len (endpointsUrls ) == 0 {
381+ return 0 , fmt .Errorf ("no endpoints URLs were given for the service name" )
382+ }
383+ return s .aggregateMetricsFromMultipleEndpoints (ctx , endpointsUrls )
384+ }
385+ // get single/unaggregated metric
386+ metric , err := s .getMetricValueFromURL (ctx , nil )
387+ if err == nil {
388+ s .logger .V (1 ).Info (fmt .Sprintf ("fetched single metric from metrics API url : %s. Value is %v\n " , s .metadata .URL , metric ))
389+ }
390+ return metric , err
391+ }
392+
393+ func (s * metricsAPIScaler ) aggregateMetricsFromMultipleEndpoints (ctx context.Context , endpointsUrls []string ) (float64 , error ) {
394+ // call s.getMetricValueFromURL() for each endpointsUrls in parallel goroutines (maximum 5 at a time) and sum them up
395+ const maxGoroutines = 5
396+ var mu sync.Mutex
397+ sem := semaphore .NewWeighted (maxGoroutines )
398+ expectedNbMetrics := len (endpointsUrls )
399+ nbErrors := 0
400+ var err error
401+ var firstMetricEncountered bool
402+ var aggregation float64
403+ for _ , endpointURL := range endpointsUrls {
404+ if err := sem .Acquire (ctx , 1 ); err != nil {
405+ s .logger .Error (err , "Failed to acquire semaphore" )
406+ continue
407+ }
408+ go func (url string ) {
409+ defer sem .Release (1 )
410+ metric , err := s .getMetricValueFromURL (ctx , & endpointURL )
411+
412+ if err != nil {
413+ s .logger .V (1 ).Info (fmt .Sprintf ("Error fetching metric for %s: %v\n " , url , err ))
414+ // we will ignore metric for computing aggregation when encountering error : decrease expectedNbMetrics
415+ mu .Lock ()
416+ expectedNbMetrics --
417+ nbErrors ++
418+ mu .Unlock ()
419+ } else {
420+ mu .Lock ()
421+ switch s .metadata .AggregationType {
422+ case MinAggregationType :
423+ if ! firstMetricEncountered || metric < aggregation {
424+ firstMetricEncountered = true
425+ aggregation = metric
426+ }
427+ case MaxAggregationType :
428+ if ! firstMetricEncountered || metric > aggregation {
429+ firstMetricEncountered = true
430+ aggregation = metric
431+ }
432+ default :
433+ // sum metrics if we are not looking for min or max value
434+ aggregation += metric
435+ }
436+ mu .Unlock ()
437+ }
438+ }(endpointURL )
439+ }
440+
441+ if nbErrors > 0 && nbErrors == len (endpointsUrls ) {
442+ err = fmt .Errorf ("could not get any metric successfully from the %d provided endpoints" , len (endpointsUrls ))
443+ }
444+ if s .metadata .AggregationType == AverageAggregationType {
445+ aggregation /= float64 (expectedNbMetrics )
446+ }
447+ s .logger .V (1 ).Info (fmt .Sprintf ("fetched %d metrics out of %d endpoints from kubernetes service : %s is %v\n " , expectedNbMetrics , len (endpointsUrls ), s .metadata .AggregationType , aggregation ))
448+ return aggregation , err
449+ }
450+
451+ func (s * metricsAPIScaler ) getMetricValueFromURL (ctx context.Context , url * string ) (float64 , error ) {
452+ request , err := getMetricAPIServerRequest (ctx , s .metadata , url )
283453 if err != nil {
284454 return 0 , err
285455 }
@@ -340,12 +510,14 @@ func (s *metricsAPIScaler) GetMetricsAndActivity(ctx context.Context, metricName
340510 return []external_metrics.ExternalMetricValue {metric }, val > s .metadata .ActivationTargetValue , nil
341511}
342512
343- func getMetricAPIServerRequest (ctx context.Context , meta * metricsAPIScalerMetadata ) (* http.Request , error ) {
513+ func getMetricAPIServerRequest (ctx context.Context , meta * metricsAPIScalerMetadata , url * string ) (* http.Request , error ) {
344514 var requestURL string
345-
515+ if url == nil {
516+ url = & meta .URL
517+ }
346518 // Handle API Key as query parameter if needed
347519 if meta .MetricsAPIAuth != nil && meta .MetricsAPIAuth .EnabledAPIKeyAuth () && meta .MetricsAPIAuth .Method == methodValueQuery {
348- url , _ := neturl .Parse (meta . URL )
520+ url , _ := neturl .Parse (* url )
349521 queryString := url .Query ()
350522 if meta .MetricsAPIAuth .KeyParamName == "" {
351523 queryString .Set ("api_key" , meta .MetricsAPIAuth .APIKey )
@@ -355,7 +527,7 @@ func getMetricAPIServerRequest(ctx context.Context, meta *metricsAPIScalerMetada
355527 url .RawQuery = queryString .Encode ()
356528 requestURL = url .String ()
357529 } else {
358- requestURL = meta . URL
530+ requestURL = * url
359531 }
360532
361533 // Create the request
0 commit comments