diff --git a/cmd/autoscaler/app/autoscaler.go b/cmd/autoscaler/app/autoscaler.go index 1abda7da..3da1990b 100644 --- a/cmd/autoscaler/app/autoscaler.go +++ b/cmd/autoscaler/app/autoscaler.go @@ -25,6 +25,7 @@ import ( "time" "github.com/v3io/scaler/pkg/autoscaler" + "github.com/v3io/scaler/pkg/common" "github.com/v3io/scaler/pkg/pluginloader" "github.com/v3io/scaler/pkg/scalertypes" @@ -35,7 +36,6 @@ import ( "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" - "k8s.io/client-go/tools/clientcmd" "k8s.io/metrics/pkg/client/custom_metrics" ) @@ -73,7 +73,7 @@ func Run(kubeconfigPath string, autoScalerOptions = resourceScalerConfig.AutoScalerOptions } - restConfig, err := getClientConfig(kubeconfigPath) + restConfig, err := common.GetClientConfig(kubeconfigPath) if err != nil { return errors.Wrap(err, "Failed to get client configuration") } @@ -119,11 +119,3 @@ func createAutoScaler(restConfig *rest.Config, return newScaler, nil } - -func getClientConfig(kubeconfigPath string) (*rest.Config, error) { - if kubeconfigPath != "" { - return clientcmd.BuildConfigFromFlags("", kubeconfigPath) - } - - return rest.InClusterConfig() -} diff --git a/cmd/dlx/app/dlx.go b/cmd/dlx/app/dlx.go index a13fa8a3..61fab96b 100644 --- a/cmd/dlx/app/dlx.go +++ b/cmd/dlx/app/dlx.go @@ -24,12 +24,14 @@ import ( "os" "time" + "github.com/v3io/scaler/pkg/common" "github.com/v3io/scaler/pkg/dlx" "github.com/v3io/scaler/pkg/pluginloader" "github.com/v3io/scaler/pkg/scalertypes" "github.com/nuclio/errors" "github.com/nuclio/zap" + "k8s.io/client-go/kubernetes" ) func Run(kubeconfigPath string, @@ -75,6 +77,15 @@ func Run(kubeconfigPath string, dlxOptions = resourceScalerConfig.DLXOptions } + restConfig, err := common.GetClientConfig(kubeconfigPath) + if err != nil { + return errors.Wrap(err, "Failed to get client configuration") + } + + if dlxOptions.KubeClientSet, err = kubernetes.NewForConfig(restConfig); err != nil { + return errors.Wrap(err, "Failed to create k8s client set") + } + newDLX, err := createDLX(resourceScaler, dlxOptions) if err != nil { return errors.Wrap(err, "Failed to create dlx") @@ -88,7 +99,10 @@ func Run(kubeconfigPath string, select {} } -func createDLX(resourceScaler scalertypes.ResourceScaler, options scalertypes.DLXOptions) (*dlx.DLX, error) { +func createDLX( + resourceScaler scalertypes.ResourceScaler, + options scalertypes.DLXOptions, +) (*dlx.DLX, error) { rootLogger, err := nucliozap.NewNuclioZap("scaler", "console", nil, diff --git a/pkg/common/helpers.go b/pkg/common/helpers.go index adbb8f19..ae0be086 100644 --- a/pkg/common/helpers.go +++ b/pkg/common/helpers.go @@ -23,6 +23,9 @@ package common import ( "math/rand" "time" + + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" ) var SeededRand = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -38,3 +41,11 @@ func UniquifyStringSlice(stringList []string) []string { } return list } + +func GetClientConfig(kubeconfigPath string) (*rest.Config, error) { + if kubeconfigPath != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfigPath) + } + + return rest.InClusterConfig() +} diff --git a/pkg/dlx/dlx.go b/pkg/dlx/dlx.go index 3a6e462f..885ebbe6 100644 --- a/pkg/dlx/dlx.go +++ b/pkg/dlx/dlx.go @@ -24,6 +24,7 @@ import ( "context" "net/http" + "github.com/v3io/scaler/pkg/kube" "github.com/v3io/scaler/pkg/scalertypes" "github.com/nuclio/errors" @@ -34,6 +35,7 @@ type DLX struct { logger logger.Logger handler Handler server *http.Server + watcher *kube.IngressWatcher } func NewDLX(parentLogger logger.Logger, @@ -50,6 +52,19 @@ func NewDLX(parentLogger logger.Logger, return nil, errors.Wrap(err, "Failed to create function starter") } + watcher, err := kube.NewIngressWatcher( + context.Background(), + childLogger, + options.KubeClientSet, + options.ResolveTargetsFromIngressCallback, + options.ResyncInterval, + options.Namespace, + options.LabelSelector, + ) + if err != nil { + return nil, errors.Wrap(err, "Failed to create ingress watcher") + } + handler, err := NewHandler(childLogger, resourceStarter, resourceScaler, @@ -67,17 +82,25 @@ func NewDLX(parentLogger logger.Logger, server: &http.Server{ Addr: options.ListenAddress, }, + watcher: watcher, }, nil } func (d *DLX) Start() error { d.logger.DebugWith("Starting", "server", d.server.Addr) http.HandleFunc("/", d.handler.HandleFunc) + + // Start the ingress watcher synchronously to ensure cache is fully synced before DLX begins handling traffic + if err := d.watcher.Start(); err != nil { + return errors.Wrap(err, "Failed to start ingress watcher") + } + go d.server.ListenAndServe() // nolint: errcheck return nil } func (d *DLX) Stop(context context.Context) error { d.logger.DebugWith("Stopping", "server", d.server.Addr) + d.watcher.Stop() return d.server.Shutdown(context) } diff --git a/pkg/ingresscache/ingresscache.go b/pkg/ingresscache/ingresscache.go index 6bfa051b..39fa3c8e 100644 --- a/pkg/ingresscache/ingresscache.go +++ b/pkg/ingresscache/ingresscache.go @@ -35,7 +35,7 @@ type IngressCache struct { func NewIngressCache(logger logger.Logger) *IngressCache { return &IngressCache{ syncMap: &sync.Map{}, - logger: logger, + logger: logger.GetChild("cache"), } } diff --git a/pkg/ingresscache/types.go b/pkg/ingresscache/types.go index 06a1e458..a47734c1 100644 --- a/pkg/ingresscache/types.go +++ b/pkg/ingresscache/types.go @@ -20,15 +20,19 @@ such restriction. package ingresscache +type IngressHostCacheReader interface { + // Get retrieves all target names for the given host and path + Get(host string, path string) ([]string, error) +} + type IngressHostCache interface { + IngressHostCacheReader + // Set adds a new item to the cache for the given host, path and targets. Will overwrite existing values if any Set(host string, path string, targets []string) error // Delete removes the specified targets from the cache for the given host and path. Will do nothing if host, path or targets do not exist Delete(host string, path string, targets []string) error - - // Get retrieves all target names for the given host and path - Get(host string, path string) ([]string, error) } type IngressHostsTree interface { diff --git a/pkg/kube/ingress.go b/pkg/kube/ingress.go index d9ac5438..a9090a26 100644 --- a/pkg/kube/ingress.go +++ b/pkg/kube/ingress.go @@ -22,9 +22,9 @@ package kube import ( "context" - "time" "github.com/v3io/scaler/pkg/ingresscache" + "github.com/v3io/scaler/pkg/scalertypes" "github.com/nuclio/errors" "github.com/nuclio/logger" @@ -35,29 +35,6 @@ import ( "k8s.io/client-go/tools/cache" ) -const ( - defaultResyncInterval = 30 * time.Second -) - -// ResolveTargetsFromIngressCallback defines a function that extracts a list of target identifiers -// (e.g., names of services the Ingress routes traffic to) from a Kubernetes Ingress resource. -// -// This function is expected to be implemented externally and passed into the IngressWatcher, -// allowing for custom logic such as parsing annotations, labels, or other ingress metadata. -// -// Parameters: -// - ingress: The Kubernetes Ingress resource to extract targets from -// -// Returns: -// - []string: A slice of target identifiers (e.g., service names, endpoint addresses) -// - error: An error if target resolution fails -// -// Implementation guidelines: -// - Return a non-nil slice when targets are successfully resolved -// - Return a non-nil error if resolution fails -// - Should handle nil or malformed Ingress objects gracefully and return an error in such cases -type ResolveTargetsFromIngressCallback func(ingress *networkingv1.Ingress) ([]string, error) - type ingressValue struct { name string host string @@ -68,31 +45,32 @@ type ingressValue struct { // IngressWatcher watches for changes in Kubernetes Ingress resources and updates the ingress cache accordingly type IngressWatcher struct { ctx context.Context + cancel context.CancelFunc logger logger.Logger cache ingresscache.IngressHostCache factory informers.SharedInformerFactory informer cache.SharedIndexInformer - resolveTargetsCallback ResolveTargetsFromIngressCallback + resolveTargetsCallback scalertypes.ResolveTargetsFromIngressCallback } func NewIngressWatcher( - ctx context.Context, + dlxCtx context.Context, dlxLogger logger.Logger, kubeClient kubernetes.Interface, - ingressCache ingresscache.IngressCache, - resolveTargetsCallback ResolveTargetsFromIngressCallback, - resyncTimeout *time.Duration, + resolveTargetsCallback scalertypes.ResolveTargetsFromIngressCallback, + resyncInterval scalertypes.Duration, namespace string, labelSelector string, ) (*IngressWatcher, error) { - if resyncTimeout == nil { - defaultTimeout := defaultResyncInterval - resyncTimeout = &defaultTimeout + if resyncInterval.Duration == 0 { + resyncInterval = scalertypes.Duration{Duration: scalertypes.DefaultResyncInterval} } + ctxWithCancel, cancel := context.WithCancel(dlxCtx) + factory := informers.NewSharedInformerFactoryWithOptions( kubeClient, - *resyncTimeout, + resyncInterval.Duration, informers.WithNamespace(namespace), informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labelSelector @@ -101,9 +79,10 @@ func NewIngressWatcher( ingressInformer := factory.Networking().V1().Ingresses().Informer() ingressWatcher := &IngressWatcher{ - ctx: ctx, + ctx: ctxWithCancel, + cancel: cancel, logger: dlxLogger.GetChild("watcher"), - cache: &ingressCache, + cache: ingresscache.NewIngressCache(dlxLogger), factory: factory, informer: ingressInformer, resolveTargetsCallback: resolveTargetsCallback, @@ -135,9 +114,15 @@ func (iw *IngressWatcher) Start() error { func (iw *IngressWatcher) Stop() { iw.logger.Info("Stopping ingress watcher") + iw.cancel() iw.factory.Shutdown() } +// GetIngressHostCacheReader expose read-only access to the ingress cache +func (iw *IngressWatcher) GetIngressHostCacheReader() ingresscache.IngressHostCacheReader { + return iw.cache +} + // --- ResourceEventHandler methods --- func (iw *IngressWatcher) AddHandler(obj interface{}) { @@ -167,6 +152,26 @@ func (iw *IngressWatcher) AddHandler(obj interface{}) { } func (iw *IngressWatcher) UpdateHandler(oldObj, newObj interface{}) { + oldIngressResource, ok := oldObj.(*networkingv1.Ingress) + if !ok { + iw.logger.DebugWith("Failed to cast old object to Ingress", + "object", oldObj) + return + } + + newIngressResource, ok := newObj.(*networkingv1.Ingress) + if !ok { + iw.logger.DebugWith("Failed to cast new object to Ingress", + "object", newObj) + return + } + + // ResourceVersion is managed by Kubernetes and indicates whether the resource has changed. + // Comparing resourceVersion helps avoid unnecessary updates triggered by periodic informer resync + if oldIngressResource.ResourceVersion == newIngressResource.ResourceVersion { + return + } + oldIngress, err := iw.extractValuesFromIngressResource(oldObj) if err != nil { iw.logger.DebugWith("Update ingress handler - failed to extract values from old object", @@ -249,7 +254,7 @@ func (iw *IngressWatcher) extractValuesFromIngressResource(obj interface{}) (*in targets, err := iw.resolveTargetsCallback(ingress) if err != nil { - return nil, errors.Wrap(err, "Failed to extract targets from ingress labels") + return nil, errors.Wrapf(err, "Failed to extract targets from ingress labels: %s", err.Error()) } if len(targets) == 0 { diff --git a/pkg/kube/ingress_test.go b/pkg/kube/ingress_test.go index dc2ee482..93676fed 100644 --- a/pkg/kube/ingress_test.go +++ b/pkg/kube/ingress_test.go @@ -25,7 +25,7 @@ import ( "strings" "testing" - "github.com/v3io/scaler/pkg/ingresscache" + "github.com/v3io/scaler/pkg/scalertypes" "github.com/nuclio/logger" nucliozap "github.com/nuclio/zap" @@ -116,7 +116,7 @@ func (suite *IngressWatcherTestSuite) TestAddHandler() { testIngressWatcher, err := suite.createTestIngressWatcher() suite.Require().NoError(err) - testObj = suite.createDummyIngress(testCase.testArgs.host, testCase.testArgs.path, testCase.testArgs.targets) + testObj = suite.createDummyIngress(testCase.testArgs.host, testCase.testArgs.path, "1", testCase.testArgs.targets) if testCase.expectError { testObj = &networkingv1.IngressSpec{} @@ -149,6 +149,8 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() { initialCachedData *ingressValue testOldObj ingressValue testNewObj ingressValue + OldObjVersion string + newObjVersion string }{ { name: "Update PairTarget - same host and path, different targets", @@ -164,11 +166,39 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() { path: "/test/path", targets: []string{"test-targets-name-1", "test-targets-name-2"}, }, + OldObjVersion: "1", testNewObj: ingressValue{ host: "www.example.com", path: "/test/path", targets: []string{"test-targets-name-1", "test-targets-name-3"}, }, + newObjVersion: "2", + initialCachedData: &ingressValue{ + host: "www.example.com", + path: "/test/path", + targets: []string{"test-targets-name-1", "test-targets-name-2"}, + }, + }, { + name: "Update PairTarget - same ResourceVersion, no change in targets", + expectedResults: []expectedResult{ + { + host: "www.example.com", + path: "/test/path", + targets: []string{"test-targets-name-1", "test-targets-name-2"}, + }, + }, + testOldObj: ingressValue{ + host: "www.example.com", + path: "/test/path", + targets: []string{"test-targets-name-1", "test-targets-name-2"}, + }, + testNewObj: ingressValue{ + host: "www.example.com", + path: "/test/path", + targets: []string{"test-targets-name-1", "test-targets-name-3"}, + }, + OldObjVersion: "1", + newObjVersion: "1", initialCachedData: &ingressValue{ host: "www.example.com", path: "/test/path", @@ -186,11 +216,13 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() { path: "/test/path", targets: []string{"test-targets-name-1", "test-targets-name-2"}, }, + OldObjVersion: "1", testNewObj: ingressValue{ host: "www.example.com", path: "/another/path", targets: []string{"test-targets-name-1", "test-targets-name-2"}, }, + newObjVersion: "2", expectedResults: []expectedResult{ { host: "www.example.com", @@ -215,11 +247,13 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() { path: "/test/path", targets: []string{"test-targets-name-1", "test-targets-name-2"}, }, + OldObjVersion: "1", testNewObj: ingressValue{ host: "www.google.com", path: "/test/path", targets: []string{"test-targets-name-1", "test-targets-name-2"}, }, + newObjVersion: "2", expectedResults: []expectedResult{ { host: "www.example.com", @@ -238,8 +272,8 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() { testIngressWatcher, err := suite.createTestIngressWatcher() suite.Require().NoError(err) - testOldObj := suite.createDummyIngress(testCase.testOldObj.host, testCase.testOldObj.path, testCase.testOldObj.targets) - testNewObj := suite.createDummyIngress(testCase.testNewObj.host, testCase.testNewObj.path, testCase.testNewObj.targets) + testOldObj := suite.createDummyIngress(testCase.testOldObj.host, testCase.testOldObj.path, testCase.OldObjVersion, testCase.testOldObj.targets) + testNewObj := suite.createDummyIngress(testCase.testNewObj.host, testCase.testNewObj.path, testCase.newObjVersion, testCase.testNewObj.targets) if testCase.initialCachedData != nil { err = testIngressWatcher.cache.Set(testCase.initialCachedData.host, testCase.initialCachedData.path, testCase.initialCachedData.targets) @@ -337,7 +371,7 @@ func (suite *IngressWatcherTestSuite) TestDeleteHandler() { testIngressWatcher, err := suite.createTestIngressWatcher() suite.Require().NoError(err) - testObj = suite.createDummyIngress(testCase.testArgs.host, testCase.testArgs.path, testCase.testArgs.targets) + testObj = suite.createDummyIngress(testCase.testArgs.host, testCase.testArgs.path, "1", testCase.testArgs.targets) if testCase.expectError { testObj = &networkingv1.IngressSpec{} @@ -374,7 +408,7 @@ func (suite *IngressWatcherTestSuite) TestGetPathFromIngress() { tests := []testCase{ { name: "Valid ingress with path", - ingress: suite.createDummyIngress("host", "/test", []string{"target"}), + ingress: suite.createDummyIngress("host", "/test", "1", []string{"target"}), expected: "/test", }, { @@ -463,7 +497,7 @@ func (suite *IngressWatcherTestSuite) TestGetHostFromIngress() { }{ { name: "Valid ingress with host", - ingress: suite.createDummyIngress("test-host", "/test", []string{"target"}), + ingress: suite.createDummyIngress("test-host", "/test", "1", []string{"target"}), expected: "test-host", }, { @@ -527,15 +561,14 @@ func (suite *IngressWatcherTestSuite) createTestIngressWatcher() (*IngressWatche return NewIngressWatcher(ctx, suite.logger, suite.kubeClientSet, - *ingresscache.NewIngressCache(suite.logger), suite.createMockResolveFunc(), - nil, + scalertypes.Duration{}, "test-namespace", "test-labels-filter", ) } -func (suite *IngressWatcherTestSuite) createMockResolveFunc() ResolveTargetsFromIngressCallback { +func (suite *IngressWatcherTestSuite) createMockResolveFunc() scalertypes.ResolveTargetsFromIngressCallback { return func(ingress *networkingv1.Ingress) ([]string, error) { // Extract targets from ingress - matches createDummyIngress structure (1 rule, 1 path) if len(ingress.Spec.Rules) != 1 { @@ -563,12 +596,13 @@ func (suite *IngressWatcherTestSuite) createMockResolveFunc() ResolveTargetsFrom } // createDummyIngress Creates a dummy Ingress object for testing -func (suite *IngressWatcherTestSuite) createDummyIngress(host, path string, targets []string) *networkingv1.Ingress { +func (suite *IngressWatcherTestSuite) createDummyIngress(host, path, version string, targets []string) *networkingv1.Ingress { target := strings.Join(targets, ",") return &networkingv1.Ingress{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-ingress", - Namespace: "test-namespace", + Name: "test-ingress", + Namespace: "test-namespace", + ResourceVersion: version, }, Spec: networkingv1.IngressSpec{ Rules: []networkingv1.IngressRule{ diff --git a/pkg/scalertypes/types.go b/pkg/scalertypes/types.go index 1fcecfe1..e92a5e19 100644 --- a/pkg/scalertypes/types.go +++ b/pkg/scalertypes/types.go @@ -28,7 +28,9 @@ import ( "time" "github.com/nuclio/errors" + networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" ) type AutoScalerOptions struct { @@ -51,16 +53,43 @@ const ( MultiTargetStrategyCanary MultiTargetStrategy = "canary" ) +const ( + DefaultResyncInterval = 30 * time.Second +) + +// ResolveTargetsFromIngressCallback defines a function that extracts a list of target identifiers +// (e.g., names of services the Ingress routes traffic to) from a Kubernetes Ingress resource. +// +// This function is expected to be implemented externally and passed into the IngressWatcher, +// allowing for custom logic such as parsing annotations, labels, or other ingress metadata. +// +// Parameters: +// - ingress: The Kubernetes Ingress resource to extract targets from +// +// Returns: +// - []string: A slice of target identifiers (e.g., service names, endpoint addresses) +// - error: An error if target resolution fails +// +// Implementation guidelines: +// - Return a non-nil slice when targets are successfully resolved +// - Return a non-nil error if resolution fails +// - Should handle nil or malformed Ingress objects gracefully and return an error in such cases +type ResolveTargetsFromIngressCallback func(ingress *networkingv1.Ingress) ([]string, error) + type DLXOptions struct { Namespace string // comma delimited - TargetNameHeader string - TargetPathHeader string - TargetPort int - ListenAddress string - ResourceReadinessTimeout Duration - MultiTargetStrategy MultiTargetStrategy + TargetNameHeader string + TargetPathHeader string + TargetPort int + ListenAddress string + ResourceReadinessTimeout Duration + MultiTargetStrategy MultiTargetStrategy + LabelSelector string + ResolveTargetsFromIngressCallback ResolveTargetsFromIngressCallback `json:"-"` + ResyncInterval Duration + KubeClientSet kubernetes.Interface `json:"-"` } type ResourceScaler interface {