|
32 | 32 | import java.util.concurrent.TimeUnit;
|
33 | 33 | import java.util.concurrent.atomic.AtomicInteger;
|
34 | 34 | import java.util.concurrent.atomic.AtomicLong;
|
| 35 | +import java.util.function.LongBinaryOperator; |
35 | 36 |
|
36 | 37 | import static com.facebook.airlift.stats.QuantileDigest.MiddleFunction.DEFAULT;
|
37 | 38 | import static com.google.common.base.Preconditions.checkArgument;
|
@@ -328,6 +329,14 @@ public void scale(double scaleFactor)
|
328 | 329 | */
|
329 | 330 | public List<Long> getQuantilesLowerBound(List<Double> quantiles)
|
330 | 331 | {
|
| 332 | + if (alpha > 0.0) { |
| 333 | + long nowInSeconds = TimeUnit.NANOSECONDS.toSeconds(ticker.read()); |
| 334 | + if (nowInSeconds - landmarkInSeconds >= RESCALE_THRESHOLD_SECONDS) { |
| 335 | + rescale(nowInSeconds); |
| 336 | + compress(); // rescale affects weights globally, so force compression |
| 337 | + } |
| 338 | + } |
| 339 | + |
331 | 340 | checkArgument(Ordering.natural().isOrdered(quantiles), "quantiles must be sorted in increasing order");
|
332 | 341 | for (double quantile : quantiles) {
|
333 | 342 | checkArgument(quantile >= 0 && quantile <= 1, "quantile must be between [0,1]");
|
@@ -379,6 +388,14 @@ public boolean process(int node)
|
379 | 388 | */
|
380 | 389 | public List<Long> getQuantilesUpperBound(List<Double> quantiles)
|
381 | 390 | {
|
| 391 | + if (alpha > 0.0) { |
| 392 | + long nowInSeconds = TimeUnit.NANOSECONDS.toSeconds(ticker.read()); |
| 393 | + if (nowInSeconds - landmarkInSeconds >= RESCALE_THRESHOLD_SECONDS) { |
| 394 | + rescale(nowInSeconds); |
| 395 | + compress(); // rescale affects weights globally, so force compression |
| 396 | + } |
| 397 | + } |
| 398 | + |
382 | 399 | checkArgument(Ordering.natural().isOrdered(quantiles), "quantiles must be sorted in increasing order");
|
383 | 400 | for (double quantile : quantiles) {
|
384 | 401 | checkArgument(quantile >= 0 && quantile <= 1, "quantile must be between [0,1]");
|
@@ -411,9 +428,12 @@ public boolean process(int node)
|
411 | 428 |
|
412 | 429 | // we finished the traversal without consuming all quantiles. This means the remaining quantiles
|
413 | 430 | // correspond to the max known value
|
414 |
| - while (iterator.hasNext()) { |
415 |
| - builder.add(max); |
416 |
| - iterator.next(); |
| 431 | + if (iterator.hasNext()) { |
| 432 | + long max = getMax(); |
| 433 | + while (iterator.hasNext()) { |
| 434 | + builder.add(max); |
| 435 | + iterator.next(); |
| 436 | + } |
417 | 437 | }
|
418 | 438 |
|
419 | 439 | return builder.build();
|
@@ -624,7 +644,13 @@ void compress()
|
624 | 644 | {
|
625 | 645 | double bound = Math.floor(weightedCount / calculateCompressionFactor());
|
626 | 646 |
|
| 647 | + AtomicLong max = new AtomicLong(Integer.MIN_VALUE); |
| 648 | + AtomicLong min = new AtomicLong(Integer.MAX_VALUE); |
627 | 649 | postOrderTraversal(root, node -> {
|
| 650 | + if (counts[node] >= ZERO_WEIGHT_THRESHOLD) { |
| 651 | + max.accumulateAndGet(upperBound(node), Math::max); |
| 652 | + min.accumulateAndGet(lowerBound(node), Math::min); |
| 653 | + } |
628 | 654 | // if children's weights are 0 remove them and shift the weight to their parent
|
629 | 655 | int left = lefts[node];
|
630 | 656 | int right = rights[node];
|
@@ -655,6 +681,15 @@ void compress()
|
655 | 681 | // root's count may have decayed to ~0
|
656 | 682 | if (root != -1 && counts[root] < ZERO_WEIGHT_THRESHOLD) {
|
657 | 683 | root = tryRemove(root);
|
| 684 | + if (root < 0) { |
| 685 | + this.min = Long.MAX_VALUE; |
| 686 | + this.max = Long.MIN_VALUE; |
| 687 | + } |
| 688 | + // If decay is being used, the min and max values may have decayed as well |
| 689 | + else if (alpha > 0) { |
| 690 | + this.min = min.get(); |
| 691 | + this.max = max.get(); |
| 692 | + } |
658 | 693 | }
|
659 | 694 | }
|
660 | 695 |
|
|
0 commit comments