
Commit 3f7591f

avereha/prioritylimiter (#328)
* Add priority limiter

  From time to time, simple requests are slow. Extra logging helped us understand that the slowdown happens when a complex request fills up all the limiter slots in carbon API.

  How does this change solve the problem? Why is this the best approach?

  To prevent this, the limiter now uses a priority queue, with the priority set to the number of subrequests: fewer subrequests mean higher priority. That way, all simple requests get the highest priority (0) in the carbon API. Simple requests become faster; complex requests become slower. Requests with equal priority are ordered by uuid, which lets us lower the number of active requests.

  Possible future improvements:
  - now that we know how many requests are waiting for the limiter and how large each request is, we can start rejecting complex requests when more than X requests are pending
  - we can propagate the priority in a header to the zipper

* carbonapi: add metrics for number of active/waiting requests

* carbonapi: register prometheus metrics
1 parent 868da3e commit 3f7591f

File tree

8 files changed: +486 -31 lines
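For orientation, here is a minimal standalone sketch (not part of the commit) of how a backend is expected to use the limiter API this change introduces (prioritylimiter.New, Enter, Leave). The surrounding main function, the variable values, and the example uuid are hypothetical; the limiter calls themselves match the new package below.

```go
package main

import (
	"context"
	"fmt"

	"github.com/bookingcom/carbonapi/pkg/prioritylimiter"
)

func main() {
	// Allow at most 10 concurrent upstream requests.
	limiter := prioritylimiter.New(10)

	ctx := context.Background()
	subrequestCount := 42  // a "complex" request: many subrequests
	uuid := "example-uuid" // tie-breaker for requests of equal priority

	// Lower priority value = served sooner; simple requests use 0.
	if err := limiter.Enter(ctx, subrequestCount, uuid); err != nil {
		fmt.Println("request cancelled while waiting:", err)
		return
	}
	defer limiter.Leave()

	// ... perform the upstream request here ...
	fmt.Println("slot acquired, doing work")
}
```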

app/carbonapi/app.go

Lines changed: 8 additions & 2 deletions
@@ -85,7 +85,9 @@ func New(config cfg.API, logger *zap.Logger, buildVersion string) (*App, error)
 	app.requestBlocker.ReloadRules()
 
 	// TODO(gmagnusson): Setup backends
-	backend, err := initBackend(app.config, logger)
+	backend, err := initBackend(app.config, logger,
+		app.prometheusMetrics.ActiveUpstreamRequests,
+		app.prometheusMetrics.WaitingUpstreamRequests)
 	if err != nil {
 		logger.Fatal("couldn't initialize backends", zap.Error(err))
 	}
@@ -155,6 +157,8 @@ func (app *App) registerPrometheusMetrics(logger *zap.Logger) *http.Server {
 	prometheus.MustRegister(app.prometheusMetrics.FindDurationLinComplex)
 	prometheus.MustRegister(app.prometheusMetrics.TimeInQueueExp)
 	prometheus.MustRegister(app.prometheusMetrics.TimeInQueueLin)
+	prometheus.MustRegister(app.prometheusMetrics.ActiveUpstreamRequests)
+	prometheus.MustRegister(app.prometheusMetrics.WaitingUpstreamRequests)
 
 	writeTimeout := app.config.Timeouts.Global
 	if writeTimeout < 30*time.Second {
@@ -476,7 +480,7 @@ func (app *App) bucketRequestTimes(req *http.Request, t time.Duration) {
 	}
 }
 
-func initBackend(config cfg.API, logger *zap.Logger) (backend.Backend, error) {
+func initBackend(config cfg.API, logger *zap.Logger, activeUpstreamRequests, waitingUpstreamRequests prometheus.Gauge) (backend.Backend, error) {
 	client := &http.Client{}
 	client.Transport = &http.Transport{
 		MaxIdleConnsPerHost: config.MaxIdleConnsPerHost,
@@ -500,6 +504,8 @@ func initBackend(config cfg.API, logger *zap.Logger) (backend.Backend, error) {
 		Limit:              config.ConcurrencyLimitPerServer,
 		PathCacheExpirySec: uint32(config.ExpireDelaySec),
 		Logger:             logger,
+		ActiveRequests:     activeUpstreamRequests,
+		WaitingRequests:    waitingUpstreamRequests,
 	})
 
 	if err != nil {

app/carbonapi/http_handlers.go

Lines changed: 6 additions & 2 deletions
@@ -358,12 +358,16 @@ func (app *App) getTargetData(ctx context.Context, target string, exp parser.Exp
 			metricErrs = append(metricErrs, dataTypes.ErrMetricsNotFound)
 			continue
 		}
-
+		renderRequestContext := ctx
+		subrequestCount := len(renderRequests)
+		if subrequestCount > 1 {
+			renderRequestContext = util.WithPriority(ctx, subrequestCount)
+		}
 		// TODO(dgryski): group the render requests into batches
 		rch := make(chan renderResponse, len(renderRequests))
 		for _, m := range renderRequests {
 			// TODO (grzkv) Refactor to enable premature cancel
-			go app.sendRenderRequest(ctx, rch, m, mfetch.From, mfetch.Until, toLog)
+			go app.sendRenderRequest(renderRequestContext, rch, m, mfetch.From, mfetch.Until, toLog)
 		}
 
 		errs := make([]error, 0)
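The util.WithPriority and util.GetPriority/util.GetUUID helpers referenced above and in pkg/backend/net are not shown in this diff. A minimal sketch of how such context plumbing could look, assuming the priority is stored as a context value; the key type and the zero default are assumptions, not the actual util implementation:

```go
package util

import "context"

type key int

const priorityKey key = 0

// WithPriority returns a context annotated with the request priority
// (here: the number of subrequests; lower value = higher priority).
func WithPriority(ctx context.Context, priority int) context.Context {
	return context.WithValue(ctx, priorityKey, priority)
}

// GetPriority returns the priority stored in the context, or 0
// (the highest priority) if none was set.
func GetPriority(ctx context.Context) int {
	if p, ok := ctx.Value(priorityKey).(int); ok {
		return p
	}
	return 0
}
```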

app/carbonapi/metrics.go

Lines changed: 15 additions & 0 deletions
@@ -27,6 +27,8 @@ type PrometheusMetrics struct {
 	FindDurationLinComplex  prometheus.Histogram
 	TimeInQueueExp          prometheus.Histogram
 	TimeInQueueLin          prometheus.Histogram
+	ActiveUpstreamRequests  prometheus.Gauge
+	WaitingUpstreamRequests prometheus.Gauge
 }
 
 func newPrometheusMetrics(config cfg.API) PrometheusMetrics {
@@ -194,6 +196,18 @@ func newPrometheusMetrics(config cfg.API) PrometheusMetrics {
 				config.Zipper.Common.Monitoring.TimeInQueueLinHistogram.BucketsNum),
 			},
 		),
+		ActiveUpstreamRequests: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Name: "active_upstream_requests",
+				Help: "Number of in-flight upstream requests",
+			},
+		),
+		WaitingUpstreamRequests: prometheus.NewGauge(
+			prometheus.GaugeOpts{
+				Name: "waiting_upstream_requests",
+				Help: "Number of upstream requests waiting on the limiter",
+			},
+		),
 	}
 }
 
@@ -242,6 +256,7 @@ var apiMetrics = struct {
 	FindCacheHits:       expvar.NewInt("find_cache_hits"),
 	FindCacheMisses:     expvar.NewInt("find_cache_misses"),
 	FindCacheOverheadNS: expvar.NewInt("find_cache_overhead_ns"),
+
 }
 
 // TODO (grzkv): Move to Prometheus, as these are not runtime metrics.

pkg/backend/net/net.go

Lines changed: 15 additions & 22 deletions
@@ -13,7 +13,10 @@ import (
 	"strings"
 	"time"
 
+	"github.com/bookingcom/carbonapi/pkg/prioritylimiter"
 	"github.com/bookingcom/carbonapi/pkg/types"
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/bookingcom/carbonapi/pkg/types/encoding/carbonapi_v2"
 	"github.com/bookingcom/carbonapi/util"
 
@@ -47,7 +50,7 @@ type Backend struct {
 	cluster string
 	client  *http.Client
 	timeout time.Duration
-	limiter chan struct{}
+	limiter *prioritylimiter.Limiter
 	logger  *zap.Logger
 	cache   *expirecache.Cache
 	cacheExpirySec int32
@@ -69,6 +72,8 @@ type Config struct {
 	Limit              int         // Set limit of concurrent requests to backend. Defaults to no limit.
 	PathCacheExpirySec uint32      // Set time in seconds before items in path cache expire. Defaults to 10 minutes.
 	Logger             *zap.Logger // Logger to use. Defaults to a no-op logger.
+	ActiveRequests     prometheus.Gauge
+	WaitingRequests    prometheus.Gauge
 }
 
 var fmtProto = []string{"protobuf"}
@@ -108,7 +113,11 @@ func New(cfg Config) (*Backend, error) {
 	}
 
 	if cfg.Limit > 0 {
-		b.limiter = make(chan struct{}, cfg.Limit)
+		if cfg.ActiveRequests != nil && cfg.WaitingRequests != nil {
+			b.limiter = prioritylimiter.New(cfg.Limit, prioritylimiter.WithMetrics(cfg.ActiveRequests, cfg.WaitingRequests))
+		} else {
+			b.limiter = prioritylimiter.New(cfg.Limit)
+		}
 	}
 
 	if cfg.Logger != nil {
@@ -155,32 +164,16 @@ func (b Backend) enter(ctx context.Context) error {
 	if b.limiter == nil {
 		return nil
 	}
-
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-
-	case b.limiter <- struct{}{}:
-		// fallthrough
-	}
-
-	return nil
+	priority := util.GetPriority(ctx)
+	uuid := util.GetUUID(ctx)
+	return b.limiter.Enter(ctx, priority, uuid)
 }
 
 func (b Backend) leave() error {
 	if b.limiter == nil {
 		return nil
 	}
-
-	select {
-	case <-b.limiter:
-		// fallthrough
-	default:
-		// this should never happen, but let's not block forever if it does
-		return errors.New("Unable to return value to limiter")
-	}
-
-	return nil
+	return b.limiter.Leave()
 }
 
 func (b Backend) setTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
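The metrics are optional, as the branch in New above shows. For reference, a small standalone sketch (not from the commit) of that optional-metrics wiring, using the gauge names defined in app/carbonapi/metrics.go; the main function and the limit value are illustrative.

```go
package main

import (
	"github.com/bookingcom/carbonapi/pkg/prioritylimiter"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Gauges equivalent to the ones added in app/carbonapi/metrics.go.
	active := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "active_upstream_requests",
		Help: "Number of in-flight upstream requests",
	})
	waiting := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "waiting_upstream_requests",
		Help: "Number of upstream requests waiting on the limiter",
	})
	prometheus.MustRegister(active, waiting)

	// Mirrors the branch in pkg/backend/net.New: pass both gauges,
	// or create the limiter without metrics at all.
	limiter := prioritylimiter.New(10, prioritylimiter.WithMetrics(active, waiting))
	_ = limiter
}
```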

pkg/backend/net/net_test.go

Lines changed: 0 additions & 3 deletions
@@ -198,9 +198,6 @@ func TestCallTimeoutLeavesLimiter(t *testing.T) {
 		t.Error("Expected to time out")
 	}
 
-	if len(b.limiter) != 0 {
-		t.Error("Expected limiter to be empty")
-	}
 }
 
 func TestDo(t *testing.T) {
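The removed check relied on the limiter being a buffered channel, so len() no longer applies. With the priority limiter, an equivalent assertion could use the new Active() accessor; this fragment is a suggestion only and is not part of the commit.

```go
// Hypothetical replacement inside TestCallTimeoutLeavesLimiter:
// count in-flight requests via Active() instead of channel length.
if b.limiter != nil && b.limiter.Active() != 0 {
	t.Error("Expected limiter to be empty")
}
```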
Lines changed: 212 additions & 0 deletions
@@ -0,0 +1,212 @@
package prioritylimiter

import (
	"container/heap"
	"context"
	"errors"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

const (
	indexStateActive    = -1
	indexStateNew       = -2
	indexStateCancelled = -3
)

type request struct {
	priority int // lower value means higher priority
	canEnter chan struct{}
	index    int
	uuid     string
}

type requests []*request

// Limiter does two things:
// a) limits the number of concurrent requests going upstream
// b) prioritizes the "waiting" requests
// For prioritization we use two variables:
// "priority": the request complexity; less complexity == more priority
// "uuid": requests of equal complexity are processed ordered by uuid, to minimize the number of "active" requests
type Limiter struct {
	requests      requests
	limiter       chan struct{}
	wantToEnter   chan *request
	cancelRequest chan *request
	loopCount     uint32
	activeGauge   prometheus.Gauge
	waitingGauge  prometheus.Gauge
}

type LimiterOption func(*Limiter)

// New creates a new limiter that allows a maximum of "limit" requests to Enter
func New(limit int, options ...LimiterOption) *Limiter {
	ret := &Limiter{
		limiter:       make(chan struct{}, limit),
		wantToEnter:   make(chan *request),
		cancelRequest: make(chan *request),
		loopCount:     0,
	}
	for _, option := range options {
		option(ret)
	}

	go ret.loop()

	return ret
}

// WithMetrics adds prometheus metrics to the Limiter instance
func WithMetrics(activeGauge, waitingGauge prometheus.Gauge) LimiterOption {
	return func(l *Limiter) {
		l.activeGauge = activeGauge
		l.waitingGauge = waitingGauge
	}
}

// Enter blocks this request until its turn comes
func (l *Limiter) Enter(ctx context.Context, priority int, uuid string) error {
	canEnter := make(chan struct{})

	req := &request{
		priority: priority,
		canEnter: canEnter,
		uuid:     uuid,
		index:    indexStateNew,
	}

	l.wantToEnter <- req

	select {
	// Check first if the ctx is not closed
	case <-ctx.Done():
		l.cancelRequest <- req
		return ctx.Err()
	default:
		select {
		case <-ctx.Done():
			l.cancelRequest <- req
			return ctx.Err()
		case <-canEnter:
			return nil
		}
	}
}

// Leave marks a request as complete
func (l *Limiter) Leave() error {
	select {
	case <-l.limiter:
		// fallthrough
	default:
		// this should never happen, but let's not block forever if it does
		return errors.New("Unable to return value to limiter")
	}
	return nil
}

// Active returns the number of in-progress requests
func (l *Limiter) Active() int {
	return len(l.limiter)
}

func (l *Limiter) loop() {
	for {
		if len(l.requests) == 0 {
			select {
			case req := <-l.wantToEnter:
				if req.index != indexStateCancelled {
					heap.Push(&l.requests, req)
				}
			case req := <-l.cancelRequest:
				index := req.index
				if index >= 0 {
					heap.Remove(&l.requests, index)
				}
				if index == indexStateActive {
					// If we are receiving a cancel request at this point,
					// it means Enter() returned with an error, and the caller will not Leave()
					l.Leave()
				}
				req.index = indexStateCancelled
			}
		} else {
			select {
			case req := <-l.wantToEnter:
				if req.index != indexStateCancelled {
					heap.Push(&l.requests, req)
				}
			case req := <-l.cancelRequest:
				index := req.index
				if index >= 0 {
					heap.Remove(&l.requests, index)
				}
				if index == indexStateActive {
					// If we are receiving a cancel request at this point,
					// it means Enter() returned with an error, and the caller will not Leave()
					l.Leave()
				}
				req.index = indexStateCancelled
			case l.limiter <- struct{}{}:
				req := heap.Pop(&l.requests).(*request)
				close(req.canEnter)
			}
		}
		atomic.AddUint32(&l.loopCount, 1)
		if l.activeGauge != nil {
			l.activeGauge.Set(float64(len(l.limiter)))
		}
		if l.waitingGauge != nil {
			l.waitingGauge.Set(float64(len(l.requests)))
		}
	}
}

// used in tests to ensure that loop() processed all the pending messages
func (l *Limiter) waitLoopCount(i int) {
	for {
		count := int(atomic.LoadUint32(&l.loopCount))
		if count >= i {
			return
		}
		time.Sleep(time.Millisecond * 50)
	}
}

func (r requests) Len() int {
	return len(r)
}

func (r requests) Less(i, j int) bool {
	if r[i].priority == r[j].priority {
		return r[i].uuid < r[j].uuid
	}
	return r[i].priority < r[j].priority
}

func (r requests) Swap(i, j int) {
	r[i], r[j] = r[j], r[i]
	r[i].index = i
	r[j].index = j
}

func (r *requests) Push(x interface{}) {
	req := x.(*request)
	idx := len(*r)
	req.index = idx
	*r = append(*r, req)
}

func (r *requests) Pop() interface{} {
	old := *r
	n := len(old)
	req := old[n-1]
	req.index = indexStateActive
	old[n-1] = nil // avoid memory leak
	*r = old[0 : n-1]
	return req
}
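A hypothetical in-package test (not part of the commit) illustrating the ordering the requests heap produces: the lowest priority value is popped first, and uuid breaks ties between requests of equal priority.

```go
package prioritylimiter

import (
	"container/heap"
	"testing"
)

// Sketch of a test that checks the heap ordering: lower priority
// value first, then uuid for equal priorities.
func TestRequestOrdering(t *testing.T) {
	reqs := &requests{}
	heap.Push(reqs, &request{priority: 3, uuid: "b"})
	heap.Push(reqs, &request{priority: 0, uuid: "z"})
	heap.Push(reqs, &request{priority: 3, uuid: "a"})

	// Priority 0 ("z") goes first; the two priority-3 requests
	// come out ordered by uuid ("a" before "b").
	want := []string{"z", "a", "b"}
	for _, uuid := range want {
		got := heap.Pop(reqs).(*request).uuid
		if got != uuid {
			t.Fatalf("expected %q, got %q", uuid, got)
		}
	}
}
```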
