Skip to content

Commit 65b0dd9

Browse files
authored
Merge pull request #46 from elandau/feature/noload_rtt_smoothing
feature: gradient noload rtt smoothing
2 parents 4ca341f + be763b5 commit 65b0dd9

File tree

5 files changed

+128
-46
lines changed

5 files changed

+128
-46
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,16 @@ public final class GradientLimit implements Limit {
2828
private static final Logger LOG = LoggerFactory.getLogger(GradientLimit.class);
2929

3030
public static class Builder {
31-
private int initialLimit = 100;
31+
private int initialLimit = 50;
32+
private int minLimit = 1;
3233
private int maxConcurrency = 1000;
3334
private long minRttThreshold = TimeUnit.MICROSECONDS.toNanos(1);
3435

35-
private double smoothing = 0.2;
36+
private double smoothing = 0.1;
3637
private Function<Integer, Integer> queueSize = SquareRootFunction.create(4);
3738
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
3839
private double rttTolerance = 1.0;
39-
40-
private int probeMultiplier = 30;
40+
private int probeMultiplier = 10;
4141

4242
/**
4343
* Minimum threshold for accepting a new rtt sample. Any RTT lower than this threshold
@@ -62,6 +62,18 @@ public Builder initialLimit(int initialLimit) {
6262
return this;
6363
}
6464

65+
/**
66+
* Minimum concurrency limit allowed. The minimum helps prevent the algorithm from adjusting the limit
67+
* too far down. Note that this limit is not desirable when used as backpressure for batch apps.
68+
*
69+
* @param minLimit Lower bound for the limit; the builder's default is 1
70+
* @return Chainable builder
71+
*/
72+
public Builder minLimit(int minLimit) {
73+
this.minLimit = minLimit;
74+
return this;
75+
}
76+
6577
/**
6678
* Tolerance for changes in minimum latency.
6779
* @param rttTolerance Value {@literal >}= 1.0 indicating how much change in minimum latency is acceptable
@@ -132,13 +144,13 @@ public Builder metricRegistry(MetricRegistry registry) {
132144
* The limiter will probe for a new noload RTT every probeMultiplier * current limit
133145
* iterations. Default value is 10. Set to -1 to disable
134146
* @param probeMultiplier
135-
* @return Chinable builder
147+
* @return Chainable builder
136148
*/
137149
public Builder probeMultiplier(int probeMultiplier) {
138150
this.probeMultiplier = probeMultiplier;
139151
return this;
140152
}
141-
153+
142154
/**
 * Creates a new {@link GradientLimit} from this builder.
 *
 * @return A new GradientLimit constructed from this builder
 */
public GradientLimit build() {
143155
return new GradientLimit(this);
144156
}
@@ -157,13 +169,15 @@ public static GradientLimit newDefault() {
157169
*/
158170
private volatile double estimatedLimit;
159171

160-
private Measurement rttNoLoad = new MinimumMeasurement();
172+
private final Measurement rttNoLoad;
161173

162174
/**
163175
* Maximum allowed limit providing an upper bound failsafe
164176
*/
165177
private final int maxLimit;
166178

179+
private final int minLimit;
180+
167181
private final Function<Integer, Integer> queueSize;
168182

169183
private final double smoothing;
@@ -185,12 +199,14 @@ public static GradientLimit newDefault() {
185199
private GradientLimit(Builder builder) {
186200
this.estimatedLimit = builder.initialLimit;
187201
this.maxLimit = builder.maxConcurrency;
202+
this.minLimit = builder.minLimit;
188203
this.queueSize = builder.queueSize;
189204
this.smoothing = builder.smoothing;
190205
this.minRttThreshold = builder.minRttThreshold;
191206
this.rttTolerance = builder.rttTolerance;
192207
this.probeMultiplier = builder.probeMultiplier;
193208
this.resetRttCounter = nextProbeCountdown();
209+
this.rttNoLoad = new SmoothingMinimumMeasurement(builder.smoothing);
194210

195211
this.minRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
196212
this.minWindowRttSampleListener = builder.registry.registerDistribution(MetricIds.WINDOW_MIN_RTT_NAME);
@@ -207,53 +223,63 @@ private int nextProbeCountdown() {
207223

208224
@Override
209225
public synchronized void update(SampleWindow sample) {
210-
final long rtt = sample.getCandidateRttNanos();
211-
minWindowRttSampleListener.addSample(rtt);
212-
213-
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);
226+
Preconditions.checkArgument(sample.getCandidateRttNanos() > 0, "rtt must be >0 but got " + sample.getCandidateRttNanos());
214227

215-
if (rtt < minRttThreshold) {
228+
if (sample.getCandidateRttNanos() < minRttThreshold) {
216229
return;
217230
}
231+
232+
final long rtt = sample.getCandidateRttNanos(); // rttWindowNoLoad.get().longValue();
233+
minWindowRttSampleListener.addSample(rtt);
218234

219235
final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
220236
queueSizeSampleListener.addSample(queueSize);
221237

222-
// Reset or probe for a new RTT and a new estimatedLimit. It's necessary to cut the limit
238+
// Reset or probe for a new noload RTT and a new estimatedLimit. It's necessary to cut the limit
223239
// in half to avoid having the limit drift upwards when the RTT is probed during heavy load.
224240
// To avoid decreasing the limit too much we don't allow it to go lower than the queueSize.
225241
if (probeMultiplier != DISABLED && resetRttCounter-- <= 0) {
226242
resetRttCounter = nextProbeCountdown();
227243

228-
estimatedLimit = Math.max(estimatedLimit - queueSize, queueSize);
244+
estimatedLimit = Math.max(minLimit, Math.max(estimatedLimit - queueSize, queueSize));
245+
rttNoLoad.update(current -> rtt);
229246
LOG.debug("Probe MinRTT limit={}", getLimit());
230-
rttNoLoad.reset();
231247
return;
232248
} else if (rttNoLoad.add(rtt)) {
233249
LOG.debug("New MinRTT {} limit={}", TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0, getLimit());
234250
}
235251

236252
minRttSampleListener.addSample(rttNoLoad.get());
237253

238-
final double gradient = Math.max(0.5, Math.min(1.0, rttTolerance * rttNoLoad.get() / rtt));
254+
final double gradient;
255+
// rtt is still higher than rtt_noload because of smoothing rtt noload updates
256+
// set to 1.0 to indicate no queueing
257+
if (rttNoLoad.get().doubleValue() > rtt) {
258+
gradient = 1.0;
259+
} else {
260+
gradient = Math.max(0.5, rttTolerance * rttNoLoad.get().doubleValue() / rtt);
261+
}
262+
239263
double newLimit;
264+
// Reduce the limit aggressively if there was a drop
240265
if (sample.didDrop()) {
241266
newLimit = estimatedLimit/2;
267+
// Don't grow the limit because we are app limited
242268
} else if ((estimatedLimit - sample.getMaxInFlight()) > queueSize) {
243269
return;
270+
// Normal update to the limit
244271
} else {
245272
newLimit = estimatedLimit * gradient + queueSize;
246273
}
247274

248275
newLimit = Math.max(queueSize, Math.min(maxLimit, newLimit));
249-
if (newLimit < estimatedLimit) {
250-
newLimit = estimatedLimit * (1-smoothing) + smoothing*(newLimit);
251-
}
276+
newLimit = Math.max(minLimit, estimatedLimit * (1-smoothing) + smoothing*(newLimit));
277+
252278
if ((int)newLimit != (int)estimatedLimit) {
253279
if (LOG.isDebugEnabled()) {
254280
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={} resetCounter={}",
255281
(int)newLimit,
256-
TimeUnit.NANOSECONDS.toMicros(rttNoLoad.get())/1000.0,
282+
TimeUnit.NANOSECONDS.toMicros(rttNoLoad.get().longValue())/1000.0,
257283
TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0,
258284
queueSize,
259285
gradient,
@@ -269,13 +295,13 @@ public int getLimit() {
269295
}
270296

271297
/**
 * Returns the current no-load RTT estimate held by {@code rttNoLoad}, truncated to a long.
 * NOTE(review): the samples fed to rttNoLoad in update() come from getCandidateRttNanos(),
 * so the value is presumably in nanoseconds; confirm.
 *
 * @return Current no-load RTT estimate
 */
public long getRttNoLoad() {
272-
return rttNoLoad.get();
298+
return rttNoLoad.get().longValue();
273299
}
274300

275301
/**
 * Renders the truncated estimated limit and the no-load RTT, labelled in milliseconds.
 */
@Override
276302
public String toString() {
// NOTE(review): rttNoLoad is fed nanosecond samples in update(). MICROSECONDS.toMillis(x) / 1000.0
// is x/1000 (integer division) then /1000.0, which is numerically the same as the
// NANOSECONDS.toMicros(x) / 1000.0 form used in update()'s debug log, but the unit name is misleading.
277303
return "GradientLimit [limit=" + (int)estimatedLimit +
278-
", rtt_noload=" + TimeUnit.MICROSECONDS.toMillis(rttNoLoad.get()) / 1000.0+
304+
", rtt_noload=" + TimeUnit.MICROSECONDS.toMillis(rttNoLoad.get().longValue()) / 1000.0+
279305
" ms]";
280306
}
281307
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ public interface Measurement {
1111
* @param sample
1212
* @return True if internal state was updated
1313
*/
14-
boolean add(long sample);
14+
boolean add(Number sample);
1515

16-
long update(Function<Long, Long> func);
16+
Number update(Function<Number, Number> func);
1717

1818
/**
1919
* @return Return the current value
2020
*/
21-
long get();
21+
Number get();
2222

2323
/**
2424
* Reset the internal state as if no samples were ever added

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,30 @@
33
import java.util.function.Function;
44

55
/**
 * Measurement that keeps the smallest sample added so far.
 * A stored value of 0 doubles as the "no sample yet" marker, so a genuine sample of 0
 * cannot be told apart from the unset state.
 */
public class MinimumMeasurement implements Measurement {
6-
private long value = 0;
6+
private Double value = 0.0;
77

88
@Override
9-
public boolean add(long sample) {
10-
if (value == 0 || sample < value) {
11-
value = sample;
9+
public boolean add(Number sample) {
10+
// Accept the sample when nothing has been recorded yet (value is 0) or when it is a new minimum
if (value == 0.0 || sample.doubleValue() < value) {
11+
value = sample.doubleValue();
1212
return true;
1313
}
1414
return false;
1515
}
1616

1717
@Override
18-
public long get() {
18+
public Number get() {
1919
return value;
2020
}
2121

2222
@Override
2323
public void reset() {
24-
value = 0;
24+
// Back to the "no sample yet" marker; the next add() is accepted unconditionally
value = 0.0;
2525
}
2626

2727
@Override
28-
public long update(Function<Long, Long> func) {
29-
value = func.apply(value);
28+
public Number update(Function<Number, Number> func) {
29+
// Overwrites the stored value with whatever func returns; no minimum check is applied here
value = func.apply(value).doubleValue();
3030
return value;
3131
}
3232
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.netflix.concurrency.limits.limit;
2+
3+
import java.util.function.Function;
4+
5+
/**
6+
* Measures the minimum value of a sample set but also adds a smoothing factor, so the stored
* value moves only part of the way towards a new sample instead of jumping to it.
* A stored value of 0 doubles as the "no sample yet" marker.
7+
*/
8+
public class SmoothingMinimumMeasurement implements Measurement {
9+
private Double value = 0.0;
10+
private final double ratio;
11+
12+
/**
13+
* @param smoothing Factor applied to the new measurement using the formula
14+
* value = value * (1-smoothing) + newValue * smoothing
* This is the weighting used by update(). NOTE(review): add() uses the mirrored weighting,
* value * smoothing + sample * (1-smoothing); confirm the asymmetry is intended.
15+
*/
16+
public SmoothingMinimumMeasurement(double smoothing) {
17+
this.ratio = smoothing;
18+
}
19+
20+
@Override
21+
public boolean add(Number sample) {
22+
// First sample after construction or reset() is taken as-is, without smoothing
if (value == 0) {
23+
value = sample.doubleValue();
24+
return true;
25+
} else if (sample.doubleValue() < value.doubleValue()) {
26+
// New minimum: the old value is weighted by ratio and the sample by (1-ratio).
// NOTE(review): this is the inverse of the weighting in update() and in the constructor doc.
value = value * ratio + sample.doubleValue() * (1-ratio);
27+
return true;
28+
}
29+
30+
// Samples that are not below the current value leave it unchanged
return false;
31+
}
32+
33+
@Override
34+
public Number get() {
35+
return value;
36+
}
37+
38+
@Override
39+
public void reset() {
40+
value = 0.0;
41+
}
42+
43+
@Override
44+
public Number update(Function<Number, Number> func) {
45+
double newValue = func.apply(value).doubleValue();
46+
// Blend towards newValue regardless of direction. While value is still 0 (no sample yet)
// this yields newValue * ratio rather than newValue.
value = value * (1-ratio) + newValue * ratio;
47+
return value;
48+
}
49+
}

concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.netflix.concurrency.limits.grpc.server.ConcurrencyLimitServerInterceptor;
1919
import com.netflix.concurrency.limits.grpc.server.GrpcServerLimiterBuilder;
2020
import com.netflix.concurrency.limits.limit.GradientLimit;
21-
import com.netflix.concurrency.limits.limit.VegasLimit;
2221

2322
import io.grpc.CallOptions;
2423
import io.grpc.Channel;
@@ -46,20 +45,26 @@ public class Example {
4645

4746

4847
public static ServerCallHandler<String, String> createServerHandler(int concurrency) {
49-
final ExponentialDistribution distribution = new ExponentialDistribution(10.0);
50-
final Supplier<Integer> latency = () -> 100 + (int)distribution.sample();
48+
final ExponentialDistribution distribution = new ExponentialDistribution(20.0);
49+
final Supplier<Integer> latency = () -> 1 + (int)distribution.sample();
5150

5251
List<Semaphore> semaphores = Arrays.asList(
53-
new Semaphore(concurrency, true),
54-
new Semaphore(concurrency*2, true),
55-
new Semaphore(concurrency*4, true),
56-
new Semaphore(concurrency*8, true));
52+
new Semaphore( concurrency, true),
53+
new Semaphore( concurrency, true),
54+
new Semaphore( concurrency, true),
55+
new Semaphore( concurrency, true),
56+
new Semaphore(1, true)
57+
);
58+
59+
AtomicInteger position = new AtomicInteger(0);
60+
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
61+
position.updateAndGet(current -> (current + 1) % semaphores.size());
62+
}, 10, 10, TimeUnit.SECONDS);
5763

5864
return ServerCalls.asyncUnaryCall(new UnaryMethod<String, String>() {
59-
AtomicInteger counter = new AtomicInteger();
6065
@Override
6166
public void invoke(String req, StreamObserver<String> observer) {
62-
Semaphore sem = semaphores.get((counter.incrementAndGet() / 2000) % semaphores.size());
67+
Semaphore sem = semaphores.get(position.get());
6368
try {
6469
sem.acquire();
6570
Uninterruptibles.sleepUninterruptibly(latency.get(), TimeUnit.MILLISECONDS);
@@ -86,7 +91,7 @@ public static void main(String[] args) throws IOException {
8691
.build(), new ConcurrencyLimitServerInterceptor(new GrpcServerLimiterBuilder()
8792
.limiter(builder -> builder
8893
.limit(limit)
89-
.minWindowTime(200, TimeUnit.MILLISECONDS)
94+
.minWindowTime(1, TimeUnit.SECONDS)
9095
)
9196
.build())
9297
))
@@ -96,9 +101,10 @@ public static void main(String[] args) throws IOException {
96101
// Report progress
97102
AtomicInteger dropCount = new AtomicInteger(0);
98103
AtomicInteger successCount = new AtomicInteger(0);
99-
104+
AtomicInteger counter = new AtomicInteger(0);
100105
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
101-
System.out.println(MessageFormat.format("{0}, {1}, {2}", limit.getLimit(), successCount.getAndSet(0), dropCount.getAndSet(0)));
106+
System.out.println(MessageFormat.format("{0,number,#}, {1}, {2}, {3}", counter.incrementAndGet(),
107+
limit.getLimit(), successCount.getAndSet(0), dropCount.getAndSet(0)));
102108
}, 1, 1, TimeUnit.SECONDS);
103109

104110
// Create a client
@@ -107,7 +113,8 @@ public static void main(String[] args) throws IOException {
107113
.build();
108114

109115
DriverBuilder.newBuilder()
110-
.exponential(3, 2, TimeUnit.SECONDS)
116+
.exponential(3, 90, TimeUnit.SECONDS)
117+
.exponential(1, 5, TimeUnit.SECONDS)
111118
.run(1, TimeUnit.HOURS, () -> {
112119
ClientCalls.asyncUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withWaitForReady()), "request",
113120
new StreamObserver<String>() {

0 commit comments

Comments
 (0)