Skip to content
3 changes: 2 additions & 1 deletion pkg/dlx/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func NewDLX(parentLogger logger.Logger,
options.TargetNameHeader,
options.TargetPathHeader,
options.TargetPort,
options.MultiTargetStrategy)
options.MultiTargetStrategy,
watcher.GetIngressHostCacheReader())
if err != nil {
return nil, errors.Wrap(err, "Failed to create handler")
}
Expand Down
68 changes: 54 additions & 14 deletions pkg/dlx/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync"
"time"

"github.com/v3io/scaler/pkg/ingresscache"
"github.com/v3io/scaler/pkg/scalertypes"

"github.com/nuclio/errors"
Expand All @@ -49,6 +50,7 @@ type Handler struct {
targetURLCache *cache.LRUExpireCache
proxyLock sync.Locker
lastProxyErrorTime time.Time
ingressCache ingresscache.IngressHostCacheReader
}

func NewHandler(parentLogger logger.Logger,
Expand All @@ -57,7 +59,8 @@ func NewHandler(parentLogger logger.Logger,
targetNameHeader string,
targetPathHeader string,
targetPort int,
multiTargetStrategy scalertypes.MultiTargetStrategy) (Handler, error) {
multiTargetStrategy scalertypes.MultiTargetStrategy,
ingressCache ingresscache.IngressHostCacheReader) (Handler, error) {
h := Handler{
logger: parentLogger.GetChild("handler"),
resourceStarter: resourceStarter,
Expand All @@ -69,17 +72,17 @@ func NewHandler(parentLogger logger.Logger,
targetURLCache: cache.NewLRUExpireCache(100),
proxyLock: &sync.Mutex{},
lastProxyErrorTime: time.Now(),
ingressCache: ingressCache,
}
h.HandleFunc = h.handleRequest
return h, nil
}

func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
var path string
var err error
var resourceNames []string

responseChannel := make(chan ResourceStatusResult, 1)
defer close(responseChannel)

// first try to see if our request came from ingress controller
forwardedHost := req.Header.Get("X-Forwarded-Host")
forwardedPort := req.Header.Get("X-Forwarded-Port")
Expand All @@ -97,15 +100,15 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
resourceNames = append(resourceNames, resourceName)
resourceTargetURLMap[resourceName] = targetURL
} else {
targetNameHeaderValue := req.Header.Get(h.targetNameHeader)
path := req.Header.Get(h.targetPathHeader)
if targetNameHeaderValue == "" {
h.logger.WarnWith("When ingress not set, must pass header value",
"missingHeader", h.targetNameHeader)
path, resourceNames, err = h.getResourceNameAndPath(req)
if err != nil {
h.logger.WarnWith("Failed to get resource names and path from request",
"error", err.Error(),
"host", req.Host,
"path", req.URL.Path)
res.WriteHeader(http.StatusBadRequest)
return
}
resourceNames = strings.Split(targetNameHeaderValue, ",")
for _, resourceName := range resourceNames {
targetURL, status := h.parseTargetURL(resourceName, path)
if targetURL == nil {
Expand Down Expand Up @@ -163,6 +166,43 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
proxy.ServeHTTP(res, req)
}

func (h *Handler) getResourceNameAndPath(req *http.Request) (string, []string, error) {
// first try to get the target name and path from the ingress cache
path, resourceNames, err := h.extractValuesFromIngress(req)
if err == nil {
return path, resourceNames, nil
}

h.logger.DebugWith("Failed to get target name from ingress cache, try to extract from the request headers",
"host", req.Host,
"path", req.URL.Path,
"error", err.Error())

// old implementation for backward compatibility
targetNameHeaderValue := req.Header.Get(h.targetNameHeader)
path = req.Header.Get(h.targetPathHeader)
if targetNameHeaderValue == "" {
return "", nil, errors.New("No target name header found")
}
resourceNames = strings.Split(targetNameHeaderValue, ",")
return path, resourceNames, nil
}

func (h *Handler) extractValuesFromIngress(req *http.Request) (string, []string, error) {
host := req.Host
path := req.URL.Path
resourceNames, err := h.ingressCache.Get(host, path)
if err != nil {
return "", nil, errors.New("Failed to get target name from ingress cache")
}

if len(resourceNames) == 0 {
return "", nil, errors.New("No resourceNames found in ingress cache")
}

return path, resourceNames, nil
}

func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) {
serviceName, err := h.resourceScaler.ResolveServiceName(scalertypes.Resource{Name: resourceName})
if err != nil {
Expand All @@ -178,17 +218,17 @@ func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) {
}

func (h *Handler) startResources(resourceNames []string) *ResourceStatusResult {
responseChannel := make(chan ResourceStatusResult, len(resourceNames))
defer close(responseChannel)
responseChan := make(chan ResourceStatusResult, len(resourceNames))
defer close(responseChan)

// Start all resources in separate go routines
for _, resourceName := range resourceNames {
go h.resourceStarter.handleResourceStart(resourceName, responseChannel)
go h.resourceStarter.handleResourceStart(resourceName, responseChan)
}

// Wait for all resources to finish starting
for range resourceNames {
statusResult := <-responseChannel
statusResult := <-responseChan

if statusResult.Error != nil {
h.logger.WarnWith("Failed to start resource",
Expand Down
Loading