diff --git a/cmd/autoscaler/app/autoscaler.go b/cmd/autoscaler/app/autoscaler.go index 3da1990b..5c34c29e 100644 --- a/cmd/autoscaler/app/autoscaler.go +++ b/cmd/autoscaler/app/autoscaler.go @@ -25,6 +25,7 @@ import ( "time" "github.com/v3io/scaler/pkg/autoscaler" + "github.com/v3io/scaler/pkg/autoscaler/metricsclient" "github.com/v3io/scaler/pkg/common" "github.com/v3io/scaler/pkg/pluginloader" "github.com/v3io/scaler/pkg/scalertypes" @@ -32,11 +33,7 @@ import ( "github.com/nuclio/errors" "github.com/nuclio/zap" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/discovery" - "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" - "k8s.io/metrics/pkg/client/custom_metrics" ) func Run(kubeconfigPath string, @@ -44,6 +41,7 @@ func Run(kubeconfigPath string, scaleInterval time.Duration, metricsResourceKind string, metricsResourceGroup string) error { + // define default auto scaler options autoScalerOptions := scalertypes.AutoScalerOptions{ Namespace: namespace, ScaleInterval: scalertypes.Duration{Duration: scaleInterval}, @@ -51,6 +49,10 @@ func Run(kubeconfigPath string, Kind: metricsResourceKind, Group: metricsResourceGroup, }, + MetricsClientOptions: scalertypes.MetricsClientOptions{ + // default to k8s metrics client for the sake of backwards compatibility + MetricsClientKind: scalertypes.KindK8sMetricsClient, + }, } pluginLoader, err := pluginloader.New() @@ -103,16 +105,14 @@ func createAutoScaler(restConfig *rest.Config, return nil, errors.Wrap(err, "Failed to initialize root logger") } - discoveryClient, err := discovery.NewDiscoveryClientForConfig(restConfig) + // create metrics client using factory + metricsClient, err := metricsclient.NewMetricsClient(rootLogger, restConfig, options) if err != nil { - return nil, errors.Wrap(err, "Failed to create discovery client") + return nil, errors.Wrap(err, "Failed to create metrics client") } - availableAPIsGetter := custom_metrics.NewAvailableAPIsGetter(discoveryClient) - restMapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient)) - customMetricsClient := custom_metrics.NewForConfig(restConfig, restMapper, availableAPIsGetter) // create auto scaler - newScaler, err := autoscaler.NewAutoScaler(rootLogger, resourceScaler, customMetricsClient, options) + newScaler, err := autoscaler.NewAutoScaler(rootLogger, resourceScaler, metricsClient, options) if err != nil { return nil, errors.Wrap(err, "Failed to create auto scaler") } diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index 77288563..30a30ddb 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -28,10 +28,7 @@ import ( "github.com/nuclio/errors" "github.com/nuclio/logger" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/metrics/pkg/client/custom_metrics" ) type Autoscaler struct { @@ -41,13 +38,13 @@ type Autoscaler struct { scaleInterval scalertypes.Duration inScaleToZeroProcessMap map[string]bool groupKind schema.GroupKind - customMetricsClientSet custom_metrics.CustomMetricsClient + metricsClient scalertypes.MetricsClient ticker *time.Ticker } func NewAutoScaler(parentLogger logger.Logger, resourceScaler scalertypes.ResourceScaler, - customMetricsClientSet custom_metrics.CustomMetricsClient, + metricsClient scalertypes.MetricsClient, options scalertypes.AutoScalerOptions) (*Autoscaler, error) { childLogger := parentLogger.GetChild("autoscaler") childLogger.InfoWith("Creating Autoscaler", @@ -59,7 +56,7 @@ func NewAutoScaler(parentLogger logger.Logger, resourceScaler: resourceScaler, scaleInterval: options.ScaleInterval, groupKind: options.GroupKind, - customMetricsClientSet: customMetricsClientSet, + metricsClient: metricsClient, inScaleToZeroProcessMap: make(map[string]bool), }, nil } @@ -99,52 +96,6 @@ func (as *Autoscaler) getMetricNames(resources []scalertypes.Resource) []string return metricNames } -func (as *Autoscaler) getResourceMetrics(metricNames []string) (map[string]map[string]int, error) { - resourcesMetricsMap := make(map[string]map[string]int) - resourceLabels := labels.Everything() - metricSelectorLabels := labels.Everything() - metricsClient := as.customMetricsClientSet.NamespacedMetrics(as.namespace) - - for _, metricName := range metricNames { - - // getting the metric values for all object of schema group kind (e.g. deployment) - metrics, err := metricsClient.GetForObjects(as.groupKind, resourceLabels, metricName, metricSelectorLabels) - if err != nil { - - // if no data points submitted yet it's ok, continue to the next metric - if k8serrors.IsNotFound(err) { - continue - } - return nil, errors.Wrap(err, "Failed to get custom metrics") - } - - // fill the resourcesMetricsMap with the metrics data we got - for _, item := range metrics.Items { - - resourceName := item.DescribedObject.Name - value := int(item.Value.MilliValue()) - - as.logger.DebugWith("Got metric entry", - "resourceName", resourceName, - "metricName", metricName, - "value", value) - - if _, found := resourcesMetricsMap[resourceName]; !found { - resourcesMetricsMap[resourceName] = make(map[string]int) - } - - // sanity - if _, found := resourcesMetricsMap[resourceName][metricName]; found { - return nil, errors.New("Can not have more than one metric value per resource") - } - - resourcesMetricsMap[resourceName][metricName] = value - } - } - - return resourcesMetricsMap, nil -} - func (as *Autoscaler) checkResourceToScale(resource scalertypes.Resource, resourcesMetricsMap map[string]map[string]int) bool { if _, found := resourcesMetricsMap[resource.Name]; !found { as.logger.DebugWith("Resource does not have metrics data yet, keeping up", "resourceName", resource.Name) @@ -198,7 +149,7 @@ func (as *Autoscaler) checkResourcesToScale() error { } metricNames := as.getMetricNames(activeResources) as.logger.DebugWith("Got metric names", "metricNames", metricNames) - resourceMetricsMap, err := as.getResourceMetrics(metricNames) + resourceMetricsMap, err := as.metricsClient.GetResourceMetrics(metricNames) if err != nil { return errors.Wrap(err, "Failed to get resources metrics") } diff --git a/pkg/autoscaler/metricsclient/factory.go b/pkg/autoscaler/metricsclient/factory.go new file mode 100644 index 00000000..b2f610a4 --- /dev/null +++ b/pkg/autoscaler/metricsclient/factory.go @@ -0,0 +1,44 @@ +/* +Copyright 2026 Iguazio Systems Ltd. + +Licensed under the Apache License, Version 2.0 (the "License") with +an addition restriction as set forth herein. You may not use this +file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. + +In addition, you may not use the software for any purposes that are +illegal under applicable law, and the grant of the foregoing license +under the Apache 2.0 license is conditioned upon your compliance with +such restriction. +*/ + +package metricsclient + +import ( + "github.com/v3io/scaler/pkg/scalertypes" + + "github.com/nuclio/errors" + "github.com/nuclio/logger" + "k8s.io/client-go/rest" +) + +func NewMetricsClient(logger logger.Logger, + restConfig *rest.Config, + autoScalerConf scalertypes.AutoScalerOptions) (scalertypes.MetricsClient, error) { + switch autoScalerConf.MetricsClientOptions.MetricsClientKind { + case scalertypes.KindK8sMetricsClient: + return NewCustomMetricsClient( + logger, + restConfig, + autoScalerConf.Namespace, + autoScalerConf.GroupKind) + default: + return nil, errors.Errorf("unsupported metrics client kind: %s", autoScalerConf.MetricsClientOptions.MetricsClientKind) + } +} diff --git a/pkg/autoscaler/metricsclient/k8s.go b/pkg/autoscaler/metricsclient/k8s.go new file mode 100644 index 00000000..7aa68396 --- /dev/null +++ b/pkg/autoscaler/metricsclient/k8s.go @@ -0,0 +1,104 @@ +/* +Copyright 2026 Iguazio Systems Ltd. + +Licensed under the Apache License, Version 2.0 (the "License") with +an addition restriction as set forth herein. You may not use this +file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. See the License for the specific language governing +permissions and limitations under the License. + +In addition, you may not use the software for any purposes that are +illegal under applicable law, and the grant of the foregoing license +under the Apache 2.0 license is conditioned upon your compliance with +such restriction. +*/ + +package metricsclient + +import ( + "github.com/nuclio/errors" + "github.com/nuclio/logger" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/discovery/cached/memory" + "k8s.io/client-go/rest" + "k8s.io/client-go/restmapper" + k8scustommetrics "k8s.io/metrics/pkg/client/custom_metrics" +) + +type K8sCustomMetricsClient struct { + k8scustommetrics.CustomMetricsClient + namespace string + groupKind schema.GroupKind + logger logger.Logger +} + +func NewCustomMetricsClient( + parentLogger logger.Logger, + restConfig *rest.Config, + namespace string, + groupKind schema.GroupKind) (*K8sCustomMetricsClient, error) { + discoveryClient, err := discovery.NewDiscoveryClientForConfig(restConfig) + if err != nil { + return nil, errors.Wrap(err, "Failed to create k8s metrics client") + } + availableAPIsGetter := k8scustommetrics.NewAvailableAPIsGetter(discoveryClient) + restMapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient)) + customMetricsClient := k8scustommetrics.NewForConfig(restConfig, restMapper, availableAPIsGetter) + return &K8sCustomMetricsClient{ + logger: parentLogger.GetChild("custom-metrics"), + CustomMetricsClient: customMetricsClient, + namespace: namespace, + groupKind: groupKind, + }, nil +} + +func (cmw *K8sCustomMetricsClient) GetResourceMetrics(metricNames []string) (map[string]map[string]int, error) { + resourcesMetricsMap := make(map[string]map[string]int) + resourceLabels := labels.Everything() + metricSelectorLabels := labels.Everything() + metricsClient := cmw.NamespacedMetrics(cmw.namespace) + + for _, metricName := range metricNames { + // getting the metric values for all object of schema group kind (e.g. deployment) + metrics, err := metricsClient.GetForObjects(cmw.groupKind, resourceLabels, metricName, metricSelectorLabels) + if err != nil { + // No data points have been submitted yet; this is expected—proceed to the next metric. + if k8serrors.IsNotFound(err) { + continue + } + return nil, errors.Wrap(err, "Failed to get custom metrics") + } + + // fill the resourcesMetricsMap with the metrics data we got + for _, item := range metrics.Items { + resourceName := item.DescribedObject.Name + value := int(item.Value.MilliValue()) + + cmw.logger.DebugWith("Got metric entry", + "resourceName", resourceName, + "metricName", metricName, + "value", value) + + if _, found := resourcesMetricsMap[resourceName]; !found { + resourcesMetricsMap[resourceName] = make(map[string]int) + } + + // sanity + if _, found := resourcesMetricsMap[resourceName][metricName]; found { + return nil, errors.New("Can not have more than one metric value per resource") + } + + resourcesMetricsMap[resourceName][metricName] = value + } + } + + return resourcesMetricsMap, nil +} diff --git a/pkg/scalertypes/types.go b/pkg/scalertypes/types.go index e92a5e19..7103a073 100644 --- a/pkg/scalertypes/types.go +++ b/pkg/scalertypes/types.go @@ -33,10 +33,23 @@ import ( "k8s.io/client-go/kubernetes" ) +type MetricsClientKind string + +const ( + KindK8sMetricsClient = "k8sMetricsClient" +) + +type MetricsClientOptions struct { + MetricsClientKind MetricsClientKind + URL string + Template string +} + type AutoScalerOptions struct { - Namespace string - ScaleInterval Duration - GroupKind schema.GroupKind + Namespace string + ScaleInterval Duration + GroupKind schema.GroupKind + MetricsClientOptions MetricsClientOptions } type ResourceScalerConfig struct { @@ -199,3 +212,24 @@ func shortDurationString(d Duration) string { } return s } + +// MetricsClient defines an interface for retrieving resource metrics used by the autoscaler. +type MetricsClient interface { + // GetResourceMetrics retrieves metrics for multiple resources and metric names. + // + // Parameters: + // - metricNames: A slice of metric names to retrieve (e.g., "requests_per_minute", "cpu_usage_per_hour") + // + // Returns: + // - map[string]map[string]int: A nested map structure where: + // * The outer map key is the resource name (e.g., deployment name) + // * The inner map key is the metric name + // * The inner map value is the metric value as an integer + // Example: map["my-deployment"]["requests_per_minute"] = 42 + // - error: An error if metric retrieval fails + // + // The dual map structure allows efficient lookup of metric values by resource name + // and then by metric name, enabling the autoscaler to check multiple metrics + // per resource when making scaling decisions. + GetResourceMetrics(metricNames []string) (map[string]map[string]int, error) +}