Skip to content

Commit be763b5

Browse files
committed
Reduce the impact of long GCs and huge spikes in latency by always smoothing updates to the limit. Introduce smoothing to the noload RTT to keep that value more stable.
1 parent 833d048 commit be763b5

File tree

5 files changed

+128
-46
lines changed

5 files changed

+128
-46
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,16 @@ public final class GradientLimit implements Limit {
2828
private static final Logger LOG = LoggerFactory.getLogger(GradientLimit.class);
2929

3030
public static class Builder {
31-
private int initialLimit = 100;
31+
private int initialLimit = 50;
32+
private int minLimit = 1;
3233
private int maxConcurrency = 1000;
3334
private long minRttThreshold = TimeUnit.MICROSECONDS.toNanos(1);
3435

35-
private double smoothing = 0.2;
36+
private double smoothing = 0.1;
3637
private Function<Integer, Integer> queueSize = SquareRootFunction.create(4);
3738
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
3839
private double rttTolerance = 1.0;
39-
40-
private int probeMultiplier = 30;
40+
private int probeMultiplier = 10;
4141

4242
/**
4343
* Minimum threshold for accepting a new rtt sample. Any RTT lower than this threshold
@@ -62,6 +62,18 @@ public Builder initialLimit(int initialLimit) {
6262
return this;
6363
}
6464

65+
/**
66+
* Minimum concurrency limit allowed. The minimum helps prevent the algorithm from adjusting the limit
67+
* too far down. Note that this limit is not desirable when used as backpressure for batch apps.
68+
*
69+
* @param minLimit
70+
* @return Chainable builder
71+
*/
72+
public Builder minLimit(int minLimit) {
73+
this.minLimit = minLimit;
74+
return this;
75+
}
76+
6577
/**
6678
* Tolerance for changes in minimum latency.
6779
* @param rttTolerance Value {@literal >}= 1.0 indicating how much change in minimum latency is acceptable
@@ -132,13 +144,13 @@ public Builder metricRegistry(MetricRegistry registry) {
132144
* The limiter will probe for a new noload RTT every probeMultiplier * current limit
133145
* iterations. Default value is 10. Set to -1 to disable
134146
* @param probeMultiplier
135-
* @return Chinable builder
147+
* @return Chainable builder
136148
*/
137149
public Builder probeMultiplier(int probeMultiplier) {
138150
this.probeMultiplier = probeMultiplier;
139151
return this;
140152
}
141-
153+
142154
public GradientLimit build() {
143155
return new GradientLimit(this);
144156
}
@@ -157,13 +169,15 @@ public static GradientLimit newDefault() {
157169
*/
158170
private volatile double estimatedLimit;
159171

160-
private Measurement rttNoLoad = new MinimumMeasurement();
172+
private final Measurement rttNoLoad;
161173

162174
/**
163175
* Maximum allowed limit providing an upper bound failsafe
164176
*/
165177
private final int maxLimit;
166178

179+
private final int minLimit;
180+
167181
private final Function<Integer, Integer> queueSize;
168182

169183
private final double smoothing;
@@ -185,12 +199,14 @@ public static GradientLimit newDefault() {
185199
private GradientLimit(Builder builder) {
186200
this.estimatedLimit = builder.initialLimit;
187201
this.maxLimit = builder.maxConcurrency;
202+
this.minLimit = builder.minLimit;
188203
this.queueSize = builder.queueSize;
189204
this.smoothing = builder.smoothing;
190205
this.minRttThreshold = builder.minRttThreshold;
191206
this.rttTolerance = builder.rttTolerance;
192207
this.probeMultiplier = builder.probeMultiplier;
193208
this.resetRttCounter = nextProbeCountdown();
209+
this.rttNoLoad = new SmoothingMinimumMeasurement(builder.smoothing);
194210

195211
this.minRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
196212
this.minWindowRttSampleListener = builder.registry.registerDistribution(MetricIds.WINDOW_MIN_RTT_NAME);
@@ -207,53 +223,63 @@ private int nextProbeCountdown() {
207223

208224
@Override
209225
public synchronized void update(SampleWindow sample) {
210-
final long rtt = sample.getCandidateRttNanos();
211-
minWindowRttSampleListener.addSample(rtt);
212-
213-
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);
226+
Preconditions.checkArgument(sample.getCandidateRttNanos() > 0, "rtt must be >0 but got " + sample.getCandidateRttNanos());
214227

215-
if (rtt < minRttThreshold) {
228+
if (sample.getCandidateRttNanos() < minRttThreshold) {
216229
return;
217230
}
231+
232+
final long rtt = sample.getCandidateRttNanos(); // rttWindowNoLoad.get().longValue();
233+
minWindowRttSampleListener.addSample(rtt);
218234

219235
final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
220236
queueSizeSampleListener.addSample(queueSize);
221237

222-
// Reset or probe for a new RTT and a new estimatedLimit. It's necessary to cut the limit
238+
// Reset or probe for a new noload RTT and a new estimatedLimit. It's necessary to reduce the limit
223239
// by the queueSize to avoid having the limit drift upwards when the RTT is probed during heavy load.
224240
// To avoid decreasing the limit too much we don't allow it to go lower than the queueSize or minLimit.
225241
if (probeMultiplier != DISABLED && resetRttCounter-- <= 0) {
226242
resetRttCounter = nextProbeCountdown();
227243

228-
estimatedLimit = Math.max(estimatedLimit - queueSize, queueSize);
244+
estimatedLimit = Math.max(minLimit, Math.max(estimatedLimit - queueSize, queueSize));
245+
rttNoLoad.update(current -> rtt);
229246
LOG.debug("Probe MinRTT limit={}", getLimit());
230-
rttNoLoad.reset();
231247
return;
232248
} else if (rttNoLoad.add(rtt)) {
233249
LOG.debug("New MinRTT {} limit={}", TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0, getLimit());
234250
}
235251

236252
minRttSampleListener.addSample(rttNoLoad.get());
237253

238-
final double gradient = Math.max(0.5, Math.min(1.0, rttTolerance * rttNoLoad.get() / rtt));
254+
final double gradient;
255+
// rtt_noload may still be higher than rtt because updates to rtt_noload are smoothed;
256+
// set the gradient to 1.0 to indicate no queueing
257+
if (rttNoLoad.get().doubleValue() > rtt) {
258+
gradient = 1.0;
259+
} else {
260+
gradient = Math.max(0.5, rttTolerance * rttNoLoad.get().doubleValue() / rtt);
261+
}
262+
239263
double newLimit;
264+
// Reduce the limit aggressively if there was a drop
240265
if (sample.didDrop()) {
241266
newLimit = estimatedLimit/2;
267+
// Don't grow the limit because we are app limited
242268
} else if ((estimatedLimit - sample.getMaxInFlight()) > queueSize) {
243269
return;
270+
// Normal update to the limit
244271
} else {
245272
newLimit = estimatedLimit * gradient + queueSize;
246273
}
247274

248275
newLimit = Math.max(queueSize, Math.min(maxLimit, newLimit));
249-
if (newLimit < estimatedLimit) {
250-
newLimit = estimatedLimit * (1-smoothing) + smoothing*(newLimit);
251-
}
276+
newLimit = Math.max(minLimit, estimatedLimit * (1-smoothing) + smoothing*(newLimit));
277+
252278
if ((int)newLimit != (int)estimatedLimit) {
253279
if (LOG.isDebugEnabled()) {
254280
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={} resetCounter={}",
255281
(int)newLimit,
256-
TimeUnit.NANOSECONDS.toMicros(rttNoLoad.get())/1000.0,
282+
TimeUnit.NANOSECONDS.toMicros(rttNoLoad.get().longValue())/1000.0,
257283
TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0,
258284
queueSize,
259285
gradient,
@@ -269,13 +295,13 @@ public int getLimit() {
269295
}
270296

271297
public long getRttNoLoad() {
272-
return rttNoLoad.get();
298+
return rttNoLoad.get().longValue();
273299
}
274300

275301
@Override
276302
public String toString() {
277303
return "GradientLimit [limit=" + (int)estimatedLimit +
278-
", rtt_noload=" + TimeUnit.MICROSECONDS.toMillis(rttNoLoad.get()) / 1000.0+
304+
", rtt_noload=" + TimeUnit.MICROSECONDS.toMillis(rttNoLoad.get().longValue()) / 1000.0+
279305
" ms]";
280306
}
281307
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ public interface Measurement {
1111
* @param sample
1212
* @return True if internal state was updated
1313
*/
14-
boolean add(long sample);
14+
boolean add(Number sample);
1515

16-
long update(Function<Long, Long> func);
16+
Number update(Function<Number, Number> func);
1717

1818
/**
1919
* @return Return the current value
2020
*/
21-
long get();
21+
Number get();
2222

2323
/**
2424
* Reset the internal state as if no samples were ever added

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,30 @@
33
import java.util.function.Function;
44

55
public class MinimumMeasurement implements Measurement {
6-
private long value = 0;
6+
private Double value = 0.0;
77

88
@Override
9-
public boolean add(long sample) {
10-
if (value == 0 || sample < value) {
11-
value = sample;
9+
public boolean add(Number sample) {
10+
if (value == 0.0 || sample.doubleValue() < value) {
11+
value = sample.doubleValue();
1212
return true;
1313
}
1414
return false;
1515
}
1616

1717
@Override
18-
public long get() {
18+
public Number get() {
1919
return value;
2020
}
2121

2222
@Override
2323
public void reset() {
24-
value = 0;
24+
value = 0.0;
2525
}
2626

2727
@Override
28-
public long update(Function<Long, Long> func) {
29-
value = func.apply(value);
28+
public Number update(Function<Number, Number> func) {
29+
value = func.apply(value).doubleValue();
3030
return value;
3131
}
3232
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.netflix.concurrency.limits.limit;
2+
3+
import java.util.function.Function;
4+
5+
/**
6+
* Measures the minimum value of a sample set but also adds a smoothing factor
7+
*/
8+
public class SmoothingMinimumMeasurement implements Measurement {
9+
private Double value = 0.0;
10+
private final double ratio;
11+
12+
/**
13+
* @param smoothing Factor applied by update() to the new measurement using the formula
14+
* value = value * (1-smoothing) + newValue * smoothing (add() instead weights a lower sample by 1-smoothing)
15+
*/
16+
public SmoothingMinimumMeasurement(double smoothing) {
17+
this.ratio = smoothing;
18+
}
19+
20+
@Override
21+
public boolean add(Number sample) {
22+
if (value == 0) {
23+
value = sample.doubleValue();
24+
return true;
25+
} else if (sample.doubleValue() < value.doubleValue()) {
26+
value = value * ratio + sample.doubleValue() * (1-ratio);
27+
return true;
28+
}
29+
30+
return false;
31+
}
32+
33+
@Override
34+
public Number get() {
35+
return value;
36+
}
37+
38+
@Override
39+
public void reset() {
40+
value = 0.0;
41+
}
42+
43+
@Override
44+
public Number update(Function<Number, Number> func) {
45+
double newValue = func.apply(value).doubleValue();
46+
value = value * (1-ratio) + newValue * ratio;
47+
return value;
48+
}
49+
}

concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.netflix.concurrency.limits.grpc.server.ConcurrencyLimitServerInterceptor;
1919
import com.netflix.concurrency.limits.grpc.server.GrpcServerLimiterBuilder;
2020
import com.netflix.concurrency.limits.limit.GradientLimit;
21-
import com.netflix.concurrency.limits.limit.VegasLimit;
2221

2322
import io.grpc.CallOptions;
2423
import io.grpc.Channel;
@@ -46,20 +45,26 @@ public class Example {
4645

4746

4847
public static ServerCallHandler<String, String> createServerHandler(int concurrency) {
49-
final ExponentialDistribution distribution = new ExponentialDistribution(10.0);
50-
final Supplier<Integer> latency = () -> 100 + (int)distribution.sample();
48+
final ExponentialDistribution distribution = new ExponentialDistribution(20.0);
49+
final Supplier<Integer> latency = () -> 1 + (int)distribution.sample();
5150

5251
List<Semaphore> semaphores = Arrays.asList(
53-
new Semaphore(concurrency, true),
54-
new Semaphore(concurrency*2, true),
55-
new Semaphore(concurrency*4, true),
56-
new Semaphore(concurrency*8, true));
52+
new Semaphore( concurrency, true),
53+
new Semaphore( concurrency, true),
54+
new Semaphore( concurrency, true),
55+
new Semaphore( concurrency, true),
56+
new Semaphore(1, true)
57+
);
58+
59+
AtomicInteger position = new AtomicInteger(0);
60+
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
61+
position.updateAndGet(current -> (current + 1) % semaphores.size());
62+
}, 10, 10, TimeUnit.SECONDS);
5763

5864
return ServerCalls.asyncUnaryCall(new UnaryMethod<String, String>() {
59-
AtomicInteger counter = new AtomicInteger();
6065
@Override
6166
public void invoke(String req, StreamObserver<String> observer) {
62-
Semaphore sem = semaphores.get((counter.incrementAndGet() / 2000) % semaphores.size());
67+
Semaphore sem = semaphores.get(position.get());
6368
try {
6469
sem.acquire();
6570
Uninterruptibles.sleepUninterruptibly(latency.get(), TimeUnit.MILLISECONDS);
@@ -86,7 +91,7 @@ public static void main(String[] args) throws IOException {
8691
.build(), new ConcurrencyLimitServerInterceptor(new GrpcServerLimiterBuilder()
8792
.limiter(builder -> builder
8893
.limit(limit)
89-
.minWindowTime(200, TimeUnit.MILLISECONDS)
94+
.minWindowTime(1, TimeUnit.SECONDS)
9095
)
9196
.build())
9297
))
@@ -96,9 +101,10 @@ public static void main(String[] args) throws IOException {
96101
// Report progress
97102
AtomicInteger dropCount = new AtomicInteger(0);
98103
AtomicInteger successCount = new AtomicInteger(0);
99-
104+
AtomicInteger counter = new AtomicInteger(0);
100105
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
101-
System.out.println(MessageFormat.format("{0}, {1}, {2}", limit.getLimit(), successCount.getAndSet(0), dropCount.getAndSet(0)));
106+
System.out.println(MessageFormat.format("{0,number,#}, {1}, {2}, {3}", counter.incrementAndGet(),
107+
limit.getLimit(), successCount.getAndSet(0), dropCount.getAndSet(0)));
102108
}, 1, 1, TimeUnit.SECONDS);
103109

104110
// Create a client
@@ -107,7 +113,8 @@ public static void main(String[] args) throws IOException {
107113
.build();
108114

109115
DriverBuilder.newBuilder()
110-
.exponential(3, 2, TimeUnit.SECONDS)
116+
.exponential(3, 90, TimeUnit.SECONDS)
117+
.exponential(1, 5, TimeUnit.SECONDS)
111118
.run(1, TimeUnit.HOURS, () -> {
112119
ClientCalls.asyncUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withWaitForReady()), "request",
113120
new StreamObserver<String>() {

0 commit comments

Comments
 (0)