@@ -328,6 +328,14 @@ public void scale(double scaleFactor)
328
328
*/
329
329
public List <Long > getQuantilesLowerBound (List <Double > quantiles )
330
330
{
331
+ if (alpha > 0.0 ) {
332
+ long nowInSeconds = TimeUnit .NANOSECONDS .toSeconds (ticker .read ());
333
+ if (nowInSeconds - landmarkInSeconds >= RESCALE_THRESHOLD_SECONDS ) {
334
+ rescale (nowInSeconds );
335
+ compress (); // rescale affects weights globally, so force compression
336
+ }
337
+ }
338
+
331
339
checkArgument (Ordering .natural ().isOrdered (quantiles ), "quantiles must be sorted in increasing order" );
332
340
for (double quantile : quantiles ) {
333
341
checkArgument (quantile >= 0 && quantile <= 1 , "quantile must be between [0,1]" );
@@ -379,6 +387,14 @@ public boolean process(int node)
379
387
*/
380
388
public List <Long > getQuantilesUpperBound (List <Double > quantiles )
381
389
{
390
+ if (alpha > 0.0 ) {
391
+ long nowInSeconds = TimeUnit .NANOSECONDS .toSeconds (ticker .read ());
392
+ if (nowInSeconds - landmarkInSeconds >= RESCALE_THRESHOLD_SECONDS ) {
393
+ rescale (nowInSeconds );
394
+ compress (); // rescale affects weights globally, so force compression
395
+ }
396
+ }
397
+
382
398
checkArgument (Ordering .natural ().isOrdered (quantiles ), "quantiles must be sorted in increasing order" );
383
399
for (double quantile : quantiles ) {
384
400
checkArgument (quantile >= 0 && quantile <= 1 , "quantile must be between [0,1]" );
@@ -411,9 +427,12 @@ public boolean process(int node)
411
427
412
428
// we finished the traversal without consuming all quantiles. This means the remaining quantiles
413
429
// correspond to the max known value
414
- while (iterator .hasNext ()) {
415
- builder .add (max );
416
- iterator .next ();
430
+ if (iterator .hasNext ()) {
431
+ long max = getMax ();
432
+ while (iterator .hasNext ()) {
433
+ builder .add (max );
434
+ iterator .next ();
435
+ }
417
436
}
418
437
419
438
return builder .build ();
@@ -624,7 +643,14 @@ void compress()
624
643
{
625
644
double bound = Math .floor (weightedCount / calculateCompressionFactor ());
626
645
646
+ AtomicLong max = new AtomicLong (Integer .MIN_VALUE );
647
+ AtomicLong min = new AtomicLong (Integer .MAX_VALUE );
627
648
postOrderTraversal (root , node -> {
649
+ // If decay is enabled and this node has a non-zero weight, determine if the value is the new max or min
650
+ if (counts [node ] >= ZERO_WEIGHT_THRESHOLD && alpha > 0.0 ) {
651
+ max .accumulateAndGet (upperBound (node ), Math ::max );
652
+ min .accumulateAndGet (lowerBound (node ), Math ::min );
653
+ }
628
654
// if children's weights are 0 remove them and shift the weight to their parent
629
655
int left = lefts [node ];
630
656
int right = rights [node ];
@@ -655,6 +681,15 @@ void compress()
655
681
// root's count may have decayed to ~0
656
682
if (root != -1 && counts [root ] < ZERO_WEIGHT_THRESHOLD ) {
657
683
root = tryRemove (root );
684
+ if (root < 0 ) {
685
+ this .min = Long .MAX_VALUE ;
686
+ this .max = Long .MIN_VALUE ;
687
+ }
688
+ // If decay is being used, the min and max values may have decayed as well
689
+ else if (alpha > 0 ) {
690
+ this .min = min .get ();
691
+ this .max = max .get ();
692
+ }
658
693
}
659
694
}
660
695
0 commit comments