Skip to content
Merged
7 changes: 3 additions & 4 deletions cmd/dlx/app/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ func Run(kubeconfigPath string,
return errors.Wrap(err, "Failed to get client configuration")
}

kubeClientSet, err := kubernetes.NewForConfig(restConfig)
dlxOptions.KubeClientSet, err = kubernetes.NewForConfig(restConfig)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if dlxOptions.KubeClientSet, err = kubernetes.NewForConfig(restConfig); err != nil {}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
return errors.Wrap(err, "Failed to create k8s client set")
}

newDLX, err := createDLX(resourceScaler, dlxOptions, kubeClientSet)
newDLX, err := createDLX(resourceScaler, dlxOptions)
if err != nil {
return errors.Wrap(err, "Failed to create dlx")
}
Expand All @@ -103,7 +103,6 @@ func Run(kubeconfigPath string,
func createDLX(
resourceScaler scalertypes.ResourceScaler,
options scalertypes.DLXOptions,
kubeClientSet kubernetes.Interface,
) (*dlx.DLX, error) {
rootLogger, err := nucliozap.NewNuclioZap("scaler",
"console",
Expand All @@ -115,7 +114,7 @@ func createDLX(
return nil, errors.Wrap(err, "Failed to initialize root logger")
}

newScaler, err := dlx.NewDLX(rootLogger, resourceScaler, options, kubeClientSet)
newScaler, err := dlx.NewDLX(rootLogger, resourceScaler, options)

if err != nil {
return nil, err
Expand Down
7 changes: 2 additions & 5 deletions pkg/dlx/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/nuclio/errors"
"github.com/nuclio/logger"
"k8s.io/client-go/kubernetes"
)

type DLX struct {
Expand All @@ -43,9 +42,7 @@ type DLX struct {

func NewDLX(parentLogger logger.Logger,
resourceScaler scalertypes.ResourceScaler,
options scalertypes.DLXOptions,
kubeClientSet kubernetes.Interface,
) (*DLX, error) {
options scalertypes.DLXOptions) (*DLX, error) {
childLogger := parentLogger.GetChild("dlx")
childLogger.InfoWith("Creating DLX",
"options", options)
Expand All @@ -72,7 +69,7 @@ func NewDLX(parentLogger logger.Logger,
watcher, err := kube.NewIngressWatcher(
context.Background(),
childLogger,
kubeClientSet,
options.KubeClientSet,
cache,
options.ResolveTargetsFromIngressCallback,
options.ResyncInterval,
Expand Down
8 changes: 2 additions & 6 deletions pkg/kube/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type ingressValue struct {
name string
host string
path string
version string
targets []string
}

Expand Down Expand Up @@ -68,7 +67,7 @@ func NewIngressWatcher(
resyncInterval = scalertypes.Duration{Duration: scalertypes.DefaultResyncInterval}
}

ctx, cancel := context.WithCancel(dlxCtx)
ctxWithCancel, cancel := context.WithCancel(dlxCtx)

factory := informers.NewSharedInformerFactoryWithOptions(
kubeClient,
Expand All @@ -81,7 +80,7 @@ func NewIngressWatcher(
ingressInformer := factory.Networking().V1().Ingresses().Informer()

ingressWatcher := &IngressWatcher{
ctx: ctx,
ctx: ctxWithCancel,
cancel: cancel,
logger: dlxLogger.GetChild("watcher"),
cache: ingressCache,
Expand Down Expand Up @@ -166,9 +165,6 @@ func (iw *IngressWatcher) UpdateHandler(oldObj, newObj interface{}) {
// ResourceVersion is managed by Kubernetes and indicates whether the resource has changed.
// Comparing resourceVersion helps avoid unnecessary updates triggered by periodic informer resync
if oldIngressResource.ResourceVersion == newIngressResource.ResourceVersion {
iw.logger.DebugWith("No changes in resource, skipping",
"resourceVersion", oldIngressResource.ResourceVersion,
"ingressName", oldIngressResource.Name)
return
}

Expand Down
26 changes: 14 additions & 12 deletions pkg/kube/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (suite *IngressWatcherTestSuite) TestAddHandler() {
testIngressWatcher, err := suite.createTestIngressWatcher()
suite.Require().NoError(err)

testObj = suite.createDummyIngress(testCase.testArgs.host, testCase.testArgs.path, testCase.testArgs.version, testCase.testArgs.targets)
testObj = suite.createDummyIngress(testCase.testArgs.host, testCase.testArgs.path, "1", testCase.testArgs.targets)

if testCase.expectError {
testObj = &networkingv1.IngressSpec{}
Expand Down Expand Up @@ -150,6 +150,8 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() {
initialCachedData *ingressValue
testOldObj ingressValue
testNewObj ingressValue
OldObjVersion string
newObjVersion string
}{
{
name: "Update PairTarget - same host and path, different targets",
Expand All @@ -163,15 +165,15 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() {
testOldObj: ingressValue{
host: "www.example.com",
path: "/test/path",
version: "1",
targets: []string{"test-targets-name-1", "test-targets-name-2"},
},
OldObjVersion: "1",
testNewObj: ingressValue{
host: "www.example.com",
path: "/test/path",
version: "2",
targets: []string{"test-targets-name-1", "test-targets-name-3"},
},
newObjVersion: "2",
initialCachedData: &ingressValue{
host: "www.example.com",
path: "/test/path",
Expand All @@ -189,15 +191,15 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() {
testOldObj: ingressValue{
host: "www.example.com",
path: "/test/path",
version: "1",
targets: []string{"test-targets-name-1", "test-targets-name-2"},
},
testNewObj: ingressValue{
host: "www.example.com",
path: "/test/path",
version: "1",
targets: []string{"test-targets-name-1", "test-targets-name-3"},
},
OldObjVersion: "1",
newObjVersion: "1",
initialCachedData: &ingressValue{
host: "www.example.com",
path: "/test/path",
Expand All @@ -213,15 +215,15 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() {
testOldObj: ingressValue{
host: "www.example.com",
path: "/test/path",
version: "1",
targets: []string{"test-targets-name-1", "test-targets-name-2"},
},
OldObjVersion: "1",
testNewObj: ingressValue{
host: "www.example.com",
path: "/another/path",
version: "2",
targets: []string{"test-targets-name-1", "test-targets-name-2"},
},
newObjVersion: "2",
expectedResults: []expectedResult{
{
host: "www.example.com",
Expand All @@ -244,15 +246,15 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() {
testOldObj: ingressValue{
host: "www.example.com",
path: "/test/path",
version: "1",
targets: []string{"test-targets-name-1", "test-targets-name-2"},
},
OldObjVersion: "1",
testNewObj: ingressValue{
host: "www.google.com",
path: "/test/path",
version: "2",
targets: []string{"test-targets-name-1", "test-targets-name-2"},
},
newObjVersion: "2",
expectedResults: []expectedResult{
{
host: "www.example.com",
Expand All @@ -271,8 +273,8 @@ func (suite *IngressWatcherTestSuite) TestUpdateHandler() {
testIngressWatcher, err := suite.createTestIngressWatcher()
suite.Require().NoError(err)

testOldObj := suite.createDummyIngress(testCase.testOldObj.host, testCase.testOldObj.path, testCase.testOldObj.version, testCase.testOldObj.targets)
testNewObj := suite.createDummyIngress(testCase.testNewObj.host, testCase.testNewObj.path, testCase.testNewObj.version, testCase.testNewObj.targets)
testOldObj := suite.createDummyIngress(testCase.testOldObj.host, testCase.testOldObj.path, testCase.OldObjVersion, testCase.testOldObj.targets)
testNewObj := suite.createDummyIngress(testCase.testNewObj.host, testCase.testNewObj.path, testCase.newObjVersion, testCase.testNewObj.targets)

if testCase.initialCachedData != nil {
err = testIngressWatcher.cache.Set(testCase.initialCachedData.host, testCase.initialCachedData.path, testCase.initialCachedData.targets)
Expand Down Expand Up @@ -370,7 +372,7 @@ func (suite *IngressWatcherTestSuite) TestDeleteHandler() {
testIngressWatcher, err := suite.createTestIngressWatcher()
suite.Require().NoError(err)

testObj = suite.createDummyIngress(testCase.testArgs.host, testCase.testArgs.path, testCase.testArgs.version, testCase.testArgs.targets)
testObj = suite.createDummyIngress(testCase.testArgs.host, testCase.testArgs.path, "1", testCase.testArgs.targets)

if testCase.expectError {
testObj = &networkingv1.IngressSpec{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalertypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/nuclio/errors"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
)

type AutoScalerOptions struct {
Expand Down Expand Up @@ -88,6 +89,7 @@ type DLXOptions struct {
LabelSelector string
ResolveTargetsFromIngressCallback ResolveTargetsFromIngressCallback `json:"-"`
ResyncInterval Duration
KubeClientSet kubernetes.Interface `json:"-"`
}

type ResourceScaler interface {
Expand Down