Skip to content

Commit 2afe5a7

Browse files
authored
Merge pull request #56 from elandau/feature/gradient_tweak
Improve limiter stability by using average window RTT.
2 parents fc08c3b + 22fc4cd commit 2afe5a7

File tree

5 files changed

+51
-44
lines changed

5 files changed

+51
-44
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ interface SampleWindow {
1414
*/
1515
long getCandidateRttNanos();
1616

17+
/**
18+
* @return Sum of all RTT samples in this window.
19+
*/
20+
long getRttSumNanos();
21+
1722
/**
1823
* @return Maximum number of inflight observed during the sample window
1924
*/

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurement.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,13 @@ public ExpAvgMeasurement(int window, double filter) {
1414

1515
@Override
1616
public Number add(Number sample) {
17-
if (value == 0) {
17+
if (value == 0.0) {
1818
value = sample.doubleValue();
1919
} else if (sample.doubleValue() < value) {
2020
value = sample.doubleValue();
2121
} else {
2222
value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue());
2323
}
24-
// // First sample seen
25-
// if (count == 0) {
26-
// value = sample.doubleValue();
27-
// count = 1;
28-
// // Adaptive average for the first <window> samples
29-
// } else if (count < window) {
30-
// count++;
31-
// double tempRatio = 1.0 / count;
32-
// value = (1-tempRatio) * value + tempRatio * Math.min(value*filter, sample.doubleValue());
33-
// // Steady state
34-
// } else {
35-
// value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue());
36-
// }
3724
return value;
3825
}
3926

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@ public final class GradientLimit implements Limit {
2727
public static class Builder {
2828
private int initialLimit = 50;
2929
private int minLimit = 1;
30-
private int maxLimit = 1000;
30+
private int maxLimit = 200;
3131

3232
private double smoothing = 0.2;
3333
private Function<Integer, Integer> queueSize = SquareRootFunction.create(4);
3434
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
3535
private int noLoadRttWindow = 1000;
3636
private double noLoadRttFilter = 1.1;
37+
private double rttTolerance = 2.0;
3738

3839
/**
3940
* Minimum threshold for accepting a new rtt sample. Any RTT lower than this threshold
@@ -76,9 +77,9 @@ public Builder minLimit(int minLimit) {
7677
* before reducing the limit. For example, a value of 2.0 means that a 2x increase in latency is acceptable.
7778
* @return Chainable builder
7879
*/
79-
@Deprecated
8080
public Builder rttTolerance(double rttTolerance) {
8181
Preconditions.checkArgument(rttTolerance >= 1.0, "Tolerance must be >= 1.0");
82+
this.rttTolerance = rttTolerance;
8283
return this;
8384
}
8485

@@ -209,6 +210,8 @@ public static GradientLimit newDefault() {
209210

210211
private final int minLimit;
211212

213+
private final double rttTolerance;
214+
212215
private final Function<Integer, Integer> queueSize;
213216

214217
private final SampleListener minRttSampleListener;
@@ -223,6 +226,8 @@ private GradientLimit(Builder builder) {
223226
this.minLimit = builder.minLimit;
224227
this.queueSize = builder.queueSize;
225228
this.smoothing = builder.smoothing;
229+
this.rttTolerance = builder.rttTolerance;
230+
226231
this.rttNoLoadAccumulator = new ExpAvgMeasurement(builder.noLoadRttWindow, builder.noLoadRttFilter);
227232

228233
this.minRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
@@ -234,18 +239,18 @@ private GradientLimit(Builder builder) {
234239
public synchronized void update(SampleWindow sample) {
235240
Preconditions.checkArgument(sample.getCandidateRttNanos() > 0, "rtt must be >0 but got " + sample.getCandidateRttNanos());
236241

237-
final long rttSample = sample.getCandidateRttNanos();
242+
final long rttSample = sample.getRttSumNanos() / sample.getSampleCount();
238243
minWindowRttSampleListener.addSample(rttSample);
239244

240245
final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
241246
queueSizeSampleListener.addSample(queueSize);
242247

243248
final double rttNoLoad = rttNoLoadAccumulator.add(rttSample).doubleValue();
244-
final double rtt = (double)rttSample;
245249

246-
minRttSampleListener.addSample(rttNoLoad);
250+
minRttSampleListener.addSample(rttSample);
247251

248252
final double gradient;
253+
final double rtt = (double)rttSample / rttTolerance;
249254
// rtt is lower than rtt_noload because of smoothing rtt noload updates
250255
// set to 1.0 to indicate no queueing
251256
if (rtt < rttNoLoad) {
@@ -264,23 +269,24 @@ public synchronized void update(SampleWindow sample) {
264269
}
265270

266271
// Apply a smoothing factor when reducing the limit only
267-
newLimit = (1-smoothing) * estimatedLimit + smoothing*(newLimit);
272+
if (newLimit < estimatedLimit) {
273+
newLimit = (1-smoothing) * estimatedLimit + smoothing*(newLimit);
274+
}
275+
268276
newLimit = Math.max(Math.max(minLimit, queueSize), Math.min(maxLimit, newLimit));
277+
278+
if (LOG.isDebugEnabled()) {
279+
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={}",
280+
(int)newLimit,
281+
TimeUnit.NANOSECONDS.toMicros((int)rttNoLoad)/1000.0,
282+
TimeUnit.NANOSECONDS.toMicros((int)rttSample)/1000.0,
283+
queueSize,
284+
gradient);
285+
}
269286

270-
if ((int)newLimit != (int)estimatedLimit) {
271-
// Don't grow the limit if we are app limited
272-
if (sample.getMaxInFlight() + queueSize < estimatedLimit) {
273-
return;
274-
}
275-
276-
if (LOG.isDebugEnabled()) {
277-
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={}",
278-
(int)newLimit,
279-
TimeUnit.NANOSECONDS.toMicros((int)rttNoLoad)/1000.0,
280-
TimeUnit.NANOSECONDS.toMicros((int)rtt)/1000.0,
281-
queueSize,
282-
gradient);
283-
}
287+
// We are app limited, don't increase the limit to prevent upward drift
288+
if (sample.getMaxInFlight() * 2 < estimatedLimit) {
289+
return;
284290
}
285291

286292
estimatedLimit = newLimit;

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
public final class DefaultLimiter<ContextT> implements Limiter<ContextT> {
2222
private final Supplier<Long> nanoClock = System::nanoTime;
2323

24-
private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1);
25-
private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1);
26-
private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(500);
24+
private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(3);
25+
private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(3);
26+
private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(100);
2727

2828
/**
2929
* Minimum observed samples to filter out sample windows with not enough significant samples
3030
*/
31-
private static final int DEFAULT_WINDOW_SIZE = 10;
31+
private static final int DEFAULT_WINDOW_SIZE = 100;
3232

3333
/**
3434
* End time for the sampling window at which point the limit should be updated
@@ -181,7 +181,8 @@ public void onSuccess() {
181181
if (isWindowReady(current)) {
182182
sample.set(new ImmutableSampleWindow());
183183

184-
nextUpdateTime = endTime + Math.min(Math.max(current.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime);
184+
long delta = Math.min(Math.max(current.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime);
185+
nextUpdateTime = endTime + delta;
185186
limit.update(current);
186187
strategy.setLimit(limit.getLimit());
187188
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSampleWindow.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,53 @@
11
package com.netflix.concurrency.limits.limiter;
22

3-
import com.netflix.concurrency.limits.Limit;
4-
53
import java.util.concurrent.TimeUnit;
64

5+
import com.netflix.concurrency.limits.Limit;
6+
77
/**
88
* Class used to track immutable samples in an AtomicReference
99
*/
1010
public class ImmutableSampleWindow implements Limit.SampleWindow {
1111
final long minRtt;
12+
final long sum;
1213
final int maxInFlight;
1314
final int sampleCount;
1415
final boolean didDrop;
1516

1617
public ImmutableSampleWindow() {
1718
this.minRtt = Long.MAX_VALUE;
19+
this.sum = 0;
1820
this.maxInFlight = 0;
1921
this.sampleCount = 0;
2022
this.didDrop = false;
2123
}
2224

23-
public ImmutableSampleWindow(long minRtt, int maxInFlight, int sampleCount, boolean didDrop) {
25+
public ImmutableSampleWindow(long minRtt, long sum, int maxInFlight, int sampleCount, boolean didDrop) {
2426
this.minRtt = minRtt;
27+
this.sum = sum;
2528
this.maxInFlight = maxInFlight;
2629
this.sampleCount = sampleCount;
2730
this.didDrop = didDrop;
2831
}
2932

3033
public ImmutableSampleWindow addSample(long rtt, int maxInFlight) {
31-
return new ImmutableSampleWindow(Math.min(rtt, minRtt), Math.max(maxInFlight, this.maxInFlight), sampleCount+1, didDrop);
34+
return new ImmutableSampleWindow(Math.min(rtt, minRtt), sum + rtt, Math.max(maxInFlight, this.maxInFlight), sampleCount+1, didDrop);
3235
}
3336

3437
public ImmutableSampleWindow addDroppedSample(int maxInFlight) {
35-
return new ImmutableSampleWindow(minRtt, Math.max(maxInFlight, this.maxInFlight), sampleCount, true);
38+
return new ImmutableSampleWindow(minRtt, sum, Math.max(maxInFlight, this.maxInFlight), sampleCount, true);
3639
}
3740

3841
@Override
3942
public long getCandidateRttNanos() {
4043
return minRtt;
4144
}
4245

46+
@Override
47+
public long getRttSumNanos() {
48+
return sum;
49+
}
50+
4351
@Override
4452
public int getMaxInFlight() {
4553
return maxInFlight;

0 commit comments

Comments
 (0)