Skip to content

Commit c617762

Browse files
committed
feat: add fallbackTargetRef to CRD and required changes
Signed-off-by: yyewolf <[email protected]>
1 parent 30e1694 commit c617762

File tree

9 files changed

+196
-22
lines changed

9 files changed

+196
-22
lines changed

config/crd/bases/http.keda.sh_httpscaledobjects.yaml

+20
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,26 @@ spec:
6060
spec:
6161
description: HTTPScaledObjectSpec defines the desired state of HTTPScaledObject
6262
properties:
63+
coldStartTimeoutFailoverRef:
64+
description: (optional) The name of the failover service to route
65+
HTTP requests to when the target is not available
66+
properties:
67+
port:
68+
description: The port to route to
69+
format: int32
70+
type: integer
71+
portName:
72+
description: The port to route to referenced by name
73+
type: string
74+
service:
75+
description: The name of the service to route to
76+
type: string
77+
required:
78+
- service
79+
type: object
80+
x-kubernetes-validations:
81+
- message: must define either the 'portName' or the 'port'
82+
rule: has(self.portName) != has(self.port)
6383
hosts:
6484
description: |-
6585
The hosts to route. All requests which the "Host" header

interceptor/handler/upstream.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ var (
1313
)
1414

1515
type Upstream struct {
16-
roundTripper http.RoundTripper
16+
roundTripper http.RoundTripper
17+
shouldFailover bool
1718
}
1819

19-
func NewUpstream(roundTripper http.RoundTripper) *Upstream {
20+
func NewUpstream(roundTripper http.RoundTripper, shouldFailover bool) *Upstream {
2021
return &Upstream{
21-
roundTripper: roundTripper,
22+
roundTripper: roundTripper,
23+
shouldFailover: shouldFailover,
2224
}
2325
}
2426

@@ -29,6 +31,10 @@ func (uh *Upstream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
2931
ctx := r.Context()
3032

3133
stream := util.StreamFromContext(ctx)
34+
if uh.shouldFailover {
35+
stream = util.FailoverStreamFromContext(ctx)
36+
}
37+
3238
if stream == nil {
3339
sh := NewStatic(http.StatusInternalServerError, errNilStream)
3440
sh.ServeHTTP(w, r)

interceptor/handler/upstream_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestForwarderSuccess(t *testing.T) {
4343
timeouts := defaultTimeouts()
4444
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
4545
rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader)
46-
uh := NewUpstream(rt)
46+
uh := NewUpstream(rt, false)
4747
uh.ServeHTTP(res, req)
4848

4949
r.True(
@@ -88,7 +88,7 @@ func TestForwarderHeaderTimeout(t *testing.T) {
8888
r.NoError(err)
8989
req = util.RequestWithStream(req, originURL)
9090
rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader)
91-
uh := NewUpstream(rt)
91+
uh := NewUpstream(rt, false)
9292
uh.ServeHTTP(res, req)
9393

9494
forwardedRequests := hdl.IncomingRequests()
@@ -138,7 +138,7 @@ func TestForwarderWaitsForSlowOrigin(t *testing.T) {
138138
r.NoError(err)
139139
req = util.RequestWithStream(req, originURL)
140140
rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader)
141-
uh := NewUpstream(rt)
141+
uh := NewUpstream(rt, false)
142142
uh.ServeHTTP(res, req)
143143
// wait for the goroutine above to finish, with a little cusion
144144
ensureSignalBeforeTimeout(originWaitCh, originDelay*2)
@@ -161,7 +161,7 @@ func TestForwarderConnectionRetryAndTimeout(t *testing.T) {
161161
r.NoError(err)
162162
req = util.RequestWithStream(req, noSuchURL)
163163
rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader)
164-
uh := NewUpstream(rt)
164+
uh := NewUpstream(rt, false)
165165

166166
start := time.Now()
167167
uh.ServeHTTP(res, req)
@@ -217,7 +217,7 @@ func TestForwardRequestRedirectAndHeaders(t *testing.T) {
217217
r.NoError(err)
218218
req = util.RequestWithStream(req, srvURL)
219219
rt := newRoundTripper(dialCtxFunc, timeouts.ResponseHeader)
220-
uh := NewUpstream(rt)
220+
uh := NewUpstream(rt, false)
221221
uh.ServeHTTP(res, req)
222222
r.Equal(301, res.Code)
223223
r.Equal("abc123.com", res.Header().Get("Location"))

interceptor/middleware/routing.go

+28-12
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
5757
}
5858
r = r.WithContext(util.ContextWithHTTPSO(r.Context(), httpso))
5959

60-
stream, err := rm.streamFromHTTPSO(r.Context(), httpso)
60+
stream, err := rm.streamFromHTTPSO(r.Context(), httpso, httpso.Spec.ScaleTargetRef)
6161
if err != nil {
6262
sh := handler.NewStatic(http.StatusInternalServerError, err)
6363
sh.ServeHTTP(w, r)
@@ -66,45 +66,61 @@ func (rm *Routing) ServeHTTP(w http.ResponseWriter, r *http.Request) {
6666
}
6767
r = r.WithContext(util.ContextWithStream(r.Context(), stream))
6868

69+
if httpso.Spec.ColdStartTimeoutFailoverRef != nil {
70+
failoverStream, err := rm.streamFromHTTPSO(r.Context(), httpso, httpso.Spec.ColdStartTimeoutFailoverRef)
71+
if err != nil {
72+
sh := handler.NewStatic(http.StatusInternalServerError, err)
73+
sh.ServeHTTP(w, r)
74+
return
75+
}
76+
r = r.WithContext(util.ContextWithFailoverStream(r.Context(), failoverStream))
77+
}
78+
6979
rm.upstreamHandler.ServeHTTP(w, r)
7080
}
7181

72-
func (rm *Routing) getPort(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (int32, error) {
73-
if httpso.Spec.ScaleTargetRef.Port != 0 {
74-
return httpso.Spec.ScaleTargetRef.Port, nil
82+
func (rm *Routing) getPort(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject, reference httpv1alpha1.Ref) (int32, error) {
83+
var (
84+
port int32 = reference.GetPort()
85+
portName string = reference.GetPortName()
86+
serviceName string = reference.GetServiceName()
87+
)
88+
89+
if port != 0 {
90+
return port, nil
7591
}
76-
if httpso.Spec.ScaleTargetRef.PortName == "" {
92+
if portName == "" {
7793
return 0, fmt.Errorf(`must specify either "port" or "portName"`)
7894
}
79-
svc, err := rm.svcCache.Get(ctx, httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service)
95+
svc, err := rm.svcCache.Get(ctx, httpso.GetNamespace(), serviceName)
8096
if err != nil {
8197
return 0, fmt.Errorf("failed to get Service: %w", err)
8298
}
8399
for _, port := range svc.Spec.Ports {
84-
if port.Name == httpso.Spec.ScaleTargetRef.PortName {
100+
if port.Name == portName {
85101
return port.Port, nil
86102
}
87103
}
88-
return 0, fmt.Errorf("portName %q not found in Service", httpso.Spec.ScaleTargetRef.PortName)
104+
return 0, fmt.Errorf("portName %q not found in Service", portName)
89105
}
90106

91-
func (rm *Routing) streamFromHTTPSO(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject) (*url.URL, error) {
92-
port, err := rm.getPort(ctx, httpso)
107+
func (rm *Routing) streamFromHTTPSO(ctx context.Context, httpso *httpv1alpha1.HTTPScaledObject, reference httpv1alpha1.Ref) (*url.URL, error) {
108+
port, err := rm.getPort(ctx, httpso, reference)
93109
if err != nil {
94110
return nil, fmt.Errorf("failed to get port: %w", err)
95111
}
96112
if rm.tlsEnabled {
97113
return url.Parse(fmt.Sprintf(
98114
"https://%s.%s:%d",
99-
httpso.Spec.ScaleTargetRef.Service,
115+
reference.GetServiceName(),
100116
httpso.GetNamespace(),
101117
port,
102118
))
103119
}
104120
//goland:noinspection HttpUrlsUsage
105121
return url.Parse(fmt.Sprintf(
106122
"http://%s.%s:%d",
107-
httpso.Spec.ScaleTargetRef.Service,
123+
reference.GetServiceName(),
108124
httpso.GetNamespace(),
109125
port,
110126
))

interceptor/proxy_handlers.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func newForwardingHandler(
6565
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
6666
ctx := r.Context()
6767
httpso := util.HTTPSOFromContext(ctx)
68+
hasFailover := httpso.Spec.ColdStartTimeoutFailoverRef != nil
6869

6970
waitFuncCtx, done := context.WithTimeout(r.Context(), fwdCfg.waitTimeout)
7071
defer done()
@@ -73,7 +74,7 @@ func newForwardingHandler(
7374
httpso.GetNamespace(),
7475
httpso.Spec.ScaleTargetRef.Service,
7576
)
76-
if err != nil {
77+
if err != nil && !hasFailover {
7778
lggr.Error(err, "wait function failed, not forwarding request")
7879
w.WriteHeader(http.StatusBadGateway)
7980
if _, err := w.Write([]byte(fmt.Sprintf("error on backend (%s)", err))); err != nil {
@@ -83,7 +84,8 @@ func newForwardingHandler(
8384
}
8485
w.Header().Add("X-KEDA-HTTP-Cold-Start", strconv.FormatBool(isColdStart))
8586

86-
uh := handler.NewUpstream(roundTripper)
87+
shouldFailover := hasFailover && err != nil
88+
uh := handler.NewUpstream(roundTripper, shouldFailover)
8789
uh.ServeHTTP(w, r)
8890
})
8991
}

interceptor/proxy_handlers_test.go

+69
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7+
"errors"
78
"fmt"
89
"log"
910
"net/http"
@@ -149,6 +150,74 @@ func TestImmediatelySuccessfulProxyTLS(t *testing.T) {
149150
r.Equal("test response", res.Body.String())
150151
}
151152

153+
// the proxy should successfully forward a request to the failover when the server is not reachable
154+
func TestImmediatelySuccessfulFailoverProxy(t *testing.T) {
155+
host := fmt.Sprintf("%s.testing", t.Name())
156+
r := require.New(t)
157+
158+
initialStream, err := url.Parse("http://0.0.0.0:0")
159+
r.NoError(err)
160+
161+
failoverHdl := kedanet.NewTestHTTPHandlerWrapper(
162+
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
163+
w.WriteHeader(200)
164+
_, err := w.Write([]byte("test response"))
165+
r.NoError(err)
166+
}),
167+
)
168+
srv, failoverURL, err := kedanet.StartTestServer(failoverHdl)
169+
r.NoError(err)
170+
defer srv.Close()
171+
failoverPort, err := strconv.Atoi(failoverURL.Port())
172+
r.NoError(err)
173+
174+
timeouts := defaultTimeouts()
175+
dialCtxFunc := retryDialContextFunc(timeouts, timeouts.DefaultBackoff())
176+
waitFunc := func(ctx context.Context, _ string, _ string) (bool, error) {
177+
return false, errors.New("nothing")
178+
}
179+
hdl := newForwardingHandler(
180+
logr.Discard(),
181+
dialCtxFunc,
182+
waitFunc,
183+
forwardingConfig{
184+
waitTimeout: 0,
185+
respHeaderTimeout: timeouts.ResponseHeader,
186+
},
187+
&tls.Config{},
188+
)
189+
const path = "/testfwd"
190+
res, req, err := reqAndRes(path)
191+
r.NoError(err)
192+
req = util.RequestWithHTTPSO(req,
193+
&httpv1alpha1.HTTPScaledObject{
194+
ObjectMeta: metav1.ObjectMeta{
195+
Namespace: "@" + host,
196+
},
197+
Spec: httpv1alpha1.HTTPScaledObjectSpec{
198+
ScaleTargetRef: httpv1alpha1.ScaleTargetRef{
199+
Name: "testdepl",
200+
Service: "testsvc",
201+
Port: int32(456),
202+
},
203+
ColdStartTimeoutFailoverRef: &httpv1alpha1.ColdStartTimeoutFailoverRef{
204+
Service: "testsvc",
205+
Port: int32(failoverPort),
206+
},
207+
TargetPendingRequests: ptr.To[int32](123),
208+
},
209+
},
210+
)
211+
req = util.RequestWithStream(req, initialStream)
212+
req = util.RequestWithFailoverStream(req, failoverURL)
213+
req.Host = host
214+
215+
hdl.ServeHTTP(res, req)
216+
217+
r.Equal(200, res.Code, "expected response code 200")
218+
r.Equal("test response", res.Body.String())
219+
}
220+
152221
// the proxy should wait for a timeout and fail if there is no
153222
// origin to which to connect
154223
func TestWaitFailedConnection(t *testing.T) {

operator/apis/http/v1alpha1/httpscaledobject_types.go

+44
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ import (
2020
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2121
)
2222

23+
type Ref interface {
24+
GetServiceName() string
25+
GetPort() int32
26+
GetPortName() string
27+
}
28+
2329
// ScaleTargetRef contains all the details about an HTTP application to scale and route to
2430
type ScaleTargetRef struct {
2531
// +optional
@@ -36,6 +42,40 @@ type ScaleTargetRef struct {
3642
PortName string `json:"portName,omitempty"`
3743
}
3844

45+
func (s ScaleTargetRef) GetServiceName() string {
46+
return s.Service
47+
}
48+
49+
func (s ScaleTargetRef) GetPort() int32 {
50+
return s.Port
51+
}
52+
53+
func (s ScaleTargetRef) GetPortName() string {
54+
return s.PortName
55+
}
56+
57+
// ColdStartTimeoutFailoverRef contains all the details about an HTTP application to scale and route to
58+
type ColdStartTimeoutFailoverRef struct {
59+
// The name of the service to route to
60+
Service string `json:"service"`
61+
// The port to route to
62+
Port int32 `json:"port,omitempty"`
63+
// The port to route to referenced by name
64+
PortName string `json:"portName,omitempty"`
65+
}
66+
67+
func (s *ColdStartTimeoutFailoverRef) GetServiceName() string {
68+
return s.Service
69+
}
70+
71+
func (s *ColdStartTimeoutFailoverRef) GetPort() int32 {
72+
return s.Port
73+
}
74+
75+
func (s *ColdStartTimeoutFailoverRef) GetPortName() string {
76+
return s.PortName
77+
}
78+
3979
// ReplicaStruct contains the minimum and maximum amount of replicas to have in the deployment
4080
type ReplicaStruct struct {
4181
// Minimum amount of replicas to have in the deployment (Default 0)
@@ -93,6 +133,10 @@ type HTTPScaledObjectSpec struct {
93133
// Including validation as a requirement to define either the PortName or the Port
94134
// +kubebuilder:validation:XValidation:rule="has(self.portName) != has(self.port)",message="must define either the 'portName' or the 'port'"
95135
ScaleTargetRef ScaleTargetRef `json:"scaleTargetRef"`
136+
// (optional) The name of the failover service to route HTTP requests to when the target is not available
137+
// +optional
138+
// +kubebuilder:validation:XValidation:rule="has(self.portName) != has(self.port)",message="must define either the 'portName' or the 'port'"
139+
ColdStartTimeoutFailoverRef *ColdStartTimeoutFailoverRef `json:"coldStartTimeoutFailoverRef,omitempty"`
96140
// (optional) Replica information
97141
// +optional
98142
Replicas *ReplicaStruct `json:"replicas,omitempty"`

pkg/util/context.go

+10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const (
1515
ckLogger contextKey = iota
1616
ckHTTPSO
1717
ckStream
18+
ckFailoverStream
1819
)
1920

2021
func ContextWithLogger(ctx context.Context, logger logr.Logger) context.Context {
@@ -43,3 +44,12 @@ func StreamFromContext(ctx context.Context) *url.URL {
4344
cv, _ := ctx.Value(ckStream).(*url.URL)
4445
return cv
4546
}
47+
48+
func ContextWithFailoverStream(ctx context.Context, url *url.URL) context.Context {
49+
return context.WithValue(ctx, ckFailoverStream, url)
50+
}
51+
52+
func FailoverStreamFromContext(ctx context.Context) *url.URL {
53+
cv, _ := ctx.Value(ckFailoverStream).(*url.URL)
54+
return cv
55+
}

pkg/util/contexthttp.go

+7
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,10 @@ func RequestWithStream(r *http.Request, stream *url.URL) *http.Request {
3636

3737
return r.WithContext(ctx)
3838
}
39+
40+
func RequestWithFailoverStream(r *http.Request, stream *url.URL) *http.Request {
41+
ctx := r.Context()
42+
ctx = ContextWithFailoverStream(ctx, stream)
43+
44+
return r.WithContext(ctx)
45+
}

0 commit comments

Comments
 (0)