diff --git a/pkg/dlx/dlx.go b/pkg/dlx/dlx.go index 885ebbe6..ae9fa876 100644 --- a/pkg/dlx/dlx.go +++ b/pkg/dlx/dlx.go @@ -71,7 +71,8 @@ func NewDLX(parentLogger logger.Logger, options.TargetNameHeader, options.TargetPathHeader, options.TargetPort, - options.MultiTargetStrategy) + options.MultiTargetStrategy, + watcher.GetIngressHostCacheReader()) if err != nil { return nil, errors.Wrap(err, "Failed to create handler") } diff --git a/pkg/dlx/handler.go b/pkg/dlx/handler.go index 9980ecc5..e40c3a25 100644 --- a/pkg/dlx/handler.go +++ b/pkg/dlx/handler.go @@ -30,6 +30,7 @@ import ( "sync" "time" + "github.com/v3io/scaler/pkg/ingresscache" "github.com/v3io/scaler/pkg/scalertypes" "github.com/nuclio/errors" @@ -49,6 +50,7 @@ type Handler struct { targetURLCache *cache.LRUExpireCache proxyLock sync.Locker lastProxyErrorTime time.Time + ingressCache ingresscache.IngressHostCacheReader } func NewHandler(parentLogger logger.Logger, @@ -57,7 +59,8 @@ func NewHandler(parentLogger logger.Logger, targetNameHeader string, targetPathHeader string, targetPort int, - multiTargetStrategy scalertypes.MultiTargetStrategy) (Handler, error) { + multiTargetStrategy scalertypes.MultiTargetStrategy, + ingressCache ingresscache.IngressHostCacheReader) (Handler, error) { h := Handler{ logger: parentLogger.GetChild("handler"), resourceStarter: resourceStarter, @@ -69,17 +72,17 @@ func NewHandler(parentLogger logger.Logger, targetURLCache: cache.NewLRUExpireCache(100), proxyLock: &sync.Mutex{}, lastProxyErrorTime: time.Now(), + ingressCache: ingressCache, } h.HandleFunc = h.handleRequest return h, nil } func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) { + var path string + var err error var resourceNames []string - responseChannel := make(chan ResourceStatusResult, 1) - defer close(responseChannel) - // first try to see if our request came from ingress controller forwardedHost := req.Header.Get("X-Forwarded-Host") forwardedPort := req.Header.Get("X-Forwarded-Port") @@ -97,15 +100,15 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) { resourceNames = append(resourceNames, resourceName) resourceTargetURLMap[resourceName] = targetURL } else { - targetNameHeaderValue := req.Header.Get(h.targetNameHeader) - path := req.Header.Get(h.targetPathHeader) - if targetNameHeaderValue == "" { - h.logger.WarnWith("When ingress not set, must pass header value", - "missingHeader", h.targetNameHeader) + path, resourceNames, err = h.getResourceNameAndPath(req) + if err != nil { + h.logger.WarnWith("Failed to get resource names and path from request", + "error", err.Error(), + "host", req.Host, + "path", req.URL.Path) res.WriteHeader(http.StatusBadRequest) return } - resourceNames = strings.Split(targetNameHeaderValue, ",") for _, resourceName := range resourceNames { targetURL, status := h.parseTargetURL(resourceName, path) if targetURL == nil { @@ -163,6 +166,43 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) { proxy.ServeHTTP(res, req) } +func (h *Handler) getResourceNameAndPath(req *http.Request) (string, []string, error) { + // first try to get the target name and path from the ingress cache + path, resourceNames, err := h.extractValuesFromIngress(req) + if err == nil { + return path, resourceNames, nil + } + + h.logger.DebugWith("Failed to get target name from ingress cache, try to extract from the request headers", + "host", req.Host, + "path", req.URL.Path, + "error", err.Error()) + + // old implementation for backward compatibility + targetNameHeaderValue := req.Header.Get(h.targetNameHeader) + path = req.Header.Get(h.targetPathHeader) + if targetNameHeaderValue == "" { + return "", nil, errors.New("No target name header found") + } + resourceNames = strings.Split(targetNameHeaderValue, ",") + return path, resourceNames, nil +} + +func (h *Handler) extractValuesFromIngress(req *http.Request) (string, []string, error) { + host := req.Host + path := req.URL.Path + resourceNames, err := h.ingressCache.Get(host, path) + if err != nil { + return "", nil, errors.New("Failed to get target name from ingress cache") + } + + if len(resourceNames) == 0 { + return "", nil, errors.New("No resourceNames found in ingress cache") + } + + return path, resourceNames, nil +} + func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) { serviceName, err := h.resourceScaler.ResolveServiceName(scalertypes.Resource{Name: resourceName}) if err != nil { @@ -178,17 +218,17 @@ func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) { } func (h *Handler) startResources(resourceNames []string) *ResourceStatusResult { - responseChannel := make(chan ResourceStatusResult, len(resourceNames)) - defer close(responseChannel) + responseChan := make(chan ResourceStatusResult, len(resourceNames)) + defer close(responseChan) // Start all resources in separate go routines for _, resourceName := range resourceNames { - go h.resourceStarter.handleResourceStart(resourceName, responseChannel) + go h.resourceStarter.handleResourceStart(resourceName, responseChan) } // Wait for all resources to finish starting for range resourceNames { - statusResult := <-responseChannel + statusResult := <-responseChan if statusResult.Error != nil { h.logger.WarnWith("Failed to start resource", diff --git a/pkg/dlx/handler_test.go b/pkg/dlx/handler_test.go new file mode 100644 index 00000000..83a2826f --- /dev/null +++ b/pkg/dlx/handler_test.go @@ -0,0 +1,264 @@ +package dlx + +import ( + "errors" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + "time" + + "github.com/v3io/scaler/pkg/ingresscache" + resourcescalerMock "github.com/v3io/scaler/pkg/resourcescaler/mock" + "github.com/v3io/scaler/pkg/scalertypes" + + "github.com/nuclio/logger" + nucliozap "github.com/nuclio/zap" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type HandlerTestSuite struct { + suite.Suite + logger logger.Logger + starter *ResourceStarter + scaler *resourcescalerMock.ResourceScaler + httpServer *httptest.Server + backendHost string + backendPort int +} + +type ingressValue struct { + host string + path string + targets []string +} + +func (suite *HandlerTestSuite) SetupSuite() { + var err error + suite.logger, err = nucliozap.NewNuclioZapTest("test") + suite.Require().NoError(err) +} + +func (suite *HandlerTestSuite) SetupTest() { + suite.scaler = &resourcescalerMock.ResourceScaler{} + suite.starter = &ResourceStarter{ + logger: suite.logger, + scaler: suite.scaler, + resourceReadinessTimeout: 3 * time.Second, + } + allowedPaths := map[string]struct{}{ + // TODO - To fix this test for a valid path (i.e.- '/test/path'), the path suffix needs to be removed from h.parseTargetURL + "//test/path/test/path": {}, + "//test/path/to/multiple/test/path/to/multiple": {}, + } + // Start a test server that always returns 200 + suite.httpServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if _, exists := allowedPaths[r.URL.Path]; exists { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusBadRequest) + } + })) + + backendURL, _ := url.Parse(suite.httpServer.URL) + suite.backendHost = backendURL.Hostname() + backendPort := backendURL.Port() + if backendPort == "" { + backendPort = "8080" // Default HTTP port + } + backendPortInt, err := strconv.Atoi(backendPort) + suite.Require().NoError(err) + suite.backendPort = backendPortInt +} + +func (suite *HandlerTestSuite) TearDownTest() { + if suite.httpServer != nil { + suite.httpServer.Close() + } +} + +func (suite *HandlerTestSuite) TestHandleRequest() { + for _, tc := range []struct { + name string + resolveServiceNameErr error + initialCachedData *ingressValue + reqHeaders map[string]string + reqHost string + reqPath string + expectedStatus int + }{ + { + name: "No ingress headers, host and path found in ingress cache", + resolveServiceNameErr: nil, + initialCachedData: &ingressValue{ + host: "www.example.com", + path: "/test/path", + targets: []string{"test-targets-name-1"}, + }, + reqHost: "www.example.com", + reqPath: "/test/path", + expectedStatus: http.StatusOK, + }, { + name: "No ingress headers,multiple targets found in ingress cache", + resolveServiceNameErr: nil, + initialCachedData: &ingressValue{ + host: "www.example.com", + path: "/test/path/to/multiple", + targets: []string{"test-targets-name-1", "test-targets-name-2"}, + }, + reqHost: "www.example.com", + reqPath: "/test/path/to/multiple", + expectedStatus: http.StatusOK, + }, + { + name: "No ingress headers, not found in ingress cache", + resolveServiceNameErr: nil, + initialCachedData: nil, + reqHost: "unknown", + reqPath: "/notfound", + expectedStatus: http.StatusBadRequest, + }, + { + name: "No ingress headers, scaler fails", + resolveServiceNameErr: errors.New("fail"), + initialCachedData: &ingressValue{ + host: "www.example.com", + path: "/test/path", + targets: []string{"test-targets-name-1"}, + }, + reqHost: "www.example.com", + reqPath: "/test/path", + expectedStatus: http.StatusInternalServerError, + }, + } { + suite.Run(tc.name, func() { + // test case setup + suite.scaler.ExpectedCalls = nil + suite.scaler.On("ResolveServiceName", mock.Anything).Return(suite.backendHost, tc.resolveServiceNameErr) + suite.scaler.On("SetScaleCtx", mock.Anything, mock.Anything, mock.Anything).Return(nil) + testIngressCache := ingresscache.NewIngressCache(suite.logger) + if tc.initialCachedData != nil { + err := testIngressCache.Set(tc.initialCachedData.host, tc.initialCachedData.path, tc.initialCachedData.targets) + suite.Require().NoError(err) + } + + testHandler := suite.createTestHandler(suite.backendPort, testIngressCache) + testRequest := suite.createTestHTTPRequest(tc.reqHeaders, tc.reqHost, tc.reqPath) + testResponse := httptest.NewRecorder() + + // call the testHandler + testHandler.handleRequest(testResponse, testRequest) + + // validate the response + suite.Require().Equal(tc.expectedStatus, testResponse.Code) + suite.scaler.AssertExpectations(suite.T()) + }) + } +} + +func (suite *HandlerTestSuite) TestGetResourceNameAndPath() { + for _, tc := range []struct { + name string + errMsg string + initialCachedData *ingressValue + reqHeaders map[string]string + reqHost string + reqPath string + expectErr bool + expectedPath string + expectedResourceNames []string + }{ + { + name: "No ingress headers, host and path found in ingress cache", + initialCachedData: &ingressValue{ + host: "www.example.com", + path: "/test/path", + targets: []string{"test-targets-name-1"}, + }, + reqHost: "www.example.com", + reqPath: "/test/path", + expectedPath: "/test/path", + expectedResourceNames: []string{"test-targets-name-1"}, + }, { + name: "Ingress headers, host and path did not found in ingress cache", + reqHost: "www.example.com", + reqPath: "/test/path", + expectedPath: "/test/path", + expectedResourceNames: []string{"test-targets-name-1"}, + reqHeaders: map[string]string{ + "X-Resource-Name": "test-targets-name-1", + "X-Resource-Path": "/test/path", + }, + }, { + name: "Missing both ingress headers and host and path did not found in ingress cache", + reqHost: "www.example.com", + reqPath: "/test/path", + expectErr: true, + errMsg: "No target name header found", + }, + } { + suite.Run(tc.name, func() { + // test case setup + testIngressCache := ingresscache.NewIngressCache(suite.logger) + if tc.initialCachedData != nil { + err := testIngressCache.Set(tc.initialCachedData.host, tc.initialCachedData.path, tc.initialCachedData.targets) + suite.Require().NoError(err) + } + + testHandler := suite.createTestHandler(suite.backendPort, testIngressCache) + testRequest := suite.createTestHTTPRequest(tc.reqHeaders, tc.reqHost, tc.reqPath) + resultPath, resultResourceNames, err := testHandler.getResourceNameAndPath(testRequest) + + // validate the result + if tc.expectErr { + suite.Require().Error(err) + suite.Require().ErrorContains(err, tc.errMsg) + } else { + suite.Require().NoError(err) + suite.Require().Equal(tc.expectedPath, resultPath) + suite.Require().Equal(tc.expectedResourceNames, resultResourceNames) + } + }) + } +} + +// --- HandlerTestSuite suite methods --- + +func (suite *HandlerTestSuite) createTestHandler(targetPort int, cache ingresscache.IngressHostCacheReader) Handler { + handler, err := NewHandler( + suite.logger, + suite.starter, + suite.scaler, + "X-Resource-Name", + "X-Resource-Path", + targetPort, + scalertypes.MultiTargetStrategyPrimary, + cache, + ) + suite.Require().NoError(err) + return handler +} + +func (suite *HandlerTestSuite) createTestHTTPRequest( + reqHeaders map[string]string, + reqHost string, + reqPath string, +) *http.Request { + req := httptest.NewRequest("GET", "/", nil) + if reqHost != "" { + req.Host = reqHost + } + if reqPath != "" { + req.URL.Path = reqPath + } + for k, v := range reqHeaders { + req.Header.Set(k, v) + } + return req +} + +func TestHandlerTestSuite(t *testing.T) { + suite.Run(t, new(HandlerTestSuite)) +}