99import com .netflix .concurrency .limits .Limit ;
1010import com .netflix .concurrency .limits .MetricIds ;
1111import com .netflix .concurrency .limits .MetricRegistry ;
12+ import com .netflix .concurrency .limits .MetricRegistry .SampleListener ;
1213import com .netflix .concurrency .limits .internal .EmptyMetricRegistry ;
1314import com .netflix .concurrency .limits .internal .Preconditions ;
1415import com .netflix .concurrency .limits .limit .functions .SquareRootFunction ;
@@ -26,35 +27,84 @@ public final class GradientLimit implements Limit {
2627 public static class Builder {
2728 private int initialLimit = 20 ;
2829 private int maxConcurrency = 1000 ;
30+ private long minRttThreshold = TimeUnit .MILLISECONDS .toNanos (1 );
31+
2932 private double smoothing = 0.2 ;
3033 private Function <Integer , Integer > queueSize = SquareRootFunction .create (4 );
3134 private MetricRegistry registry = EmptyMetricRegistry .INSTANCE ;
3235
36+ /**
37+ * Minimum threshold for accepting a new rtt sample. Any RTT lower than this threshold
38+ * will be discarded.
39+ *
40+ * @param minRttTreshold
41+ * @param units
42+ * @return Chainable builder
43+ */
44+ public Builder minRttThreshold (long minRttTreshold , TimeUnit units ) {
45+ this .minRttThreshold = units .toMillis (minRttTreshold );
46+ return this ;
47+ }
48+
49+ /**
50+ * Initial limit used by the limiter
51+ * @param initialLimit
52+ * @return Chainable builder
53+ */
3354 public Builder initialLimit (int initialLimit ) {
3455 this .initialLimit = initialLimit ;
3556 return this ;
3657 }
3758
59+ /**
60+ * Maximum allowable concurrency. Any estimated concurrency will be capped
61+ * at this value
62+ * @param maxConcurrency
63+ * @return Chainable builder
64+ */
3865 public Builder maxConcurrency (int maxConcurrency ) {
3966 this .maxConcurrency = maxConcurrency ;
4067 return this ;
4168 }
4269
70+ /**
71+ * Fixed amount the estimated limit can grow while latencies remain low
72+ * @param queueSize
73+ * @return Chainable builder
74+ */
4375 public Builder queueSize (int queueSize ) {
4476 this .queueSize = (ignore ) -> queueSize ;
4577 return this ;
4678 }
47-
79+
80+ /**
81+ * Function to dynamically determine the amount the estimated limit can grow while
82+ * latencies remain low as a function of the current limit.
83+ * @param queueSize
84+ * @return Chainable builder
85+ */
4886 public Builder queueSize (Function <Integer , Integer > queueSize ) {
4987 this .queueSize = queueSize ;
5088 return this ;
5189 }
5290
91+ /**
92+ * Smoothing factor to limit how aggressively the estimated limit can shrink
93+ * when queuing has been detected.
94+ * @param smoothing Value of 0.0 to 1.0 where 1.0 means the limit is completely
95+ * replicated by the new estimate.
96+ * @return Chainable builder
97+ */
5398 public Builder smoothing (double smoothing ) {
5499 this .smoothing = smoothing ;
55100 return this ;
56101 }
57102
103+ /**
104+ * Registry for reporting metrics about the limiter's internal state.
105+ * @param registry
106+ * @return Chainable builder
107+ */
58108 public Builder metricRegistry (MetricRegistry registry ) {
59109 this .registry = registry ;
60110 return this ;
@@ -93,22 +143,35 @@ public static GradientLimit newDefault() {
93143
94144 private final double smoothing ;
95145
146+ private final SampleListener sampleRttMetric ;
147+
148+ private final long minRttThreshold ;
149+
96150 private GradientLimit (Builder builder ) {
97151 this .estimatedLimit = builder .initialLimit ;
98152 this .maxLimit = builder .maxConcurrency ;
99153 this .queueSize = builder .queueSize ;
100154 this .smoothing = builder .smoothing ;
155+ this .minRttThreshold = builder .minRttThreshold ;
156+
157+ this .sampleRttMetric = builder .registry .registerDistribution ("sample_rtt" );
101158 }
102159
103160 @ Override
104161 public synchronized void update (long rtt , int maxInFlight ) {
105162 Preconditions .checkArgument (rtt > 0 , "rtt must be >0 but got " + rtt );
106163
164+ if (rtt < minRttThreshold ) {
165+ return ;
166+ }
167+
107168 if (rtt_noload == 0 || rtt < rtt_noload ) {
108169 LOG .debug ("New MinRTT {}" , rtt );
109170 rtt_noload = rtt ;
110171 }
111172
173+ sampleRttMetric .addSample (rtt );
174+
112175 final double queueSize = this .queueSize .apply ((int )this .estimatedLimit );
113176 final double gradient = (double )rtt_noload / rtt ;
114177 double newLimit ;
0 commit comments