Skip to content

Commit 3cd7611

Browse files
authored
Merge pull request #50 from elandau/feature/gradient_tweak
RTT noLoad low pass filter
2 parents 40226aa + 5442c29 commit 3cd7611

File tree

7 files changed

+127
-137
lines changed

7 files changed

+127
-137
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.netflix.concurrency.limits.limit;
2+
3+
public class ExpAvgMeasurement implements Measurement {
4+
private final int window;
5+
private final double ratio;
6+
private final double filter;
7+
8+
private double value;
9+
private int count;
10+
11+
public ExpAvgMeasurement(int window, double filter) {
12+
this.window = window;
13+
this.ratio = 1.0 / window;
14+
this.filter = filter;
15+
this.value = 0.0;
16+
this.count = 0;
17+
}
18+
19+
@Override
20+
public Number add(Number sample) {
21+
// First sample seen
22+
if (count == 0) {
23+
value = sample.doubleValue();
24+
count = 1;
25+
// Adaptive average for the first <window> samples
26+
} else if (count < window) {
27+
count++;
28+
double tempRatio = 1.0 / count;
29+
value = (1-tempRatio) * value + tempRatio * Math.min(value*filter, sample.doubleValue());
30+
// Steady state
31+
} else {
32+
value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue());
33+
}
34+
return value;
35+
}
36+
37+
@Override
38+
public Number get() {
39+
return value;
40+
}
41+
42+
@Override
43+
public void reset() {
44+
value = 0.0;
45+
count = 0;
46+
}
47+
48+
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java

Lines changed: 71 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.netflix.concurrency.limits.limit;
22

3-
import java.util.concurrent.ThreadLocalRandom;
43
import java.util.concurrent.TimeUnit;
54
import java.util.function.Function;
65

@@ -23,21 +22,19 @@
2322
* growth as the limit grows.
2423
*/
2524
public final class GradientLimit implements Limit {
26-
private static final int DISABLED = -1;
27-
2825
private static final Logger LOG = LoggerFactory.getLogger(GradientLimit.class);
2926

3027
public static class Builder {
3128
private int initialLimit = 50;
3229
private int minLimit = 1;
33-
private int maxConcurrency = 1000;
30+
private int maxLimit = 1000;
3431
private long minRttThreshold = TimeUnit.MICROSECONDS.toNanos(1);
3532

3633
private double smoothing = 0.1;
3734
private Function<Integer, Integer> queueSize = SquareRootFunction.create(4);
3835
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
39-
private double rttTolerance = 1.0;
40-
private int probeMultiplier = 10;
36+
private int noLoadRttWindow = 1000;
37+
private double noLoadRttFilter = 1.1;
4138

4239
/**
4340
* Minimum threshold for accepting a new rtt sample. Any RTT lower than this threshold
@@ -80,9 +77,9 @@ public Builder minLimit(int minLimit) {
8077
* before reducing the limit. For example, a value of 2.0 means that a 2x increase in latency is acceptable.
8178
* @return Chainable builder
8279
*/
80+
@Deprecated
8381
public Builder rttTolerance(double rttTolerance) {
8482
Preconditions.checkArgument(rttTolerance >= 1.0, "Tolerance must be >= 1.0");
85-
this.rttTolerance = rttTolerance;
8683
return this;
8784
}
8885

@@ -92,11 +89,22 @@ public Builder rttTolerance(double rttTolerance) {
9289
* @param maxConcurrency
9390
* @return Chainable builder
9491
*/
92+
@Deprecated
9593
public Builder maxConcurrency(int maxConcurrency) {
96-
this.maxConcurrency = maxConcurrency;
94+
return maxLimit(maxConcurrency);
95+
}
96+
97+
/**
98+
* Maximum allowable concurrency. Any estimated concurrency will be capped
99+
* at this value
100+
* @param maxConcurrency
101+
* @return Chainable builder
102+
*/
103+
public Builder maxLimit(int maxLimit) {
104+
this.maxLimit = maxLimit;
97105
return this;
98106
}
99-
107+
100108
/**
101109
* Fixed amount the estimated limit can grow while latencies remain low
102110
* @param queueSize
@@ -146,11 +154,33 @@ public Builder metricRegistry(MetricRegistry registry) {
146154
* @param probeMultiplier
147155
* @return Chainable builder
148156
*/
157+
@Deprecated
149158
public Builder probeMultiplier(int probeMultiplier) {
150-
this.probeMultiplier = probeMultiplier;
151159
return this;
152160
}
153161

162+
/**
163+
* Exponential moving average window size of sample updates for tracking noLoad RTT
164+
* Having sample window lets the system adapt to changes in latency characteristics.
165+
* @param window
166+
* @return Chainable builder
167+
*/
168+
public Builder noLoadRttWindow(int window) {
169+
this.noLoadRttWindow = window;
170+
return this;
171+
}
172+
173+
/**
174+
* Low pass filter applied to noLoad RTT measurements ensuring that outlier latency
175+
* measurements don't have an adverse impact on the noLoad rtt.
176+
* @param filter
177+
* @return Chainable builder
178+
*/
179+
public Builder noLoadRttFilter(double filter) {
180+
this.noLoadRttFilter = filter;
181+
return this;
182+
}
183+
154184
public GradientLimit build() {
155185
return new GradientLimit(this);
156186
}
@@ -169,7 +199,9 @@ public static GradientLimit newDefault() {
169199
*/
170200
private volatile double estimatedLimit;
171201

172-
private final Measurement rttNoLoad;
202+
private final Measurement rttNoLoadAccumulator;
203+
204+
private final double smoothing;
173205

174206
/**
175207
* Maximum allowed limit providing an upper bound failsafe
@@ -180,112 +212,84 @@ public static GradientLimit newDefault() {
180212

181213
private final Function<Integer, Integer> queueSize;
182214

183-
private final double smoothing;
184-
185215
private final long minRttThreshold;
186-
187-
private final double rttTolerance;
188-
216+
189217
private final SampleListener minRttSampleListener;
190218

191219
private final SampleListener minWindowRttSampleListener;
192220

193221
private final SampleListener queueSizeSampleListener;
194222

195-
private final int probeMultiplier;
196-
197-
private int resetRttCounter;
198-
199223
private GradientLimit(Builder builder) {
200224
this.estimatedLimit = builder.initialLimit;
201-
this.maxLimit = builder.maxConcurrency;
225+
this.maxLimit = builder.maxLimit;
202226
this.minLimit = builder.minLimit;
203227
this.queueSize = builder.queueSize;
204228
this.smoothing = builder.smoothing;
205229
this.minRttThreshold = builder.minRttThreshold;
206-
this.rttTolerance = builder.rttTolerance;
207-
this.probeMultiplier = builder.probeMultiplier;
208-
this.resetRttCounter = nextProbeCountdown();
209-
this.rttNoLoad = new SmoothingMinimumMeasurement(builder.smoothing);
230+
this.rttNoLoadAccumulator = new ExpAvgMeasurement(builder.noLoadRttWindow, builder.noLoadRttFilter);
210231

211232
this.minRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
212233
this.minWindowRttSampleListener = builder.registry.registerDistribution(MetricIds.WINDOW_MIN_RTT_NAME);
213234
this.queueSizeSampleListener = builder.registry.registerDistribution(MetricIds.WINDOW_QUEUE_SIZE_NAME);
214235
}
215236

216-
private int nextProbeCountdown() {
217-
if (probeMultiplier == DISABLED) {
218-
return DISABLED;
219-
}
220-
int max = (int) (probeMultiplier * estimatedLimit);
221-
return ThreadLocalRandom.current().nextInt(max / 2, max);
222-
}
223-
224237
@Override
225238
public synchronized void update(SampleWindow sample) {
226239
Preconditions.checkArgument(sample.getCandidateRttNanos() > 0, "rtt must be >0 but got " + sample.getCandidateRttNanos());
227240

228-
if (sample.getCandidateRttNanos() < minRttThreshold) {
241+
final long rttSample = sample.getCandidateRttNanos();
242+
minWindowRttSampleListener.addSample(rttSample);
243+
if (rttSample < minRttThreshold) {
229244
return;
230245
}
231-
232-
final long rtt = sample.getCandidateRttNanos(); // rttWindowNoLoad.get().longValue();
233-
minWindowRttSampleListener.addSample(rtt);
234246

235247
final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
236248
queueSizeSampleListener.addSample(queueSize);
237249

238-
// Reset or probe for a new noload RTT and a new estimatedLimit. It's necessary to cut the limit
239-
// in half to avoid having the limit drift upwards when the RTT is probed during heavy load.
240-
// To avoid decreasing the limit too much we don't allow it to go lower than the queueSize.
241-
if (probeMultiplier != DISABLED && resetRttCounter-- <= 0) {
242-
resetRttCounter = nextProbeCountdown();
243-
244-
estimatedLimit = Math.max(minLimit, Math.max(estimatedLimit - queueSize, queueSize));
245-
rttNoLoad.update(current -> rtt);
246-
LOG.debug("Probe MinRTT limit={}", getLimit());
247-
return;
248-
} else if (rttNoLoad.add(rtt)) {
249-
LOG.debug("New MinRTT {} limit={}", TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0, getLimit());
250-
}
250+
final double rttNoLoad = rttNoLoadAccumulator.add(rttSample).doubleValue();
251+
final double rtt = (double)rttSample;
251252

252-
minRttSampleListener.addSample(rttNoLoad.get());
253+
minRttSampleListener.addSample(rttNoLoad);
253254

254255
final double gradient;
255-
// rtt is still higher than rtt_noload because of smoothing rtt noload updates
256+
// rtt is lower than rtt_noload because of smoothing rtt noload updates
256257
// set to 1.0 to indicate no queueing
257-
if (rttNoLoad.get().doubleValue() > rtt) {
258+
if (rtt < rttNoLoad) {
258259
gradient = 1.0;
259260
} else {
260-
gradient = Math.max(0.5, rttTolerance * rttNoLoad.get().doubleValue() / rtt);
261+
gradient = Math.max(0.5, rttNoLoad / rtt);
261262
}
262263

263264
double newLimit;
264265
// Reduce the limit aggressively if there was a drop
265266
if (sample.didDrop()) {
266267
newLimit = estimatedLimit/2;
267-
// Don't grow the limit because we are app limited
268-
} else if ((estimatedLimit - sample.getMaxInFlight()) > queueSize) {
269-
return;
270268
// Normal update to the limit
271269
} else {
272270
newLimit = estimatedLimit * gradient + queueSize;
273271
}
274272

275-
newLimit = Math.max(queueSize, Math.min(maxLimit, newLimit));
276-
newLimit = Math.max(minLimit, estimatedLimit * (1-smoothing) + smoothing*(newLimit));
277-
278273
if ((int)newLimit != (int)estimatedLimit) {
274+
// Don't grow the limit because we are app limited
275+
if (sample.getMaxInFlight() < estimatedLimit / 2) {
276+
return;
277+
// Apply a smoothing factor when reducing the limit only
278+
} else if (newLimit < estimatedLimit) {
279+
newLimit = (1-smoothing) * estimatedLimit + smoothing*(newLimit);
280+
}
281+
279282
if (LOG.isDebugEnabled()) {
280-
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={} resetCounter={}",
283+
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={}",
281284
(int)newLimit,
282-
TimeUnit.NANOSECONDS.toMicros(rttNoLoad.get().longValue())/1000.0,
283-
TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0,
285+
TimeUnit.NANOSECONDS.toMicros((int)rttNoLoad)/1000.0,
286+
TimeUnit.NANOSECONDS.toMicros((int)rtt)/1000.0,
284287
queueSize,
285-
gradient,
286-
resetRttCounter);
288+
gradient);
287289
}
288290
}
291+
292+
newLimit = Math.max(Math.max(minLimit, queueSize), Math.min(maxLimit, newLimit));
289293
estimatedLimit = newLimit;
290294
}
291295

@@ -295,13 +299,13 @@ public int getLimit() {
295299
}
296300

297301
public long getRttNoLoad() {
298-
return rttNoLoad.get().longValue();
302+
return rttNoLoadAccumulator.get().longValue();
299303
}
300304

301305
@Override
302306
public String toString() {
303307
return "GradientLimit [limit=" + (int)estimatedLimit +
304-
", rtt_noload=" + TimeUnit.MICROSECONDS.toMillis(rttNoLoad.get().longValue()) / 1000.0+
308+
", rtt_noload=" + TimeUnit.MICROSECONDS.toMillis(getRttNoLoad()) / 1000.0+
305309
" ms]";
306310
}
307311
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
package com.netflix.concurrency.limits.limit;
22

3-
import java.util.function.Function;
4-
53
/**
64
* Contract for tracking a measurement such as a minimum or average of a sample set
75
*/
86
public interface Measurement {
97
/**
108
* Add a single sample and update the internal state.
119
* @param sample
12-
* @return True if internal state was updated
10+
* @return Updated value
1311
*/
14-
boolean add(Number sample);
15-
16-
Number update(Function<Number, Number> func);
12+
Number add(Number sample);
1713

1814
/**
1915
* @return Return the current value
Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
package com.netflix.concurrency.limits.limit;
22

3-
import java.util.function.Function;
4-
53
public class MinimumMeasurement implements Measurement {
64
private Double value = 0.0;
75

86
@Override
9-
public boolean add(Number sample) {
7+
public Number add(Number sample) {
108
if (value == 0.0 || sample.doubleValue() < value) {
119
value = sample.doubleValue();
12-
return true;
1310
}
14-
return false;
11+
return value;
1512
}
1613

1714
@Override
@@ -23,10 +20,4 @@ public Number get() {
2320
public void reset() {
2421
value = 0.0;
2522
}
26-
27-
@Override
28-
public Number update(Function<Number, Number> func) {
29-
value = func.apply(value).doubleValue();
30-
return value;
31-
}
3223
}

0 commit comments

Comments
 (0)