Skip to content

Commit 0c9773f

Browse files
authored
Merge pull request #38 from elandau/feature/api
Capture data from sample window in a single immutable class
2 parents 39486b6 + 3b64cd1 commit 0c9773f

File tree

12 files changed

+218
-119
lines changed

12 files changed

+218
-119
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,31 @@
55
* rtt measurements
66
*/
77
public interface Limit {
8+
/**
9+
* Details of the current sample window
10+
*/
11+
interface SampleWindow {
12+
/**
13+
* @return Candidate RTT in the sample window. This is traditionally the minimum rtt.
14+
*/
15+
long getCandidateRttNanos();
16+
17+
/**
18+
* @return Maximum number of inflight observed during the sample window
19+
*/
20+
long getMaxInFlight();
21+
22+
/**
23+
* @return Number of observed RTTs in the sample window
24+
*/
25+
long getSampleCount();
26+
27+
/**
28+
* @return True if there was a timeout
29+
*/
30+
boolean didDrop();
31+
}
32+
833
/**
934
* @return Current estimated limit
1035
*/
@@ -16,12 +41,5 @@ public interface Limit {
1641
* @param rtt Minimum RTT sample for the last window
1742
* @param maxInFlight Maximum number of inflight requests observed in the sampling window
1843
*/
19-
void update(long rtt, int maxInFlight);
20-
21-
/**
22-
* The request failed and was dropped due to being rejected by an external limit
23-
* or hitting a timeout. Loss based implementations will likely reduce the limit
24-
* aggressively when this happens.
25-
*/
26-
void drop();
44+
void update(SampleWindow sample);
2745
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ public static Builder newBuilder() {
3232
}
3333

3434
private volatile int limit;
35-
private boolean didDrop = false;
3635
private final double backoffRatio;
3736

3837
private AIMDLimit(Builder builder) {
@@ -46,24 +45,16 @@ public int getLimit() {
4645
}
4746

4847
@Override
49-
public synchronized void update(long rtt, int maxInFlight) {
50-
if (didDrop) {
51-
didDrop = false;
52-
} else if (maxInFlight >= limit){
53-
limit = limit + 1;
54-
}
55-
}
56-
57-
@Override
58-
public synchronized void drop() {
59-
if (!didDrop) {
60-
didDrop = true;
48+
public void update(SampleWindow sample) {
49+
if (sample.didDrop()) {
6150
limit = Math.max(1, Math.min(limit - 1, (int) (limit * backoffRatio)));
51+
} else if (sample.getMaxInFlight() >= limit) {
52+
limit = limit + 1;
6253
}
6354
}
6455

6556
@Override
6657
public String toString() {
67-
return "AIMDLimit [limit=" + limit + ", didDrop=" + didDrop + "]";
58+
return "AIMDLimit [limit=" + limit + "]";
6859
}
6960
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/FixedLimit.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,9 @@ public int getLimit() {
2323
}
2424

2525
@Override
26-
public void update(long rtt, int maxInFlight) {
26+
public void update(SampleWindow sample) {
2727
}
28-
29-
@Override
30-
public void drop() {
31-
}
32-
28+
3329
@Override
3430
public String toString() {
3531
return "FixedLimit [limit=" + limit + "]";

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
package com.netflix.concurrency.limits.limit;
22

3+
import java.util.concurrent.TimeUnit;
4+
import java.util.function.Function;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
39
import com.netflix.concurrency.limits.Limit;
410
import com.netflix.concurrency.limits.MetricIds;
511
import com.netflix.concurrency.limits.MetricRegistry;
612
import com.netflix.concurrency.limits.internal.EmptyMetricRegistry;
713
import com.netflix.concurrency.limits.internal.Preconditions;
814
import com.netflix.concurrency.limits.limit.functions.SquareRootFunction;
915

10-
import java.util.concurrent.TimeUnit;
11-
import java.util.function.Function;
12-
13-
import org.slf4j.Logger;
14-
import org.slf4j.LoggerFactory;
15-
1616
/**
1717
* Concurrency limit algorithm that adjust the limits based on the gradient of change in the
1818
* samples minimum RTT and absolute minimum RTT allowing for a queue of square root of the
@@ -148,8 +148,6 @@ public static GradientLimit newDefault() {
148148

149149
private volatile long windowMinRtt = 0;
150150

151-
private boolean didDrop = false;
152-
153151
/**
154152
* Maximum allowed limit providing an upper bound failsafe
155153
*/
@@ -173,7 +171,8 @@ private GradientLimit(Builder builder) {
173171
}
174172

175173
@Override
176-
public synchronized void update(long rtt, int maxInFlight) {
174+
public synchronized void update(SampleWindow sample) {
175+
final long rtt = sample.getCandidateRttNanos();
177176
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);
178177

179178
if (rtt < minRttThreshold) {
@@ -193,10 +192,9 @@ public synchronized void update(long rtt, int maxInFlight) {
193192
final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
194193
final double gradient = Math.max(0.5, Math.min(1.0, rttTolerance * rtt_noload / rtt));
195194
double newLimit;
196-
if (didDrop) {
195+
if (sample.didDrop()) {
197196
newLimit = estimatedLimit/2;
198-
didDrop = false;
199-
} else if ((estimatedLimit - maxInFlight) > queueSize) {
197+
} else if ((estimatedLimit - sample.getMaxInFlight()) > queueSize) {
200198
return;
201199
} else {
202200
newLimit = estimatedLimit * gradient + queueSize;
@@ -208,22 +206,17 @@ public synchronized void update(long rtt, int maxInFlight) {
208206
}
209207
if ((int)newLimit != (int)estimatedLimit) {
210208
if (LOG.isDebugEnabled()) {
211-
LOG.debug("New limit={} minRtt={} μs winRtt={} μs queueSize={} gradient={}",
209+
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={}",
212210
(int)estimatedLimit,
213-
TimeUnit.NANOSECONDS.toMicros(rtt_noload),
214-
TimeUnit.NANOSECONDS.toMicros(rtt),
211+
TimeUnit.NANOSECONDS.toMicros(rtt_noload)/1000.0,
212+
TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0,
215213
queueSize,
216214
gradient);
217215
}
218216
}
219217
estimatedLimit = newLimit;
220218
}
221219

222-
@Override
223-
public synchronized void drop() {
224-
didDrop = true;
225-
}
226-
227220
@Override
228221
public int getLimit() {
229222
return (int)estimatedLimit;
@@ -252,7 +245,7 @@ private synchronized long getQueueSize() {
252245
@Override
253246
public String toString() {
254247
return "GradientLimit [limit=" + (int)estimatedLimit +
255-
", rtt_noload=" + TimeUnit.NANOSECONDS.toMillis(rtt_noload) +
256-
"]";
248+
", rtt_noload=" + TimeUnit.MICROSECONDS.toMillis(rtt_noload) / 1000.0+
249+
" ms]";
257250
}
258251
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/SettableLimit.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,7 @@ public synchronized int getLimit() {
2424
}
2525

2626
@Override
27-
public void update(long rtt, int maxInFlight) {
28-
}
29-
30-
@Override
31-
public void drop() {
27+
public void update(SampleWindow sample) {
3228
}
3329

3430
public synchronized void setLimit(int limit) {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.netflix.concurrency.limits.limit;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import com.netflix.concurrency.limits.Limit;
9+
10+
public class TracingLimitDecorator implements Limit {
11+
private static final Logger LOG = LoggerFactory.getLogger(TracingLimitDecorator.class);
12+
13+
private final Limit delegate;
14+
15+
public static TracingLimitDecorator wrap(Limit delegate) {
16+
return new TracingLimitDecorator(delegate);
17+
}
18+
19+
public TracingLimitDecorator(Limit delegate) {
20+
this.delegate = delegate;
21+
}
22+
23+
@Override
24+
public int getLimit() {
25+
return delegate.getLimit();
26+
}
27+
28+
@Override
29+
public void update(SampleWindow sample) {
30+
LOG.debug("sampleCount={} maxInFlight={} minRtt={} ms",
31+
sample.getSampleCount(),
32+
sample.getMaxInFlight(),
33+
TimeUnit.NANOSECONDS.toMicros(sample.getCandidateRttNanos()) / 1000.0);
34+
delegate.update(sample);
35+
}
36+
37+
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,6 @@ public static VegasLimit newDefault() {
118118

119119
private volatile long rtt_noload = 0;
120120

121-
private boolean didDrop = false;
122-
123121
/**
124122
* Maximum allowed limit providing an upper bound failsafe
125123
*/
@@ -142,20 +140,20 @@ private VegasLimit(Builder builder) {
142140
}
143141

144142
@Override
145-
public synchronized void update(long rtt, int maxInFlight) {
143+
public synchronized void update(SampleWindow sample) {
144+
long rtt = sample.getCandidateRttNanos();
146145
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);
147146

148147
if (rtt_noload == 0 || rtt < rtt_noload) {
149-
LOG.debug("New MinRTT {}", rtt);
148+
LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
150149
rtt_noload = rtt;
151150
}
152151

153152
double newLimit;
154153
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
155-
if (didDrop) {
154+
if (sample.didDrop()) {
156155
newLimit = decreaseFunc.apply(estimatedLimit);
157-
didDrop = false;
158-
} else if (maxInFlight + queueSize < estimatedLimit) {
156+
} else if (sample.getMaxInFlight() + queueSize < estimatedLimit) {
159157
return;
160158
} else {
161159
int alpha = alphaFunc.apply((int)estimatedLimit);
@@ -173,22 +171,15 @@ public synchronized void update(long rtt, int maxInFlight) {
173171
newLimit = Math.max(1, Math.min(maxLimit, newLimit));
174172
newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit;
175173
if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) {
176-
LOG.debug("New limit={} minRtt={} μs winRtt={} μs queueSize={}",
177-
estimatedLimit,
178-
TimeUnit.NANOSECONDS.toMicros(rtt_noload),
179-
TimeUnit.NANOSECONDS.toMicros(rtt),
174+
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={}",
175+
(int)newLimit,
176+
TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0,
177+
TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0,
180178
queueSize);
181179
}
182180
estimatedLimit = newLimit;
183181
}
184182

185-
@Override
186-
public synchronized void drop() {
187-
if (!didDrop) {
188-
didDrop = true;
189-
}
190-
}
191-
192183
@Override
193184
public int getLimit() {
194185
return (int)estimatedLimit;
@@ -200,8 +191,8 @@ long getRttNoLoad() {
200191

201192
@Override
202193
public String toString() {
203-
return "VegasLimit [limit=" + estimatedLimit +
204-
", rtt_noload=" + TimeUnit.NANOSECONDS.toMillis(rtt_noload) +
205-
"]";
194+
return "VegasLimit [limit=" + getLimit() +
195+
", rtt_noload=" + TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0 +
196+
" ms]";
206197
}
207198
}

0 commit comments

Comments
 (0)