@@ -24,21 +24,28 @@ import (
2424 "context"
2525 "net/http"
2626
27+ "github.com/v3io/scaler/pkg/ingresscache"
28+ "github.com/v3io/scaler/pkg/kube"
2729 "github.com/v3io/scaler/pkg/scalertypes"
2830
2931 "github.com/nuclio/errors"
3032 "github.com/nuclio/logger"
33+ "k8s.io/client-go/kubernetes"
3134)
3235
3336type DLX struct {
34- logger logger.Logger
35- handler Handler
36- server * http.Server
37+ logger logger.Logger
38+ handler Handler
39+ server * http.Server
40+ cache ingresscache.IngressHostCache
41+ watchOperator * kube.IngressWatcher
3742}
3843
3944func NewDLX (parentLogger logger.Logger ,
4045 resourceScaler scalertypes.ResourceScaler ,
41- options scalertypes.DLXOptions ) (* DLX , error ) {
46+ options scalertypes.DLXOptions ,
47+ kubeClientSet kubernetes.Interface ,
48+ ) (* DLX , error ) {
4249 childLogger := parentLogger .GetChild ("dlx" )
4350 childLogger .InfoWith ("Creating DLX" ,
4451 "options" , options )
@@ -50,34 +57,57 @@ func NewDLX(parentLogger logger.Logger,
5057 return nil , errors .Wrap (err , "Failed to create function starter" )
5158 }
5259
60+ cache := ingresscache .NewIngressCache (childLogger )
5361 handler , err := NewHandler (childLogger ,
5462 resourceStarter ,
5563 resourceScaler ,
5664 options .TargetNameHeader ,
5765 options .TargetPathHeader ,
5866 options .TargetPort ,
59- options .MultiTargetStrategy )
67+ options .MultiTargetStrategy ,
68+ cache )
6069 if err != nil {
6170 return nil , errors .Wrap (err , "Failed to create handler" )
6271 }
6372
73+ watcherOperator , err := kube .NewIngressWatcher (
74+ context .Background (),
75+ childLogger ,
76+ kubeClientSet ,
77+ cache ,
78+ options .ResolveTargetsFromIngressCallback ,
79+ options .ResyncInterval ,
80+ options .Namespace ,
81+ options .LabelSelector ,
82+ )
83+ if err != nil {
84+ return nil , errors .Wrap (err , "Failed to create ingress watcher" )
85+ }
86+
6487 return & DLX {
6588 logger : childLogger ,
6689 handler : handler ,
6790 server : & http.Server {
6891 Addr : options .ListenAddress ,
6992 },
93+ cache : cache ,
94+ watchOperator : watcherOperator ,
7095 }, nil
7196}
7297
7398func (d * DLX ) Start () error {
7499 d .logger .DebugWith ("Starting" , "server" , d .server .Addr )
100+ // watchOperator.Start() is non-blocking and runs informers in background goroutines
101+ if err := d .watchOperator .Start (); err != nil {
102+ return errors .Wrap (err , "Failed to start ingress watcher" )
103+ }
75104 http .HandleFunc ("/" , d .handler .HandleFunc )
76105 go d .server .ListenAndServe () // nolint: errcheck
77106 return nil
78107}
79108
80109func (d * DLX ) Stop (context context.Context ) error {
81110 d .logger .DebugWith ("Stopping" , "server" , d .server .Addr )
111+ d .watchOperator .Stop ()
82112 return d .server .Shutdown (context )
83113}
0 commit comments