Skip to content

Commit 59f2bc3

Browse files
authored
[DLX] Integrate cache and watcher into dlx struct (#76)
### Description This PR integrates the cache and watcher components into the DLX to enable scale-from-zero functionality based on ingress resources rather than relying on request headers. ### Changes Made - Refactored initialization logic in both the ingress cache and ingress watcher operator to support external usage from the DLX. - Integrated `ingresscache.IngressHostCache` and `kube.IngressWatcher` into the DLX. - Optimize the watcher's `UpdateHandler` by adding a fast exit when the old and new objects share the same `ResourceVersion` ### References - Jira ticket link - https://iguazio.atlassian.net/browse/NUC-510 ### Additional Notes - This task will be ready for review once manual testing is complete, including the Nuclio side. - A separate ticket will handle integrating the cache into the `requestHandler` as part of completing the full solution
1 parent 23d5465 commit 59f2bc3

File tree

9 files changed

+182
-70
lines changed

9 files changed

+182
-70
lines changed

cmd/autoscaler/app/autoscaler.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/v3io/scaler/pkg/autoscaler"
28+
"github.com/v3io/scaler/pkg/common"
2829
"github.com/v3io/scaler/pkg/pluginloader"
2930
"github.com/v3io/scaler/pkg/scalertypes"
3031

@@ -35,7 +36,6 @@ import (
3536
"k8s.io/client-go/discovery/cached/memory"
3637
"k8s.io/client-go/rest"
3738
"k8s.io/client-go/restmapper"
38-
"k8s.io/client-go/tools/clientcmd"
3939
"k8s.io/metrics/pkg/client/custom_metrics"
4040
)
4141

@@ -73,7 +73,7 @@ func Run(kubeconfigPath string,
7373
autoScalerOptions = resourceScalerConfig.AutoScalerOptions
7474
}
7575

76-
restConfig, err := getClientConfig(kubeconfigPath)
76+
restConfig, err := common.GetClientConfig(kubeconfigPath)
7777
if err != nil {
7878
return errors.Wrap(err, "Failed to get client configuration")
7979
}
@@ -119,11 +119,3 @@ func createAutoScaler(restConfig *rest.Config,
119119

120120
return newScaler, nil
121121
}
122-
123-
func getClientConfig(kubeconfigPath string) (*rest.Config, error) {
124-
if kubeconfigPath != "" {
125-
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
126-
}
127-
128-
return rest.InClusterConfig()
129-
}

cmd/dlx/app/dlx.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ import (
2424
"os"
2525
"time"
2626

27+
"github.com/v3io/scaler/pkg/common"
2728
"github.com/v3io/scaler/pkg/dlx"
2829
"github.com/v3io/scaler/pkg/pluginloader"
2930
"github.com/v3io/scaler/pkg/scalertypes"
3031

3132
"github.com/nuclio/errors"
3233
"github.com/nuclio/zap"
34+
"k8s.io/client-go/kubernetes"
3335
)
3436

3537
func Run(kubeconfigPath string,
@@ -75,6 +77,15 @@ func Run(kubeconfigPath string,
7577
dlxOptions = resourceScalerConfig.DLXOptions
7678
}
7779

80+
restConfig, err := common.GetClientConfig(kubeconfigPath)
81+
if err != nil {
82+
return errors.Wrap(err, "Failed to get client configuration")
83+
}
84+
85+
if dlxOptions.KubeClientSet, err = kubernetes.NewForConfig(restConfig); err != nil {
86+
return errors.Wrap(err, "Failed to create k8s client set")
87+
}
88+
7889
newDLX, err := createDLX(resourceScaler, dlxOptions)
7990
if err != nil {
8091
return errors.Wrap(err, "Failed to create dlx")
@@ -88,7 +99,10 @@ func Run(kubeconfigPath string,
8899
select {}
89100
}
90101

91-
func createDLX(resourceScaler scalertypes.ResourceScaler, options scalertypes.DLXOptions) (*dlx.DLX, error) {
102+
func createDLX(
103+
resourceScaler scalertypes.ResourceScaler,
104+
options scalertypes.DLXOptions,
105+
) (*dlx.DLX, error) {
92106
rootLogger, err := nucliozap.NewNuclioZap("scaler",
93107
"console",
94108
nil,

pkg/common/helpers.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ package common
2323
import (
2424
"math/rand"
2525
"time"
26+
27+
"k8s.io/client-go/rest"
28+
"k8s.io/client-go/tools/clientcmd"
2629
)
2730

2831
var SeededRand = rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -38,3 +41,11 @@ func UniquifyStringSlice(stringList []string) []string {
3841
}
3942
return list
4043
}
44+
45+
func GetClientConfig(kubeconfigPath string) (*rest.Config, error) {
46+
if kubeconfigPath != "" {
47+
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
48+
}
49+
50+
return rest.InClusterConfig()
51+
}

pkg/dlx/dlx.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"context"
2525
"net/http"
2626

27+
"github.com/v3io/scaler/pkg/kube"
2728
"github.com/v3io/scaler/pkg/scalertypes"
2829

2930
"github.com/nuclio/errors"
@@ -34,6 +35,7 @@ type DLX struct {
3435
logger logger.Logger
3536
handler Handler
3637
server *http.Server
38+
watcher *kube.IngressWatcher
3739
}
3840

3941
func NewDLX(parentLogger logger.Logger,
@@ -50,6 +52,19 @@ func NewDLX(parentLogger logger.Logger,
5052
return nil, errors.Wrap(err, "Failed to create function starter")
5153
}
5254

55+
watcher, err := kube.NewIngressWatcher(
56+
context.Background(),
57+
childLogger,
58+
options.KubeClientSet,
59+
options.ResolveTargetsFromIngressCallback,
60+
options.ResyncInterval,
61+
options.Namespace,
62+
options.LabelSelector,
63+
)
64+
if err != nil {
65+
return nil, errors.Wrap(err, "Failed to create ingress watcher")
66+
}
67+
5368
handler, err := NewHandler(childLogger,
5469
resourceStarter,
5570
resourceScaler,
@@ -67,17 +82,25 @@ func NewDLX(parentLogger logger.Logger,
6782
server: &http.Server{
6883
Addr: options.ListenAddress,
6984
},
85+
watcher: watcher,
7086
}, nil
7187
}
7288

7389
func (d *DLX) Start() error {
7490
d.logger.DebugWith("Starting", "server", d.server.Addr)
7591
http.HandleFunc("/", d.handler.HandleFunc)
92+
93+
// Start the ingress watcher synchronously to ensure cache is fully synced before DLX begins handling traffic
94+
if err := d.watcher.Start(); err != nil {
95+
return errors.Wrap(err, "Failed to start ingress watcher")
96+
}
97+
7698
go d.server.ListenAndServe() // nolint: errcheck
7799
return nil
78100
}
79101

80102
func (d *DLX) Stop(context context.Context) error {
81103
d.logger.DebugWith("Stopping", "server", d.server.Addr)
104+
d.watcher.Stop()
82105
return d.server.Shutdown(context)
83106
}

pkg/ingresscache/ingresscache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type IngressCache struct {
3535
func NewIngressCache(logger logger.Logger) *IngressCache {
3636
return &IngressCache{
3737
syncMap: &sync.Map{},
38-
logger: logger,
38+
logger: logger.GetChild("cache"),
3939
}
4040
}
4141

pkg/ingresscache/types.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,19 @@ such restriction.
2020

2121
package ingresscache
2222

23+
type IngressHostCacheReader interface {
24+
// Get retrieves all target names for the given host and path
25+
Get(host string, path string) ([]string, error)
26+
}
27+
2328
type IngressHostCache interface {
29+
IngressHostCacheReader
30+
2431
// Set adds a new item to the cache for the given host, path and targets. Will overwrite existing values if any
2532
Set(host string, path string, targets []string) error
2633

2734
// Delete removes the specified targets from the cache for the given host and path. Will do nothing if host, path or targets do not exist
2835
Delete(host string, path string, targets []string) error
29-
30-
// Get retrieves all target names for the given host and path
31-
Get(host string, path string) ([]string, error)
3236
}
3337

3438
type IngressHostsTree interface {

pkg/kube/ingress.go

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ package kube
2222

2323
import (
2424
"context"
25-
"time"
2625

2726
"github.com/v3io/scaler/pkg/ingresscache"
27+
"github.com/v3io/scaler/pkg/scalertypes"
2828

2929
"github.com/nuclio/errors"
3030
"github.com/nuclio/logger"
@@ -35,29 +35,6 @@ import (
3535
"k8s.io/client-go/tools/cache"
3636
)
3737

38-
const (
39-
defaultResyncInterval = 30 * time.Second
40-
)
41-
42-
// ResolveTargetsFromIngressCallback defines a function that extracts a list of target identifiers
43-
// (e.g., names of services the Ingress routes traffic to) from a Kubernetes Ingress resource.
44-
//
45-
// This function is expected to be implemented externally and passed into the IngressWatcher,
46-
// allowing for custom logic such as parsing annotations, labels, or other ingress metadata.
47-
//
48-
// Parameters:
49-
// - ingress: The Kubernetes Ingress resource to extract targets from
50-
//
51-
// Returns:
52-
// - []string: A slice of target identifiers (e.g., service names, endpoint addresses)
53-
// - error: An error if target resolution fails
54-
//
55-
// Implementation guidelines:
56-
// - Return a non-nil slice when targets are successfully resolved
57-
// - Return a non-nil error if resolution fails
58-
// - Should handle nil or malformed Ingress objects gracefully and return an error in such cases
59-
type ResolveTargetsFromIngressCallback func(ingress *networkingv1.Ingress) ([]string, error)
60-
6138
type ingressValue struct {
6239
name string
6340
host string
@@ -68,31 +45,32 @@ type ingressValue struct {
6845
// IngressWatcher watches for changes in Kubernetes Ingress resources and updates the ingress cache accordingly
6946
type IngressWatcher struct {
7047
ctx context.Context
48+
cancel context.CancelFunc
7149
logger logger.Logger
7250
cache ingresscache.IngressHostCache
7351
factory informers.SharedInformerFactory
7452
informer cache.SharedIndexInformer
75-
resolveTargetsCallback ResolveTargetsFromIngressCallback
53+
resolveTargetsCallback scalertypes.ResolveTargetsFromIngressCallback
7654
}
7755

7856
func NewIngressWatcher(
79-
ctx context.Context,
57+
dlxCtx context.Context,
8058
dlxLogger logger.Logger,
8159
kubeClient kubernetes.Interface,
82-
ingressCache ingresscache.IngressCache,
83-
resolveTargetsCallback ResolveTargetsFromIngressCallback,
84-
resyncTimeout *time.Duration,
60+
resolveTargetsCallback scalertypes.ResolveTargetsFromIngressCallback,
61+
resyncInterval scalertypes.Duration,
8562
namespace string,
8663
labelSelector string,
8764
) (*IngressWatcher, error) {
88-
if resyncTimeout == nil {
89-
defaultTimeout := defaultResyncInterval
90-
resyncTimeout = &defaultTimeout
65+
if resyncInterval.Duration == 0 {
66+
resyncInterval = scalertypes.Duration{Duration: scalertypes.DefaultResyncInterval}
9167
}
9268

69+
ctxWithCancel, cancel := context.WithCancel(dlxCtx)
70+
9371
factory := informers.NewSharedInformerFactoryWithOptions(
9472
kubeClient,
95-
*resyncTimeout,
73+
resyncInterval.Duration,
9674
informers.WithNamespace(namespace),
9775
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
9876
options.LabelSelector = labelSelector
@@ -101,9 +79,10 @@ func NewIngressWatcher(
10179
ingressInformer := factory.Networking().V1().Ingresses().Informer()
10280

10381
ingressWatcher := &IngressWatcher{
104-
ctx: ctx,
82+
ctx: ctxWithCancel,
83+
cancel: cancel,
10584
logger: dlxLogger.GetChild("watcher"),
106-
cache: &ingressCache,
85+
cache: ingresscache.NewIngressCache(dlxLogger),
10786
factory: factory,
10887
informer: ingressInformer,
10988
resolveTargetsCallback: resolveTargetsCallback,
@@ -135,9 +114,15 @@ func (iw *IngressWatcher) Start() error {
135114

136115
func (iw *IngressWatcher) Stop() {
137116
iw.logger.Info("Stopping ingress watcher")
117+
iw.cancel()
138118
iw.factory.Shutdown()
139119
}
140120

121+
// GetIngressHostCacheReader expose read-only access to the ingress cache
122+
func (iw *IngressWatcher) GetIngressHostCacheReader() ingresscache.IngressHostCacheReader {
123+
return iw.cache
124+
}
125+
141126
// --- ResourceEventHandler methods ---
142127

143128
func (iw *IngressWatcher) AddHandler(obj interface{}) {
@@ -167,6 +152,26 @@ func (iw *IngressWatcher) AddHandler(obj interface{}) {
167152
}
168153

169154
func (iw *IngressWatcher) UpdateHandler(oldObj, newObj interface{}) {
155+
oldIngressResource, ok := oldObj.(*networkingv1.Ingress)
156+
if !ok {
157+
iw.logger.DebugWith("Failed to cast old object to Ingress",
158+
"object", oldObj)
159+
return
160+
}
161+
162+
newIngressResource, ok := newObj.(*networkingv1.Ingress)
163+
if !ok {
164+
iw.logger.DebugWith("Failed to cast new object to Ingress",
165+
"object", newObj)
166+
return
167+
}
168+
169+
// ResourceVersion is managed by Kubernetes and indicates whether the resource has changed.
170+
// Comparing resourceVersion helps avoid unnecessary updates triggered by periodic informer resync
171+
if oldIngressResource.ResourceVersion == newIngressResource.ResourceVersion {
172+
return
173+
}
174+
170175
oldIngress, err := iw.extractValuesFromIngressResource(oldObj)
171176
if err != nil {
172177
iw.logger.DebugWith("Update ingress handler - failed to extract values from old object",
@@ -249,7 +254,7 @@ func (iw *IngressWatcher) extractValuesFromIngressResource(obj interface{}) (*in
249254

250255
targets, err := iw.resolveTargetsCallback(ingress)
251256
if err != nil {
252-
return nil, errors.Wrap(err, "Failed to extract targets from ingress labels")
257+
return nil, errors.Wrapf(err, "Failed to extract targets from ingress labels: %s", err.Error())
253258
}
254259

255260
if len(targets) == 0 {

0 commit comments

Comments
 (0)