Skip to content

Commit ba9767c

Browse files
authored
Merge pull request #44 from elandau/feature/vegas_slow_start
Vegas probing + slow start
2 parents 6259e69 + 690025d commit ba9767c

File tree

8 files changed

+167
-67
lines changed

8 files changed

+167
-67
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import java.util.concurrent.ThreadLocalRandom;
44
import java.util.concurrent.TimeUnit;
55
import java.util.function.Function;
6-
import java.util.function.Supplier;
76

87
import org.slf4j.Logger;
98
import org.slf4j.LoggerFactory;
@@ -38,12 +37,8 @@ public static class Builder {
3837
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
3938
private double rttTolerance = 1.0;
4039

41-
private Supplier<Integer> resetRttCounterSupplier;
40+
private int probeMultiplier = 30;
4241

43-
private Builder() {
44-
probeNoLoadRtt(500, 1000);
45-
}
46-
4742
/**
4843
* Minimum threshold for accepting a new rtt sample. Any RTT lower than this threshold
4944
* will be discarded.
@@ -134,15 +129,13 @@ public Builder metricRegistry(MetricRegistry registry) {
134129
}
135130

136131
/**
137-
* Probe for a new no_load RTT randomly in the range of [minUpdate, maxUpdates]
138-
* update intervals. Default is [1000, 2000].
139-
* @param minUpdates
140-
* @param maxUpdates
132+
* The limiter will probe for a new noload RTT every probeMultiplier * current limit
133+
* iterations. Default value is 30.
134+
* @param probeMultiplier
141135
* @return Chinable builder
142136
*/
143-
public Builder probeNoLoadRtt(int minUpdates, int maxUpdates) {
144-
Preconditions.checkArgument(minUpdates < maxUpdates, "minUpdates must be < maxUpdates");
145-
resetRttCounterSupplier = () -> ThreadLocalRandom.current().nextInt(minUpdates, maxUpdates);
137+
public Builder probeMultiplier(int probeMultiplier) {
138+
this.probeMultiplier = probeMultiplier;
146139
return this;
147140
}
148141

@@ -185,7 +178,7 @@ public static GradientLimit newDefault() {
185178

186179
private final SampleListener queueSizeSampleListener;
187180

188-
private final Supplier<Integer> resetRttCounterSupplier;
181+
private final int probeMultiplier;
189182

190183
private int resetRttCounter;
191184

@@ -196,14 +189,19 @@ private GradientLimit(Builder builder) {
196189
this.smoothing = builder.smoothing;
197190
this.minRttThreshold = builder.minRttThreshold;
198191
this.rttTolerance = builder.rttTolerance;
199-
this.resetRttCounterSupplier = builder.resetRttCounterSupplier;
200-
this.resetRttCounter = resetRttCounterSupplier.get();
192+
this.probeMultiplier = builder.probeMultiplier;
193+
this.resetRttCounter = nextProbeCountdown();
201194

202195
this.minRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
203196
this.minWindowRttSampleListener = builder.registry.registerDistribution(MetricIds.WINDOW_MIN_RTT_NAME);
204197
this.queueSizeSampleListener = builder.registry.registerDistribution(MetricIds.WINDOW_QUEUE_SIZE_NAME);
205198
}
206199

200+
private int nextProbeCountdown() {
201+
int max = (int) (probeMultiplier * estimatedLimit);
202+
return ThreadLocalRandom.current().nextInt(max / 2, max);
203+
}
204+
207205
@Override
208206
public synchronized void update(SampleWindow sample) {
209207
final long rtt = sample.getCandidateRttNanos();
@@ -222,7 +220,7 @@ public synchronized void update(SampleWindow sample) {
222220
// in half to avoid having the limit drift upwards when the RTT is probed during heavy load.
223221
// To avoid decreasing the limit too much we don't allow it to go lower than the queueSize.
224222
if (resetRttCounter != DISABLED && resetRttCounter-- <= 0) {
225-
resetRttCounter = this.resetRttCounterSupplier.get();
223+
resetRttCounter = nextProbeCountdown();
226224

227225
estimatedLimit = Math.max(estimatedLimit - queueSize, queueSize);
228226

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.netflix.concurrency.limits.limit;
22

3+
import java.util.concurrent.ThreadLocalRandom;
34
import java.util.concurrent.TimeUnit;
45
import java.util.function.Function;
56

@@ -12,6 +13,7 @@
1213
import com.netflix.concurrency.limits.MetricRegistry.SampleListener;
1314
import com.netflix.concurrency.limits.internal.EmptyMetricRegistry;
1415
import com.netflix.concurrency.limits.internal.Preconditions;
16+
import com.netflix.concurrency.limits.limit.functions.Log10RootFunction;
1517

1618
/**
1719
* Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is small ({@literal <} alpha)
@@ -26,34 +28,59 @@
2628
public class VegasLimit implements Limit {
2729
private static final Logger LOG = LoggerFactory.getLogger(VegasLimit.class);
2830

31+
private static final Function<Integer, Integer> LOG10 = Log10RootFunction.create(0);
32+
33+
private static final int DISABLED = -1;
34+
2935
public static class Builder {
3036
private int initialLimit = 20;
3137
private int maxConcurrency = 1000;
3238
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
3339
private double smoothing = 1.0;
3440

35-
private Function<Integer, Integer> alpha = (limit) -> 3;
36-
private Function<Integer, Integer> beta = (limit) -> 6;
37-
private Function<Double, Double> increaseFunc = (limit) -> limit + 1;
38-
private Function<Double, Double> decreaseFunc = (limit) -> limit - 1;
41+
private Function<Integer, Integer> alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue());
42+
private Function<Integer, Integer> betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue());
43+
private Function<Integer, Integer> thresholdFunc = (limit) -> LOG10.apply(limit.intValue());
44+
private Function<Double, Double> increaseFunc = (limit) -> limit + LOG10.apply(limit.intValue());
45+
private Function<Double, Double> decreaseFunc = (limit) -> limit - LOG10.apply(limit.intValue());
46+
private int probeMultiplier = 30;
47+
48+
private Builder() {
49+
}
50+
51+
/**
52+
* The limiter will probe for a new noload RTT every probeMultiplier * current limit
53+
* iterations. Default value is 30.
54+
* @param probeMultiplier
55+
* @return Chinable builder
56+
*/
57+
public Builder probeMultiplier(int probeMultiplier) {
58+
this.probeMultiplier = probeMultiplier;
59+
return this;
60+
}
3961

4062
public Builder alpha(int alpha) {
41-
this.alpha = (ignore) -> alpha;
63+
this.alphaFunc = (ignore) -> alpha;
64+
return this;
65+
}
66+
67+
public Builder threshold(Function<Integer, Integer> threshold) {
68+
this.thresholdFunc = threshold;
4269
return this;
4370
}
4471

4572
public Builder alpha(Function<Integer, Integer> alpha) {
46-
this.alpha = alpha;
73+
this.alphaFunc = alpha;
4774
return this;
4875
}
4976

5077
public Builder beta(int beta) {
51-
this.beta = (ignore) -> beta;
78+
this.betaFunc = (ignore) -> beta;
5279
return this;
5380
}
5481

5582
public Builder beta(Function<Integer, Integer> beta) {
56-
this.beta = beta;
83+
this.betaFunc = beta;
5784
return this;
5885
}
5986

@@ -125,49 +152,77 @@ public static VegasLimit newDefault() {
125152
private final double smoothing;
126153
private final Function<Integer, Integer> alphaFunc;
127154
private final Function<Integer, Integer> betaFunc;
155+
private final Function<Integer, Integer> thresholdFunc;
128156
private final Function<Double, Double> increaseFunc;
129157
private final Function<Double, Double> decreaseFunc;
130158
private final SampleListener rttSampleListener;
159+
private int probeMultiplier;
160+
private int probeCountdown;
131161

132162
private VegasLimit(Builder builder) {
133163
this.estimatedLimit = builder.initialLimit;
134164
this.maxLimit = builder.maxConcurrency;
135-
this.alphaFunc = builder.alpha;
136-
this.betaFunc = builder.beta;
165+
this.alphaFunc = builder.alphaFunc;
166+
this.betaFunc = builder.betaFunc;
137167
this.increaseFunc = builder.increaseFunc;
138168
this.decreaseFunc = builder.decreaseFunc;
169+
this.thresholdFunc = builder.thresholdFunc;
139170
this.smoothing = builder.smoothing;
171+
this.probeMultiplier = builder.probeMultiplier;
172+
this.probeCountdown = nextProbeCountdown();
140173

141174
this.rttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
142-
175+
}
176+
177+
private int nextProbeCountdown() {
178+
int max = (int) (probeMultiplier * estimatedLimit);
179+
return ThreadLocalRandom.current().nextInt(max / 2, max);
143180
}
144181

145182
@Override
146183
public synchronized void update(SampleWindow sample) {
147184
long rtt = sample.getCandidateRttNanos();
148185
Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt);
149186

187+
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
188+
189+
if (probeCountdown != DISABLED && probeCountdown-- <= 0) {
190+
LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
191+
probeCountdown = nextProbeCountdown();
192+
rtt_noload = rtt;
193+
return;
194+
}
195+
150196
if (rtt_noload == 0 || rtt < rtt_noload) {
151197
LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0);
152198
rtt_noload = rtt;
199+
return;
153200
}
154201

155202
rttSampleListener.addSample(rtt_noload);
156203

157204
double newLimit;
158-
final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));
205+
// Treat any drop (i.e timeout) as needing to reduce the limit
159206
if (sample.didDrop()) {
160207
newLimit = decreaseFunc.apply(estimatedLimit);
161-
} else if (sample.getMaxInFlight() + queueSize < estimatedLimit) {
208+
// Prevent upward drift if not close to the limit
209+
} else if (sample.getMaxInFlight() * 2 < estimatedLimit) {
162210
return;
163211
} else {
164212
int alpha = alphaFunc.apply((int)estimatedLimit);
165213
int beta = betaFunc.apply((int)estimatedLimit);
214+
int threshold = this.thresholdFunc.apply((int)estimatedLimit);
166215

167-
if (queueSize < alpha) {
216+
// Aggressive increase when no queuing
217+
if (queueSize <= threshold) {
218+
newLimit = estimatedLimit + beta;
219+
// Increase the limit if queue is still manageable
220+
} else if (queueSize < alpha) {
168221
newLimit = increaseFunc.apply(estimatedLimit);
222+
// Detecting latency so decrease
169223
} else if (queueSize > beta) {
170224
newLimit = decreaseFunc.apply(estimatedLimit);
225+
// We're within he sweet spot so nothing to do
171226
} else {
172227
return;
173228
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.netflix.concurrency.limits.limit.functions;
2+
3+
import java.util.function.Function;
4+
import java.util.stream.IntStream;
5+
6+
/**
7+
* Function used by limiters to calculate thredsholds using log10 of the current limit.
8+
* Here we pre-compute the log10 of numbers up to 1000 as an optimization.
9+
*/
10+
public final class Log10RootFunction implements Function<Integer, Integer> {
11+
static final int[] lookup = new int[1000];
12+
13+
static {
14+
IntStream.range(0, 1000).forEach(i -> lookup[i] = Math.max(1, (int)Math.log10(i)));
15+
}
16+
17+
private static final Log10RootFunction INSTANCE = new Log10RootFunction();
18+
19+
/**
20+
* Create an instance of a function that returns : baseline + sqrt(limit)
21+
*
22+
* @param baseline
23+
* @return
24+
*/
25+
public static Function<Integer, Integer> create(int baseline) {
26+
return INSTANCE.andThen(t -> t + baseline);
27+
}
28+
29+
@Override
30+
public Integer apply(Integer t) {
31+
return t < 1000 ? lookup[t] : (int)Math.sqrt(t);
32+
}
33+
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/functions/SquareRootFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public final class SquareRootFunction implements Function<Integer, Integer> {
1212
static final int[] lookup = new int[1000];
1313

1414
static {
15-
IntStream.range(0, 1000).forEach(i -> lookup[i] = (int)Math.sqrt(i));
15+
IntStream.range(0, 1000).forEach(i -> lookup[i] = Math.max(1, (int)Math.sqrt(i)));
1616
}
1717

1818
private static final SquareRootFunction INSTANCE = new SquareRootFunction();

concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,27 @@ public void initialLimit() {
2929
public void increaseLimit() {
3030
VegasLimit limit = create();
3131
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 10));
32-
Assert.assertEquals(11, limit.getLimit());
32+
Assert.assertEquals(10, limit.getLimit());
3333
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 11));
34-
Assert.assertEquals(12, limit.getLimit());
34+
Assert.assertEquals(16, limit.getLimit());
3535
}
3636

3737
@Test
3838
public void decreaseLimit() {
3939
VegasLimit limit = create();
4040
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 10));
41-
Assert.assertEquals(11, limit.getLimit());
42-
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(50), 11));
4341
Assert.assertEquals(10, limit.getLimit());
42+
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(50), 11));
43+
Assert.assertEquals(9, limit.getLimit());
4444
}
4545

4646
@Test
4747
public void noChangeIfWithinThresholds() {
4848
VegasLimit limit = create();
4949
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 10));
50-
Assert.assertEquals(11, limit.getLimit());
50+
Assert.assertEquals(10, limit.getLimit());
5151
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(14), 14));
52-
Assert.assertEquals(11, limit.getLimit());
52+
Assert.assertEquals(10, limit.getLimit());
5353
}
5454

5555
@Test
@@ -84,7 +84,7 @@ public void decreaseWithoutSmoothing() {
8484

8585
// Pick up first min-rtt
8686
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 100));
87-
Assert.assertEquals(101, limit.getLimit());
87+
Assert.assertEquals(100, limit.getLimit());
8888

8989
// First decrease
9090
limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(20), 100));

concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/functions/SquareRootFunctionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public class SquareRootFunctionTest {
99
@Test
1010
public void confirm0Index() {
1111
Function<Integer, Integer> func = SquareRootFunction.create(4);
12-
Assert.assertEquals(4, func.apply(0).intValue());
12+
Assert.assertEquals(5, func.apply(0).intValue());
1313
}
1414

1515
@Test

concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/DriverBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ public void run(long runtime, TimeUnit units, Runnable action) {
6767
long endTime = System.nanoTime() + units.toNanos(runtime);
6868
while (true) {
6969
for (DriverBuilder.Segment segment : segments) {
70-
System.out.println(segment.name());
7170
long segmentEndTime = System.nanoTime() + segment.duration();
7271
while (true) {
7372
long currentTime = System.nanoTime();

0 commit comments

Comments
 (0)