Merged
12 changes: 2 additions & 10 deletions cmd/autoscaler/app/autoscaler.go
@@ -25,6 +25,7 @@ import (
"time"

"github.com/v3io/scaler/pkg/autoscaler"
"github.com/v3io/scaler/pkg/common"
"github.com/v3io/scaler/pkg/pluginloader"
"github.com/v3io/scaler/pkg/scalertypes"

@@ -35,7 +36,6 @@ import (
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/metrics/pkg/client/custom_metrics"
)

@@ -73,7 +73,7 @@ func Run(kubeconfigPath string,
autoScalerOptions = resourceScalerConfig.AutoScalerOptions
}

restConfig, err := getClientConfig(kubeconfigPath)
restConfig, err := common.GetClientConfig(kubeconfigPath)
if err != nil {
return errors.Wrap(err, "Failed to get client configuration")
}
@@ -119,11 +119,3 @@ func createAutoScaler(restConfig *rest.Config,

return newScaler, nil
}

func getClientConfig(kubeconfigPath string) (*rest.Config, error) {
if kubeconfigPath != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}

return rest.InClusterConfig()
}
22 changes: 19 additions & 3 deletions cmd/dlx/app/dlx.go
@@ -24,12 +24,14 @@ import (
"os"
"time"

"github.com/v3io/scaler/pkg/common"
"github.com/v3io/scaler/pkg/dlx"
"github.com/v3io/scaler/pkg/pluginloader"
"github.com/v3io/scaler/pkg/scalertypes"

"github.com/nuclio/errors"
"github.com/nuclio/zap"
"k8s.io/client-go/kubernetes"
)

func Run(kubeconfigPath string,
@@ -75,7 +77,17 @@ func Run(kubeconfigPath string,
dlxOptions = resourceScalerConfig.DLXOptions
}

newDLX, err := createDLX(resourceScaler, dlxOptions)
restConfig, err := common.GetClientConfig(kubeconfigPath)
if err != nil {
return errors.Wrap(err, "Failed to get client configuration")
}

kubeClientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return errors.Wrap(err, "Failed to create k8s client set")
}

newDLX, err := createDLX(resourceScaler, dlxOptions, kubeClientSet)
if err != nil {
return errors.Wrap(err, "Failed to create dlx")
}
@@ -88,7 +100,11 @@
select {}
}

func createDLX(resourceScaler scalertypes.ResourceScaler, options scalertypes.DLXOptions) (*dlx.DLX, error) {
func createDLX(
resourceScaler scalertypes.ResourceScaler,
options scalertypes.DLXOptions,
kubeClientSet kubernetes.Interface,
Collaborator:
why not adding kubeClientSet into options?

Collaborator (author):
I initially assumed that options was meant only for static variables or configuration values.
From your comment, I understand that including runtime dependencies like the Kubernetes client in options is acceptable here—and likely the better practice in this case.
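
For reference, the alternative raised in this thread would look roughly like the sketch below. The struct name and exact field set are hypothetical; the PR as written keeps the client as a separate argument to createDLX and dlx.NewDLX.

```go
package scalertypes

import "k8s.io/client-go/kubernetes"

// DLXOptionsWithClient is a hypothetical variant of DLXOptions (illustrative
// only, not part of this repository): the runtime Kubernetes client is carried
// inside the options instead of being threaded through createDLX/NewDLX as an
// extra parameter.
type DLXOptionsWithClient struct {
	ListenAddress string
	Namespace     string
	LabelSelector string

	// Runtime dependency, set once in Run() right after building rest.Config.
	KubeClientSet kubernetes.Interface
}
```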

) (*dlx.DLX, error) {
rootLogger, err := nucliozap.NewNuclioZap("scaler",
"console",
nil,
@@ -99,7 +115,7 @@ func createDLX(resourceScaler scalertypes.ResourceScaler, options scalertypes.DL
return nil, errors.Wrap(err, "Failed to initialize root logger")
}

newScaler, err := dlx.NewDLX(rootLogger, resourceScaler, options)
newScaler, err := dlx.NewDLX(rootLogger, resourceScaler, options, kubeClientSet)

if err != nil {
return nil, err
11 changes: 11 additions & 0 deletions pkg/common/helpers.go
@@ -23,6 +23,9 @@ package common
import (
"math/rand"
"time"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

var SeededRand = rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -38,3 +41,11 @@ func UniquifyStringSlice(stringList []string) []string {
}
return list
}

func GetClientConfig(kubeconfigPath string) (*rest.Config, error) {
if kubeconfigPath != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}

return rest.InClusterConfig()
}
33 changes: 32 additions & 1 deletion pkg/dlx/dlx.go
@@ -24,21 +24,28 @@ import (
"context"
"net/http"

"github.com/v3io/scaler/pkg/ingresscache"
"github.com/v3io/scaler/pkg/kube"
"github.com/v3io/scaler/pkg/scalertypes"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
"k8s.io/client-go/kubernetes"
)

type DLX struct {
logger logger.Logger
handler Handler
server *http.Server
cache ingresscache.IngressHostCache
watcher *kube.IngressWatcher
}

func NewDLX(parentLogger logger.Logger,
resourceScaler scalertypes.ResourceScaler,
options scalertypes.DLXOptions) (*DLX, error) {
options scalertypes.DLXOptions,
kubeClientSet kubernetes.Interface,
) (*DLX, error) {
childLogger := parentLogger.GetChild("dlx")
childLogger.InfoWith("Creating DLX",
"options", options)
@@ -50,6 +57,7 @@ func NewDLX(parentLogger logger.Logger,
return nil, errors.Wrap(err, "Failed to create function starter")
}

cache := ingresscache.NewIngressCache(childLogger)
handler, err := NewHandler(childLogger,
resourceStarter,
resourceScaler,
@@ -61,23 +69,46 @@ func NewDLX(parentLogger logger.Logger,
return nil, errors.Wrap(err, "Failed to create handler")
}

watcher, err := kube.NewIngressWatcher(
context.Background(),
childLogger,
kubeClientSet,
cache,
options.ResolveTargetsFromIngressCallback,
options.ResyncInterval,
options.Namespace,
options.LabelSelector,
)
if err != nil {
return nil, errors.Wrap(err, "Failed to create ingress watcher")
}

return &DLX{
logger: childLogger,
handler: handler,
server: &http.Server{
Addr: options.ListenAddress,
},
cache: cache,
Collaborator:
Why passing cache here if we already have it in watcher?

Collaborator:
The watcher updates the cache, but the DLX itself will need to read from it when receiving a request.

Collaborator:
@TomerShor but we can get it by dlx.watcher.cache, I think here it's better to stick with a single ownership and single source of truth (can also be done via encapsulation like watcher.Cache() or something if we need to only read it)

Collaborator (author):
Technically, we could access the cache via watcher.cache, but I think it's cleaner to initialize the cache in the main function and pass it explicitly to both the watcher and (in the upcoming PR) to the request handler.

This approach promotes loose coupling — the request handler and the watcher both interact with the cache, but neither should own it. More importantly, it’s preferable that the request handler remains unaware of the watcher operator to maintain a clear separation of concerns. This approach helps minimize unnecessary dependencies between components.

Since the cache is fully wired during initialization, having it stored on the DLX struct may be redundant — I’ll consider removing it if it’s not directly needed there.

Let me know what you think =]

Collaborator (author):
I removed the cache from the DLX struct (see: CR comments - remove the cache from the DLX struct).

Collaborator (rokatyy, Jul 16, 2025):
@weilerN Watcher writes to the cache, and DLX reads it. Since watcher is a part of DLX, the idea of initializing and passing the cache separately to both contradicts the Single Responsibility Principle. Only the watcher should have the ability to modify the cache; DLX should not — it merely reads from it. Allowing both components to be independently wired with the same cache introduces unnecessary risk.

> This approach helps minimize unnecessary dependencies between components.

The question here is: why do we want to minimize dependencies in this specific case? In fact, the dependency between DLX and watcher isn't unnecessary — it's inherent. DLX already owns the watcher; reading data from a cache managed by its child is a natural and clear dependency.

Trying to artificially decouple them may reduce "visible" coupling, but it weakens architectural clarity. It also opens the door to subtle bugs — for example, if someone mistakenly passes a copy or separate instance of the cache, we’ll end up with divergence that’s hard to track. This kind of split ownership violates the principle of having a single source of truth.

Collaborator (author):
@rokatyy Just to clarify — it’s the request handler, not the DLX itself, that reads from the cache. So either we pass the cache directly to the handler, or we pass the watcher and have the handler access the cache through the watcher.

If the concern is about restricting write access, exposing the full cache still leaves that possibility open. We’d need to wrap it in the watcher instead (e.g. watcher.ResolveFromCache() or something like that).

Since your main concern is ownership and making that responsibility explicit within the handler’s boundaries, I’ll follow your direction.
Let me know which approach you prefer ( watcher.cache.Get() or watcher.ResolveFromCache()), and I’ll update the code accordingly.

Collaborator (author):
@rokatyy @TomerShor I did some changes here (CR comment - move the cache into the watcher and expose only the cach…) to address all the points discussed in this thread:

  • Exposing only cache.Get to the handler
  • Single Responsibility Principle: the watcher is responsible for the cache
  • Single source of truth: the cache lives inside the watcher
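
A self-contained sketch of the ownership model this thread converges on: the watcher is the only writer of the cache, and readers such as the request handler get a narrow read-only accessor rather than the cache itself. All names below are illustrative and do not mirror the real pkg/kube or pkg/ingresscache types.

```go
package main

import (
	"fmt"
	"sync"
)

// hostCache maps an ingress host to its resolved targets.
type hostCache struct {
	mu      sync.RWMutex
	targets map[string][]string
}

// watcher owns the cache exclusively; nothing else holds a writable reference.
type watcher struct {
	cache hostCache
}

// observeIngress is the only write path, driven by informer events.
func (w *watcher) observeIngress(host string, targets []string) {
	w.cache.mu.Lock()
	defer w.cache.mu.Unlock()
	if w.cache.targets == nil {
		w.cache.targets = map[string][]string{}
	}
	w.cache.targets[host] = targets
}

// ResolveTargets is the read-only view exposed to the request handler.
func (w *watcher) ResolveTargets(host string) []string {
	w.cache.mu.RLock()
	defer w.cache.mu.RUnlock()
	return w.cache.targets[host]
}

func main() {
	w := &watcher{}
	w.observeIngress("my-func.example.com", []string{"my-func"})
	fmt.Println(w.ResolveTargets("my-func.example.com")) // [my-func]
}
```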

watcher: watcher,
}, nil
}

func (d *DLX) Start() error {
d.logger.DebugWith("Starting", "server", d.server.Addr)
http.HandleFunc("/", d.handler.HandleFunc)

// Start the ingress watcher synchronously to ensure cache is fully synced before DLX begins handling traffic
if err := d.watcher.Start(); err != nil {
return errors.Wrap(err, "Failed to start ingress watcher")
}

go d.server.ListenAndServe() // nolint: errcheck
return nil
}

func (d *DLX) Stop(context context.Context) error {
d.logger.DebugWith("Stopping", "server", d.server.Addr)
d.watcher.Stop()
return d.server.Shutdown(context)
}
2 changes: 1 addition & 1 deletion pkg/ingresscache/ingresscache.go
@@ -35,7 +35,7 @@ type IngressCache struct {
func NewIngressCache(logger logger.Logger) *IngressCache {
return &IngressCache{
syncMap: &sync.Map{},
logger: logger,
logger: logger.GetChild("cache"),
}
}

75 changes: 40 additions & 35 deletions pkg/kube/ingress.go
@@ -22,9 +22,9 @@ package kube

import (
"context"
"time"

"github.com/v3io/scaler/pkg/ingresscache"
"github.com/v3io/scaler/pkg/scalertypes"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
@@ -35,64 +35,44 @@ import (
"k8s.io/client-go/tools/cache"
)

const (
defaultResyncInterval = 30 * time.Second
)

// ResolveTargetsFromIngressCallback defines a function that extracts a list of target identifiers
// (e.g., names of services the Ingress routes traffic to) from a Kubernetes Ingress resource.
//
// This function is expected to be implemented externally and passed into the IngressWatcher,
// allowing for custom logic such as parsing annotations, labels, or other ingress metadata.
//
// Parameters:
// - ingress: The Kubernetes Ingress resource to extract targets from
//
// Returns:
// - []string: A slice of target identifiers (e.g., service names, endpoint addresses)
// - error: An error if target resolution fails
//
// Implementation guidelines:
// - Return a non-nil slice when targets are successfully resolved
// - Return a non-nil error if resolution fails
// - Should handle nil or malformed Ingress objects gracefully and return an error in such cases
type ResolveTargetsFromIngressCallback func(ingress *networkingv1.Ingress) ([]string, error)
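
To make the guidelines above concrete, here is a minimal sketch of one possible callback that collects the backend service names referenced by the Ingress rules. It is illustrative only and not part of this PR; real implementations may resolve targets from annotations or labels instead.

```go
package kube

import (
	"errors"

	networkingv1 "k8s.io/api/networking/v1"
)

// resolveTargetsFromBackendServices is an example ResolveTargetsFromIngressCallback:
// it returns the backend service names referenced by an Ingress, handling nil
// input with an error as the guidelines require.
func resolveTargetsFromBackendServices(ingress *networkingv1.Ingress) ([]string, error) {
	if ingress == nil {
		return nil, errors.New("ingress is nil")
	}

	var targets []string
	for _, rule := range ingress.Spec.Rules {
		if rule.HTTP == nil {
			continue
		}
		for _, path := range rule.HTTP.Paths {
			if svc := path.Backend.Service; svc != nil && svc.Name != "" {
				targets = append(targets, svc.Name)
			}
		}
	}
	return targets, nil
}
```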

type ingressValue struct {
name string
host string
path string
version string
Collaborator:
It's better to avoid changing the actual code for testing purposes, please remove this

Collaborator (author):
During development, I initially planned to use this in the actual code (e.g., for logging), but it turned out to be redundant—removing it.

targets []string
}

// IngressWatcher watches for changes in Kubernetes Ingress resources and updates the ingress cache accordingly
type IngressWatcher struct {
ctx context.Context
cancel context.CancelFunc
logger logger.Logger
cache ingresscache.IngressHostCache
factory informers.SharedInformerFactory
informer cache.SharedIndexInformer
resolveTargetsCallback ResolveTargetsFromIngressCallback
resolveTargetsCallback scalertypes.ResolveTargetsFromIngressCallback
}

func NewIngressWatcher(
ctx context.Context,
dlxCtx context.Context,
dlxLogger logger.Logger,
kubeClient kubernetes.Interface,
ingressCache ingresscache.IngressCache,
resolveTargetsCallback ResolveTargetsFromIngressCallback,
resyncTimeout *time.Duration,
ingressCache ingresscache.IngressHostCache,
resolveTargetsCallback scalertypes.ResolveTargetsFromIngressCallback,
resyncInterval scalertypes.Duration,
namespace string,
labelSelector string,
) (*IngressWatcher, error) {
if resyncTimeout == nil {
defaultTimeout := defaultResyncInterval
resyncTimeout = &defaultTimeout
if resyncInterval.Duration == 0 {
resyncInterval = scalertypes.Duration{Duration: scalertypes.DefaultResyncInterval}
}

ctx, cancel := context.WithCancel(dlxCtx)
Collaborator:
Suggested change:
- ctx, cancel := context.WithCancel(dlxCtx)
+ contextWithCancel, cancel := context.WithCancel(dlxCtx)

factory := informers.NewSharedInformerFactoryWithOptions(
kubeClient,
*resyncTimeout,
resyncInterval.Duration,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labelSelector
@@ -102,8 +82,9 @@ func NewIngressWatcher(

ingressWatcher := &IngressWatcher{
ctx: ctx,
cancel: cancel,
logger: dlxLogger.GetChild("watcher"),
cache: &ingressCache,
cache: ingressCache,
factory: factory,
informer: ingressInformer,
resolveTargetsCallback: resolveTargetsCallback,
@@ -135,6 +116,7 @@ func (iw *IngressWatcher) Start() error {

func (iw *IngressWatcher) Stop() {
iw.logger.Info("Stopping ingress watcher")
iw.cancel()
iw.factory.Shutdown()
}

@@ -167,6 +149,29 @@ func (iw *IngressWatcher) AddHandler(obj interface{}) {
}

func (iw *IngressWatcher) UpdateHandler(oldObj, newObj interface{}) {
oldIngressResource, ok := oldObj.(*networkingv1.Ingress)
if !ok {
iw.logger.DebugWith("Failed to cast old object to Ingress",
"object", oldObj)
return
}

newIngressResource, ok := newObj.(*networkingv1.Ingress)
if !ok {
iw.logger.DebugWith("Failed to cast new object to Ingress",
"object", newObj)
return
}

// ResourceVersion is managed by Kubernetes and indicates whether the resource has changed.
// Comparing resourceVersion helps avoid unnecessary updates triggered by periodic informer resync
if oldIngressResource.ResourceVersion == newIngressResource.ResourceVersion {
iw.logger.DebugWith("No changes in resource, skipping",
"resourceVersion", oldIngressResource.ResourceVersion,
"ingressName", oldIngressResource.Name)
Collaborator:
This will be super spammy - most of the time we won't have changes in the ingress, and with a small resync interval will get 1 log line for every nuclio ingress - could be a very large number.
No need for log here at all.

Collaborator (author):
That's why this log is DEBUG =]
But ok I will remove

return
}

oldIngress, err := iw.extractValuesFromIngressResource(oldObj)
if err != nil {
iw.logger.DebugWith("Update ingress handler - failed to extract values from old object",
@@ -249,7 +254,7 @@ func (iw *IngressWatcher) extractValuesFromIngressResource(obj interface{}) (*in

targets, err := iw.resolveTargetsCallback(ingress)
if err != nil {
return nil, errors.Wrap(err, "Failed to extract targets from ingress labels")
return nil, errors.Wrapf(err, "Failed to extract targets from ingress labels: %s", err.Error())
}

if len(targets) == 0 {