
Commit 3a44ace

sargun authored and alexellis committed

Add Concurrency Limiter

This enables limiting concurrency. It is a naive approach which will reject requests as soon as they exceed the maximum number of in-flight requests.

Signed-off-by: Sargun Dhillon <[email protected]>
Signed-off-by: Lucas Roesler <[email protected]>
Reviewed-by: Lucas Roesler <[email protected]>

1 parent b5a3eb4 commit 3a44ace

File tree

7 files changed: +294 -11 lines changed

  Dockerfile
  README.md
  concurrency-limiter/concurrency_limiter.go
  concurrency-limiter/concurrency_limiter_test.go
  config/config.go
  main.go
  metrics/metrics.go

Dockerfile

Lines changed: 7 additions & 5 deletions

@@ -3,11 +3,13 @@ FROM golang:1.10
 RUN mkdir -p /go/src/github.com/openfaas-incubator/of-watchdog
 WORKDIR /go/src/github.com/openfaas-incubator/of-watchdog

-COPY vendor vendor
-COPY config config
-COPY executor executor
-COPY metrics metrics
-COPY main.go .
+COPY vendor vendor
+COPY config config
+COPY executor executor
+COPY metrics metrics
+COPY concurrency-limiter concurrency-limiter
+COPY metrics metrics
+COPY main.go .

 # Run a gofmt and exclude all vendored code.
 RUN test -z "$(gofmt -l $(find . -type f -name '*.go' -not -path "./vendor/*"))"

README.md

Lines changed: 1 addition & 1 deletion

@@ -152,6 +152,6 @@ Environmental variables:
 | `suppress_lock` | Yes | When set to `false` the watchdog will attempt to write a lockfile to /tmp/ for healthchecks. Default `false` |
 | `upstream_url` | Yes | `http` mode only - where to forward requests i.e. `127.0.0.1:5000` |
 | `buffer_http` | Yes | `http` mode only - buffers request body to memory before forwarding. Use if your upstream HTTP server does not accept `Transfer-Encoding: chunked`. Default: `false` |
-
+| `max_inflight` | Yes | Limit the maximum number of requests in flight |

 > Note: the .lock file is implemented for health-checking, but cannot be disabled yet. You must create this file in /tmp/.
concurrency-limiter/concurrency_limiter.go

Lines changed: 67 additions & 0 deletions

@@ -0,0 +1,67 @@
+package limiter
+
+import (
+    "fmt"
+    "net/http"
+    "sync/atomic"
+)
+
+type ConcurrencyLimiter struct {
+    backendHTTPHandler http.Handler
+    /*
+        We keep two counters here so that, in the tests, we can tell when a
+        request has run to completion. We could wrap these in a condvar so
+        there is no need to spin, but that seems like overkill for testing.
+
+        This is effectively a fancy semaphore built for optimistic concurrency
+        only, with spinlocks. If you want to add timeouts / pessimistic
+        concurrency here, signaling needs to be added, or a condvar-like
+        mechanism is needed to wake up waiters who are waiting post-spin.
+
+        Otherwise, all sorts of futzing is required to make sure that the
+        concurrency limiter handler has completed.
+
+        The math works on overflow:
+            var x, y uint64
+            x = (1 << 64 - 1)
+            y = (1 << 64 - 1)
+            x++
+            fmt.Println(x)
+            fmt.Println(y)
+            fmt.Println(x - y)
+        Prints:
+            0
+            18446744073709551615
+            1
+    */
+    requestsStarted   uint64
+    requestsCompleted uint64
+
+    maxInflightRequests uint64
+}
+
+func (cl *ConcurrencyLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+    requestsStarted := atomic.AddUint64(&cl.requestsStarted, 1)
+    completedRequested := atomic.LoadUint64(&cl.requestsCompleted)
+    if requestsStarted-completedRequested > cl.maxInflightRequests {
+        // This is a failure pathway, and we do not want to block on the write to finish
+        atomic.AddUint64(&cl.requestsCompleted, 1)
+        w.WriteHeader(http.StatusTooManyRequests)
+        fmt.Fprintf(w, "Concurrent request limit exceeded. Max concurrent requests: %d\n", cl.maxInflightRequests)
+        return
+    }
+    cl.backendHTTPHandler.ServeHTTP(w, r)
+    atomic.AddUint64(&cl.requestsCompleted, 1)
+}
+
+// NewConcurrencyLimiter creates a handler which limits the number of active,
+// concurrent requests. If the concurrency limit is less than or equal to 0,
+// it simply returns the handler passed to it.
+func NewConcurrencyLimiter(handler http.Handler, concurrencyLimit int) http.Handler {
+    if concurrencyLimit <= 0 {
+        return handler
+    }
+
+    return &ConcurrencyLimiter{
+        backendHTTPHandler:  handler,
+        maxInflightRequests: uint64(concurrencyLimit),
+    }
+}
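
To make the limiter's contract concrete, here is a minimal sketch (not part of the commit) that wraps a slow handler with a limit of 1: a second concurrent request is turned away with 429 while the first is still in flight. The 100ms sleep is a crude, assumed-sufficient way to order the two requests; the tests below spin on the atomic counters instead.

package main

import (
    "fmt"
    "net/http"
    "net/http/httptest"
    "time"

    limiter "github.com/openfaas-incubator/of-watchdog/concurrency-limiter"
)

func main() {
    release := make(chan struct{})
    slow := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        <-release // hold the request in flight until released
        w.WriteHeader(http.StatusOK)
    })

    srv := httptest.NewServer(limiter.NewConcurrencyLimiter(slow, 1))
    defer srv.Close()

    go http.Get(srv.URL)               // occupies the single in-flight slot
    time.Sleep(100 * time.Millisecond) // crude: let the first request reach the handler

    resp, err := http.Get(srv.URL) // exceeds max_inflight
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println(resp.StatusCode) // 429 (http.StatusTooManyRequests)

    close(release) // let the first request finish
}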
concurrency-limiter/concurrency_limiter_test.go

Lines changed: 204 additions & 0 deletions

@@ -0,0 +1,204 @@
+package limiter
+
+import (
+    "context"
+    "fmt"
+    "net/http"
+    "net/http/httptest"
+    "sync"
+    "sync/atomic"
+    "testing"
+    "time"
+)
+
+func makeFakeHandler(ctx context.Context, completeInFlightRequestChan chan struct{}) http.HandlerFunc {
+    return func(w http.ResponseWriter, r *http.Request) {
+        select {
+        case <-ctx.Done():
+            w.WriteHeader(http.StatusServiceUnavailable)
+        case <-completeInFlightRequestChan:
+            w.WriteHeader(http.StatusOK)
+        }
+    }
+}
+
+func doRRandRequest(ctx context.Context, wg *sync.WaitGroup, cl http.Handler) *httptest.ResponseRecorder {
+    rr := httptest.NewRecorder()
+    // This should never fail unless we're out of memory or something
+    req, err := http.NewRequest("GET", "/", nil)
+    if err != nil {
+        panic(err)
+    }
+    req = req.WithContext(ctx)
+
+    wg.Add(1)
+    go func() {
+        // Callers that need to know the request has reached the handler
+        // spin on cl.requestsStarted instead of waiting here
+        cl.ServeHTTP(rr, req)
+        // Unblock any waiting goroutines, including when the request was aborted
+        wg.Done()
+    }()
+
+    return rr
+}
+
+func TestConcurrencyLimitUnderLimit(t *testing.T) {
+    t.Parallel()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+
+    completeInFlightRequestChan := make(chan struct{})
+    handler := makeFakeHandler(ctx, completeInFlightRequestChan)
+    cl := NewConcurrencyLimiter(http.Handler(handler), 2)
+
+    wg := &sync.WaitGroup{}
+    rr1 := doRRandRequest(ctx, wg, cl)
+    // This will "release" the request rr1
+    completeInFlightRequestChan <- struct{}{}
+
+    // This should never take more than the timeout
+    wg.Wait()
+
+    // We check the response recorder directly, so we don't accidentally get an implicitly correct answer
+    if rr1.Code != http.StatusOK {
+        t.Fatalf("Want response code %d, got: %d", http.StatusOK, rr1.Code)
+    }
+}
+
+func TestConcurrencyLimitAtLimit(t *testing.T) {
+    t.Parallel()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+
+    completeInFlightRequestChan := make(chan struct{})
+    handler := makeFakeHandler(ctx, completeInFlightRequestChan)
+
+    cl := NewConcurrencyLimiter(http.Handler(handler), 2)
+
+    wg := &sync.WaitGroup{}
+    rr1 := doRRandRequest(ctx, wg, cl)
+    rr2 := doRRandRequest(ctx, wg, cl)
+
+    completeInFlightRequestChan <- struct{}{}
+    completeInFlightRequestChan <- struct{}{}
+
+    wg.Wait()
+
+    if rr1.Code != http.StatusOK {
+        t.Fatalf("Want response code %d, got: %d", http.StatusOK, rr1.Code)
+    }
+    if rr2.Code != http.StatusOK {
+        t.Fatalf("Want response code %d, got: %d", http.StatusOK, rr2.Code)
+    }
+}
+
+func count(r *httptest.ResponseRecorder, code200s, code429s *int) {
+    switch r.Code {
+    case http.StatusTooManyRequests:
+        *code429s = *code429s + 1
+    case http.StatusOK:
+        *code200s = *code200s + 1
+    default:
+        panic(fmt.Sprintf("Unknown code: %d", r.Code))
+    }
+}
+
+func TestConcurrencyLimitOverLimit(t *testing.T) {
+    t.Parallel()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+    completeInFlightRequestChan := make(chan struct{}, 3)
+    handler := makeFakeHandler(ctx, completeInFlightRequestChan)
+
+    cl := NewConcurrencyLimiter(http.Handler(handler), 2).(*ConcurrencyLimiter)
+
+    wg := &sync.WaitGroup{}
+
+    rr1 := doRRandRequest(ctx, wg, cl)
+    rr2 := doRRandRequest(ctx, wg, cl)
+    for ctx.Err() == nil {
+        if requestsStarted := atomic.LoadUint64(&cl.requestsStarted); requestsStarted == 2 {
+            break
+        }
+        time.Sleep(time.Millisecond)
+    }
+    rr3 := doRRandRequest(ctx, wg, cl)
+    for ctx.Err() == nil {
+        if requestsStarted := atomic.LoadUint64(&cl.requestsStarted); requestsStarted == 3 {
+            break
+        }
+        time.Sleep(time.Millisecond)
+    }
+    completeInFlightRequestChan <- struct{}{}
+    completeInFlightRequestChan <- struct{}{}
+    completeInFlightRequestChan <- struct{}{}
+
+    wg.Wait()
+
+    code200s := 0
+    code429s := 0
+    count(rr1, &code200s, &code429s)
+    count(rr2, &code200s, &code429s)
+    count(rr3, &code200s, &code429s)
+    if code200s != 2 || code429s != 1 {
+        t.Fatalf("code 200s: %d, and code429s: %d", code200s, code429s)
+    }
+}
+
+func TestConcurrencyLimitOverLimitAndRecover(t *testing.T) {
+    t.Parallel()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+    completeInFlightRequestChan := make(chan struct{}, 4)
+    handler := makeFakeHandler(ctx, completeInFlightRequestChan)
+    cl := NewConcurrencyLimiter(http.Handler(handler), 2).(*ConcurrencyLimiter)
+
+    wg := &sync.WaitGroup{}
+
+    rr1 := doRRandRequest(ctx, wg, cl)
+    rr2 := doRRandRequest(ctx, wg, cl)
+    for ctx.Err() == nil {
+        if requestsStarted := atomic.LoadUint64(&cl.requestsStarted); requestsStarted == 2 {
+            break
+        }
+        time.Sleep(time.Millisecond)
+    }
+    // This will 429
+    rr3 := doRRandRequest(ctx, wg, cl)
+    for ctx.Err() == nil {
+        if requestsStarted := atomic.LoadUint64(&cl.requestsStarted); requestsStarted == 3 {
+            break
+        }
+        time.Sleep(time.Millisecond)
+    }
+    completeInFlightRequestChan <- struct{}{}
+    completeInFlightRequestChan <- struct{}{}
+    completeInFlightRequestChan <- struct{}{}
+    // Although we could do another wg.Wait here, I don't think we should because
+    // it might provide a false sense of confidence
+    for ctx.Err() == nil {
+        if requestsCompleted := atomic.LoadUint64(&cl.requestsCompleted); requestsCompleted == 3 {
+            break
+        }
+        time.Sleep(time.Millisecond)
+    }
+    rr4 := doRRandRequest(ctx, wg, cl)
+    completeInFlightRequestChan <- struct{}{}
+    wg.Wait()
+
+    code200s := 0
+    code429s := 0
+    count(rr1, &code200s, &code429s)
+    count(rr2, &code200s, &code429s)
+    count(rr3, &code200s, &code429s)
+    count(rr4, &code200s, &code429s)
+
+    if code200s != 3 || code429s != 1 {
+        t.Fatalf("code 200s: %d, and code429s: %d", code200s, code429s)
+    }
+}

config/config.go

Lines changed: 7 additions & 0 deletions

@@ -28,6 +28,12 @@ type WatchdogConfig struct {

 	// MetricsPort TCP port on which to serve HTTP Prometheus metrics
 	MetricsPort int
+
+	// MaxInflight limits the number of simultaneous
+	// requests that the watchdog allows.
+	// Any request which exceeds this limit
+	// receives an immediate 429 response.
+	MaxInflight int
 }

 // Process returns a string for the process and a slice for the arguments from the FunctionProcess.

@@ -81,6 +87,7 @@ func New(env []string) (WatchdogConfig, error) {
 		UpstreamURL:    upstreamURL,
 		BufferHTTPBody: getBool(envMap, "buffer_http"),
 		MetricsPort:    8081,
+		MaxInflight:    getInt(envMap, "max_inflight", 0),
 	}

 	if val := envMap["mode"]; len(val) > 0 {
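
A small sketch of how the new setting flows through config.New, assuming os.Environ()-style `key=value` pairs as the envMap usage above suggests; the `function_process` entry is illustrative only:

package main

import (
    "fmt"

    "github.com/openfaas-incubator/of-watchdog/config"
)

func main() {
    // Hypothetical environment for the watchdog
    cfg, err := config.New([]string{
        "function_process=cat",
        "max_inflight=5",
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(cfg.MaxInflight) // 5; defaults to 0 (no limit) when unset
}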

main.go

Lines changed: 7 additions & 4 deletions

@@ -15,10 +15,10 @@ import (
 	"syscall"
 	"time"

-	"github.com/openfaas-incubator/of-watchdog/metrics"
-
+	limiter "github.com/openfaas-incubator/of-watchdog/concurrency-limiter"
 	"github.com/openfaas-incubator/of-watchdog/config"
 	"github.com/openfaas-incubator/of-watchdog/executor"
+	"github.com/openfaas-incubator/of-watchdog/metrics"
 )

@@ -45,7 +45,6 @@ func main() {

 	httpMetrics := metrics.NewHttp()
 	http.HandleFunc("/", metrics.InstrumentHandler(requestHandler, httpMetrics))
-
 	http.HandleFunc("/_/health", makeHealthHandler())

 	metricsServer := metrics.MetricsServer{}

@@ -129,7 +128,7 @@ func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server, suppress
 	<-idleConnsClosed
 }

-func buildRequestHandler(watchdogConfig config.WatchdogConfig) http.HandlerFunc {
+func buildRequestHandler(watchdogConfig config.WatchdogConfig) http.Handler {
 	var requestHandler http.HandlerFunc

 	switch watchdogConfig.OperationalMode {

@@ -150,6 +149,10 @@ func buildRequestHandler(watchdogConfig config.WatchdogConfig) http.HandlerFunc
 		break
 	}

+	if watchdogConfig.MaxInflight > 0 {
+		return limiter.NewConcurrencyLimiter(requestHandler, watchdogConfig.MaxInflight)
+	}
+
 	return requestHandler
 }

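One design note: main.go only wraps the handler when MaxInflight > 0, and NewConcurrencyLimiter itself returns the handler untouched for non-positive limits, so the default configuration pays no extra cost per request. A hypothetical check (not in the commit) of that passthrough:

package main

import (
    "fmt"
    "net/http"

    limiter "github.com/openfaas-incubator/of-watchdog/concurrency-limiter"
)

func main() {
    h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})

    wrapped := limiter.NewConcurrencyLimiter(h, 0) // limit <= 0: no-op
    _, limited := wrapped.(*limiter.ConcurrencyLimiter)
    fmt.Println(limited) // false: the original handler came back unwrapped
}
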
metrics/metrics.go

Lines changed: 1 addition & 1 deletion

@@ -59,7 +59,7 @@ func (m *MetricsServer) Serve(cancel chan bool) {

 // InstrumentHandler returns a handler which records HTTP requests
 // as they are made
-func InstrumentHandler(next http.HandlerFunc, _http Http) http.HandlerFunc {
+func InstrumentHandler(next http.Handler, _http Http) http.HandlerFunc {
 	return promhttp.InstrumentHandlerCounter(_http.RequestsTotal,
 		promhttp.InstrumentHandlerDuration(_http.RequestDurationHistogram, next))
 }
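
This widening is what lets the limiter flow through the metrics layer: *ConcurrencyLimiter satisfies http.Handler but is not an http.HandlerFunc, so the old signature would no longer compile once buildRequestHandler can return the limiter. A minimal sketch of the wiring, reusing the packages above:

package main

import (
    "net/http"

    limiter "github.com/openfaas-incubator/of-watchdog/concurrency-limiter"
    "github.com/openfaas-incubator/of-watchdog/metrics"
)

func main() {
    base := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})
    limited := limiter.NewConcurrencyLimiter(base, 5) // an http.Handler, not an http.HandlerFunc

    // Any http.Handler is now accepted, including the limiter
    http.Handle("/", metrics.InstrumentHandler(limited, metrics.NewHttp()))
}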
