Skip to content

Commit 7831418

Browse files
authored
Merge pull request #42 from elandau/bugfix/noload_rtt_probing
Improve probing for new noload RTT in gradient
2 parents f3e9427 + 10605d1 commit 7831418

File tree

7 files changed

+206
-63
lines changed

7 files changed

+206
-63
lines changed

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
11
package com.netflix.concurrency.limits.limit;
22

3-
import java.util.concurrent.ThreadLocalRandom;
4-
import java.util.concurrent.TimeUnit;
5-
import java.util.function.Function;
6-
import java.util.function.Supplier;
7-
8-
import org.slf4j.Logger;
9-
import org.slf4j.LoggerFactory;
10-
113
import com.netflix.concurrency.limits.Limit;
124
import com.netflix.concurrency.limits.MetricIds;
135
import com.netflix.concurrency.limits.MetricRegistry;
@@ -16,6 +8,14 @@
168
import com.netflix.concurrency.limits.internal.Preconditions;
179
import com.netflix.concurrency.limits.limit.functions.SquareRootFunction;
1810

11+
import java.util.concurrent.ThreadLocalRandom;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.function.Function;
14+
import java.util.function.Supplier;
15+
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
1919
/**
2020
* Concurrency limit algorithm that adjust the limits based on the gradient of change in the
2121
* samples minimum RTT and absolute minimum RTT allowing for a queue of square root of the
@@ -41,7 +41,7 @@ public static class Builder {
4141
private Supplier<Integer> resetRttCounterSupplier;
4242

4343
private Builder() {
44-
probeNoLoadRtt(1000, 2000);
44+
probeNoLoadRtt(500, 1000);
4545
}
4646

4747
/**
@@ -215,21 +215,24 @@ public synchronized void update(SampleWindow sample) {
215215
return;
216216
}
217217

218-
final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
219-
queueSizeSampleListener.addSample(queueSize);
220-
221218
// Reset or probe for a new RTT and a new estimatedLimit. It's necessary to cut the limit
222219
// in half to avoid having the limit drift upwards when the RTT is probed during heavy load.
223220
// To avoid decreasing the limit too much we don't allow it to go lower than the queueSize.
224221
if (resetRttCounter != DISABLED && resetRttCounter-- <= 0) {
225-
LOG.debug("Probe for a new noload RTT");
226222
resetRttCounter = this.resetRttCounterSupplier.get();
227-
estimatedLimit = Math.max(queueSize, estimatedLimit/2);
228-
rttNoLoad.reset();
223+
long currrentQueueSize = this.queueSize.apply((int)this.estimatedLimit);
224+
estimatedLimit = currrentQueueSize;
225+
long nextRttNoLoad = rttNoLoad.update(value -> value * 2);
226+
LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(nextRttNoLoad)/1000.0);
227+
return;
229228
}
230229

230+
final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
231+
queueSizeSampleListener.addSample(queueSize);
232+
231233
if (rttNoLoad.add(rtt)) {
232-
LOG.debug("New MinRTT {}", rtt);
234+
LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0);
235+
return;
233236
}
234237
minRttSampleListener.addSample(rttNoLoad.get());
235238

@@ -250,7 +253,7 @@ public synchronized void update(SampleWindow sample) {
250253
if ((int)newLimit != (int)estimatedLimit) {
251254
if (LOG.isDebugEnabled()) {
252255
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={} resetCounter={}",
253-
(int)estimatedLimit,
256+
(int)newLimit,
254257
TimeUnit.NANOSECONDS.toMicros(rttNoLoad.get())/1000.0,
255258
TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0,
256259
queueSize,

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.netflix.concurrency.limits.limit;
22

3+
import java.util.function.Function;
4+
35
/**
46
* Contract for tracking a measurement such as a minimum or average of a sample set
57
*/
@@ -11,6 +13,8 @@ public interface Measurement {
1113
*/
1214
boolean add(long sample);
1315

16+
long update(Function<Long, Long> func);
17+
1418
/**
1519
* @return Return the current value
1620
*/

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.netflix.concurrency.limits.limit;
22

3+
import java.util.function.Function;
4+
35
public class MinimumMeasurement implements Measurement {
46
private long value = 0;
57

@@ -22,4 +24,9 @@ public void reset() {
2224
value = 0;
2325
}
2426

27+
@Override
28+
public long update(Function<Long, Long> func) {
29+
value = func.apply(value);
30+
return value;
31+
}
2532
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
package com.netflix.concurrency.limits.limiter;
22

3-
import java.util.Optional;
4-
import java.util.concurrent.TimeUnit;
5-
import java.util.concurrent.atomic.AtomicInteger;
6-
import java.util.concurrent.atomic.AtomicLong;
7-
import java.util.concurrent.atomic.AtomicReference;
8-
import java.util.function.Supplier;
9-
103
import com.netflix.concurrency.limits.Limit;
114
import com.netflix.concurrency.limits.Limiter;
125
import com.netflix.concurrency.limits.Strategy;
136
import com.netflix.concurrency.limits.Strategy.Token;
147
import com.netflix.concurrency.limits.internal.Preconditions;
158
import com.netflix.concurrency.limits.limit.VegasLimit;
169

10+
import java.util.Optional;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.atomic.AtomicReference;
14+
import java.util.function.Supplier;
15+
1716
/**
1817
* {@link Limiter} that combines a plugable limit algorithm and enforcement strategy to
1918
* enforce concurrency limits to a fixed resource.
@@ -23,42 +22,31 @@ public final class DefaultLimiter<ContextT> implements Limiter<ContextT> {
2322
private final Supplier<Long> nanoClock = System::nanoTime;
2423

2524
private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1);
26-
private static final int DEFAULT_WINDOW_SIZE = 10;
27-
25+
private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1);
26+
2827
/**
2928
* Minimum observed samples to filter out sample windows with not enough significant samples
3029
*/
31-
private static final int MIN_WINDOW_SAMPLE_COUNT = 10;
30+
private static final int DEFAULT_WINDOW_SIZE = 10;
3231

3332
/**
3433
* Minimum observed max inflight to filter out sample windows with not enough significant data
3534
*/
36-
private static final int MIN_WINDOW_MAX_INFLIGHT = 1;
35+
private static final int MIN_WINDOW_MAX_INFLIGHT = 2;
3736

3837
/**
3938
* End time for the sampling window at which point the limit should be updated
4039
*/
41-
private final AtomicLong nextUpdateTime = new AtomicLong();
40+
private volatile long nextUpdateTime = 0;
4241

43-
/**
44-
* Algorithm used to determine the new limit based on the current limit and minimum
45-
* measured RTT in the sample window
46-
*/
4742
private final Limit limit;
4843

49-
/**
50-
* Strategy for enforcing the limit
51-
*/
5244
private final Strategy<ContextT> strategy;
5345

54-
/**
55-
* Minimum window size in nanonseconds for sampling a new minRtt
56-
*/
5746
private final long minWindowTime;
5847

59-
/**
60-
* Sampling window size in multiple of the measured minRtt
61-
*/
48+
private final long maxWindowTime;
49+
6250
private final int windowSize;
6351

6452
/**
@@ -73,27 +61,50 @@ public final class DefaultLimiter<ContextT> implements Limiter<ContextT> {
7361

7462
public static class Builder {
7563
private Limit limit = VegasLimit.newDefault();
64+
private long maxWindowTime = DEFAULT_MAX_WINDOW_TIME;
7665
private long minWindowTime = DEFAULT_MIN_WINDOW_TIME;
7766
private int windowSize = DEFAULT_WINDOW_SIZE;
7867

68+
/**
69+
* Algorithm used to determine the new limit based on the current limit and minimum
70+
* measured RTT in the sample window
71+
*/
7972
public Builder limit(Limit limit) {
8073
Preconditions.checkArgument(limit != null, "Algorithm may not be null");
8174
this.limit = limit;
8275
return this;
8376
}
8477

78+
/**
79+
* Minimum window duration for sampling a new minRtt
80+
*/
8581
public Builder minWindowTime(long minWindowTime, TimeUnit units) {
8682
Preconditions.checkArgument(units.toMillis(minWindowTime) >= 100, "minWindowTime must be >= 100 ms");
8783
this.minWindowTime = units.toNanos(minWindowTime);
8884
return this;
8985
}
9086

87+
/**
88+
* Maximum window duration for sampling a new minRtt
89+
*/
90+
public Builder maxWindowTime(long maxWindowTime, TimeUnit units) {
91+
Preconditions.checkArgument(maxWindowTime >= units.toMillis(100), "minWindowTime must be >= 100 ms");
92+
this.maxWindowTime = units.toNanos(maxWindowTime);
93+
return this;
94+
}
95+
96+
/**
97+
* Minimum sampling window size for finding a new minimum rtt
98+
*/
9199
public Builder windowSize(int windowSize) {
92100
Preconditions.checkArgument(windowSize >= 10, "Window size must be >= 10");
93101
this.windowSize = windowSize;
94102
return this;
95103
}
96104

105+
/**
106+
* @param strategy Strategy for enforcing the limit
107+
*/
97108
public <ContextT> DefaultLimiter<ContextT> build(Strategy<ContextT> strategy) {
98109
Preconditions.checkArgument(strategy != null, "Strategy may not be null");
99110
return new DefaultLimiter<ContextT>(this, strategy);
@@ -117,12 +128,14 @@ public DefaultLimiter(Limit limit, Strategy<ContextT> strategy) {
117128
this.strategy = strategy;
118129
this.windowSize = DEFAULT_WINDOW_SIZE;
119130
this.minWindowTime = DEFAULT_MIN_WINDOW_TIME;
131+
this.maxWindowTime = DEFAULT_MAX_WINDOW_TIME;
120132
strategy.setLimit(limit.getLimit());
121133
}
122134

123135
private DefaultLimiter(Builder builder, Strategy<ContextT> strategy) {
124136
this.limit = builder.limit;
125137
this.minWindowTime = builder.minWindowTime;
138+
this.maxWindowTime = builder.maxWindowTime;
126139
this.windowSize = builder.windowSize;
127140
this.strategy = strategy;
128141
strategy.setLimit(limit.getLimit());
@@ -150,16 +163,17 @@ public void onSuccess() {
150163

151164
sample.getAndUpdate(current -> current.addSample(rtt, currentMaxInFlight));
152165

153-
long updateTime = nextUpdateTime.get();
154-
if (endTime >= updateTime) {
155-
long nextUpdate = endTime + Math.max(minWindowTime, rtt * windowSize);
156-
if (nextUpdateTime.compareAndSet(updateTime, nextUpdate)) {
157-
ImmutableSample last = sample.getAndUpdate(ImmutableSample::reset);
158-
if (last.getCandidateRttNanos() < Integer.MAX_VALUE
159-
&& last.getSampleCount() > MIN_WINDOW_SAMPLE_COUNT
160-
&& last.getMaxInFlight() > MIN_WINDOW_MAX_INFLIGHT) {
161-
limit.update(last);
162-
strategy.setLimit(limit.getLimit());
166+
if (endTime >= nextUpdateTime) {
167+
synchronized (this) {
168+
// Double check inside the lock
169+
if (endTime >= nextUpdateTime) {
170+
ImmutableSample last = sample.get();
171+
if (isSampleReady(last)) {
172+
nextUpdateTime = endTime + Math.min(Math.max(last.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime);
173+
sample.set(new ImmutableSample());
174+
limit.update(last);
175+
strategy.setLimit(limit.getLimit());
176+
}
163177
}
164178
}
165179
}
@@ -180,6 +194,12 @@ public void onDropped() {
180194
});
181195
}
182196

197+
private boolean isSampleReady(ImmutableSample sample) {
198+
return sample.getCandidateRttNanos() < Long.MAX_VALUE
199+
&& sample.getSampleCount() > windowSize
200+
&& sample.getMaxInFlight() > MIN_WINDOW_MAX_INFLIGHT;
201+
}
202+
183203
protected int getLimit() {
184204
return limit.getLimit();
185205
}

concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSample.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package com.netflix.concurrency.limits.limiter;
22

3-
import java.util.concurrent.TimeUnit;
4-
53
import com.netflix.concurrency.limits.Limit;
64

5+
import java.util.concurrent.TimeUnit;
6+
77
/**
88
* Class used to track immutable samples in an AtomicReference
99
*/
@@ -14,16 +14,12 @@ public class ImmutableSample implements Limit.SampleWindow {
1414
final boolean didDrop;
1515

1616
public ImmutableSample() {
17-
this.minRtt = Integer.MAX_VALUE;
17+
this.minRtt = Long.MAX_VALUE;
1818
this.maxInFlight = 0;
1919
this.sampleCount = 0;
2020
this.didDrop = false;
2121
}
2222

23-
public ImmutableSample reset() {
24-
return new ImmutableSample();
25-
}
26-
2723
public ImmutableSample(long minRtt, int maxInFlight, int sampleCount, boolean didDrop) {
2824
this.minRtt = minRtt;
2925
this.maxInFlight = maxInFlight;
@@ -36,7 +32,7 @@ public ImmutableSample addSample(long rtt, int maxInFlight) {
3632
}
3733

3834
public ImmutableSample addDroppedSample(int maxInFlight) {
39-
return new ImmutableSample(minRtt, Math.max(maxInFlight, this.maxInFlight), sampleCount+1, true);
35+
return new ImmutableSample(minRtt, Math.max(maxInFlight, this.maxInFlight), sampleCount, true);
4036
}
4137

4238
@Override
@@ -61,9 +57,7 @@ public boolean didDrop() {
6157

6258
@Override
6359
public String toString() {
64-
return "ImmutableSample [minRtt=" + TimeUnit.NANOSECONDS.toMillis(minRtt)
65-
+ ", maxInFlight=" + maxInFlight
66-
+ ", sampleCount=" + sampleCount
60+
return "ImmutableSample [minRtt=" + TimeUnit.NANOSECONDS.toMicros(minRtt) / 1000.0 + ", maxInFlight=" + maxInFlight + ", sampleCount=" + sampleCount
6761
+ ", didDrop=" + didDrop + "]";
6862
}
6963
}

concurrency-limits-grpc/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ dependencies {
1515
testCompile "io.grpc:grpc-stub:1.9.0"
1616
testCompile "junit:junit-dep:4.10"
1717
testCompile "org.slf4j:slf4j-log4j12:1.7.+"
18+
testCompile "org.apache.commons:commons-math3:3.6.1"
1819
}

0 commit comments

Comments
 (0)