Skip to content
3 changes: 2 additions & 1 deletion pkg/dlx/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func NewDLX(parentLogger logger.Logger,
options.TargetNameHeader,
options.TargetPathHeader,
options.TargetPort,
options.MultiTargetStrategy)
options.MultiTargetStrategy,
watcher.GetIngressHostCacheReader())
if err != nil {
return nil, errors.Wrap(err, "Failed to create handler")
}
Expand Down
68 changes: 54 additions & 14 deletions pkg/dlx/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync"
"time"

"github.com/v3io/scaler/pkg/ingresscache"
"github.com/v3io/scaler/pkg/scalertypes"

"github.com/nuclio/errors"
Expand All @@ -49,6 +50,7 @@ type Handler struct {
targetURLCache *cache.LRUExpireCache
proxyLock sync.Locker
lastProxyErrorTime time.Time
ingressCache ingresscache.IngressHostCacheReader
}

func NewHandler(parentLogger logger.Logger,
Expand All @@ -57,7 +59,8 @@ func NewHandler(parentLogger logger.Logger,
targetNameHeader string,
targetPathHeader string,
targetPort int,
multiTargetStrategy scalertypes.MultiTargetStrategy) (Handler, error) {
multiTargetStrategy scalertypes.MultiTargetStrategy,
ingressCache ingresscache.IngressHostCacheReader) (Handler, error) {
h := Handler{
logger: parentLogger.GetChild("handler"),
resourceStarter: resourceStarter,
Expand All @@ -69,17 +72,17 @@ func NewHandler(parentLogger logger.Logger,
targetURLCache: cache.NewLRUExpireCache(100),
proxyLock: &sync.Mutex{},
lastProxyErrorTime: time.Now(),
ingressCache: ingressCache,
}
h.HandleFunc = h.handleRequest
return h, nil
}

func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
var path string
var err error
var resourceNames []string

responseChannel := make(chan ResourceStatusResult, 1)
defer close(responseChannel)

// first try to see if our request came from ingress controller
forwardedHost := req.Header.Get("X-Forwarded-Host")
forwardedPort := req.Header.Get("X-Forwarded-Port")
Expand All @@ -97,15 +100,15 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
resourceNames = append(resourceNames, resourceName)
resourceTargetURLMap[resourceName] = targetURL
} else {
targetNameHeaderValue := req.Header.Get(h.targetNameHeader)
path := req.Header.Get(h.targetPathHeader)
if targetNameHeaderValue == "" {
h.logger.WarnWith("When ingress not set, must pass header value",
"missingHeader", h.targetNameHeader)
path, resourceNames, err = h.getPathAndResourceNames(req)
if err != nil {
h.logger.WarnWith("Failed to get resource names and path from request",
"error", err.Error(),
"host", req.Host,
"path", req.URL.Path)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that req.URL is pointer, so might technically be nil, do we check it anywhere?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I will cover it

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

res.WriteHeader(http.StatusBadRequest)
return
}
resourceNames = strings.Split(targetNameHeaderValue, ",")
for _, resourceName := range resourceNames {
targetURL, status := h.parseTargetURL(resourceName, path)
if targetURL == nil {
Expand Down Expand Up @@ -163,6 +166,43 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
proxy.ServeHTTP(res, req)
}

func (h *Handler) getPathAndResourceNames(req *http.Request) (string, []string, error) {
// first try to get the resource names and path from the ingress cache
path, resourceNames, err := h.getValuesFromCache(req)
if err == nil {
return path, resourceNames, nil
}

h.logger.DebugWith("Failed to get resource names from ingress cache, trying to extract from the request headers",
"host", req.Host,
"path", req.URL.Path,
"error", err.Error())

// old implementation for backward compatibility
targetNameHeaderValue := req.Header.Get(h.targetNameHeader)
path = req.Header.Get(h.targetPathHeader)
if targetNameHeaderValue == "" {
return "", nil, errors.New("No target name header found")
}
resourceNames = strings.Split(targetNameHeaderValue, ",")
return path, resourceNames, nil
}

func (h *Handler) getValuesFromCache(req *http.Request) (string, []string, error) {
host := req.Host
path := req.URL.Path
resourceNames, err := h.ingressCache.Get(host, path)
if err != nil {
return "", nil, errors.New("Failed to get resourceNames from ingress cache")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "", nil, errors.New("Failed to get resourceNames from ingress cache")
return "", nil, errors.New("Failed to get resource names from ingress cache")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

if len(resourceNames) == 0 {
return "", nil, errors.New("No resources found in ingress cache")
}

return path, resourceNames, nil
}

func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) {
serviceName, err := h.resourceScaler.ResolveServiceName(scalertypes.Resource{Name: resourceName})
if err != nil {
Expand All @@ -178,17 +218,17 @@ func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) {
}

func (h *Handler) startResources(resourceNames []string) *ResourceStatusResult {
responseChannel := make(chan ResourceStatusResult, len(resourceNames))
defer close(responseChannel)
responseChan := make(chan ResourceStatusResult, len(resourceNames))
defer close(responseChan)

// Start all resources in separate go routines
for _, resourceName := range resourceNames {
go h.resourceStarter.handleResourceStart(resourceName, responseChannel)
go h.resourceStarter.handleResourceStart(resourceName, responseChan)
}

// Wait for all resources to finish starting
for range resourceNames {
statusResult := <-responseChannel
statusResult := <-responseChan

if statusResult.Error != nil {
h.logger.WarnWith("Failed to start resource",
Expand Down
Loading