@@ -29,12 +29,12 @@ public static class Builder {
2929 private int initialLimit = 20 ;
3030 private int maxConcurrency = 1000 ;
3131 private MetricRegistry registry = EmptyMetricRegistry .INSTANCE ;
32- private double smoothing = 0.2 ;
32+ private double smoothing = 1.0 ;
3333
3434 private Function <Integer , Integer > alpha = (limit ) -> 3 ;
3535 private Function <Integer , Integer > beta = (limit ) -> 6 ;
36- private Function <Integer , Integer > increaseFunc = (limit ) -> limit + 1 ;
37- private Function <Integer , Integer > decreaseFunc = (limit ) -> limit - 1 ;
36+ private Function <Double , Double > increaseFunc = (limit ) -> limit + 1 ;
37+ private Function <Double , Double > decreaseFunc = (limit ) -> limit - 1 ;
3838
3939 public Builder alpha (int alpha ) {
4040 this .alpha = (ignore ) -> alpha ;
@@ -56,12 +56,12 @@ public Builder beta(Function<Integer, Integer> beta) {
5656 return this ;
5757 }
5858
59- public Builder increase (Function <Integer , Integer > increase ) {
59+ public Builder increase (Function <Double , Double > increase ) {
6060 this .increaseFunc = increase ;
6161 return this ;
6262 }
6363
64- public Builder decrease (Function <Integer , Integer > decrease ) {
64+ public Builder decrease (Function <Double , Double > decrease ) {
6565 this .decreaseFunc = decrease ;
6666 return this ;
6767 }
@@ -128,8 +128,8 @@ public static VegasLimit newDefault() {
128128 private final double smoothing ;
129129 private final Function <Integer , Integer > alphaFunc ;
130130 private final Function <Integer , Integer > betaFunc ;
131- private final Function <Integer , Integer > increaseFunc ;
132- private final Function <Integer , Integer > decreaseFunc ;
131+ private final Function <Double , Double > increaseFunc ;
132+ private final Function <Double , Double > decreaseFunc ;
133133
134134 private VegasLimit (Builder builder ) {
135135 this .estimatedLimit = builder .initialLimit ;
@@ -153,25 +153,25 @@ public synchronized void update(long rtt, int maxInFlight) {
153153 double newLimit ;
154154 final int queueSize = (int ) Math .ceil (estimatedLimit * (1 - (double )rtt_noload / rtt ));
155155 if (didDrop ) {
156- newLimit = decreaseFunc .apply (( int ) estimatedLimit );
156+ newLimit = decreaseFunc .apply (estimatedLimit );
157157 didDrop = false ;
158- } else if (maxInFlight < estimatedLimit ) {
158+ } else if (maxInFlight + queueSize < estimatedLimit ) {
159159 return ;
160160 } else {
161161 int alpha = alphaFunc .apply ((int )estimatedLimit );
162162 int beta = betaFunc .apply ((int )estimatedLimit );
163163
164164 if (queueSize < alpha ) {
165- newLimit = increaseFunc .apply (( int ) estimatedLimit );
165+ newLimit = increaseFunc .apply (estimatedLimit );
166166 } else if (queueSize > beta ) {
167- newLimit = decreaseFunc .apply (( int ) estimatedLimit );
167+ newLimit = decreaseFunc .apply (estimatedLimit );
168168 } else {
169169 return ;
170170 }
171171 }
172172
173173 newLimit = Math .max (1 , Math .min (maxLimit , newLimit ));
174- newLimit = (int ) (( 1 - smoothing ) * estimatedLimit + smoothing * newLimit ) ;
174+ newLimit = (1 - smoothing ) * estimatedLimit + smoothing * newLimit ;
175175 if ((int )newLimit != (int )estimatedLimit && LOG .isDebugEnabled ()) {
176176 LOG .debug ("New limit={} minRtt={} μs winRtt={} μs queueSize={}" ,
177177 estimatedLimit ,
0 commit comments