55
66package com .yahoo .sketches .theta ;
77
8- import static com .yahoo .sketches .theta .UpdateReturnState .InsertedCountIncremented ;
8+ import static com .yahoo .sketches .theta .UpdateReturnState .ConcurrentBufferInserted ;
9+ import static com .yahoo .sketches .theta .UpdateReturnState .ConcurrentPropagated ;
910import static com .yahoo .sketches .theta .UpdateReturnState .RejectedOverTheta ;
1011
1112import java .util .concurrent .atomic .AtomicBoolean ;
2728 */
2829final class ConcurrentHeapThetaBuffer extends HeapQuickSelectSketch {
2930
30- private static int computeLogBufferSize (final int lgNomLongs , final long exactSize ,
31- final int maxNumLocalBuffers ) {
32- return Math .min (lgNomLongs , (int )Math .log (Math .sqrt (exactSize ) / (2 * maxNumLocalBuffers )));
33- }
34-
3531 // Shared sketch consisting of the global sample set and theta value.
3632 private final ConcurrentSharedThetaSketch shared ;
3733
@@ -60,6 +56,11 @@ private static int computeLogBufferSize(final int lgNomLongs, final long exactSi
6056 localPropagationInProgress = new AtomicBoolean (false );
6157 }
6258
59+ private static int computeLogBufferSize (final int lgNomLongs , final long exactSize ,
60+ final int maxNumLocalBuffers ) {
61+ return Math .min (lgNomLongs , (int )Math .log (Math .sqrt (exactSize ) / (2 * maxNumLocalBuffers )));
62+ }
63+
6364 //Sketch overrides
6465
6566 @ Override
@@ -82,6 +83,11 @@ public double getUpperBound(final int numStdDev) {
8283 return shared .getUpperBound (numStdDev );
8384 }
8485
86+ @ Override
87+ public boolean hasMemory () {
88+ return shared .hasMemory ();
89+ }
90+
8591 @ Override
8692 public boolean isDirect () {
8793 return shared .isDirect ();
@@ -129,18 +135,21 @@ UpdateReturnState hashUpdate(final long hash) {
129135 }
130136 HashOperations .checkHashCorruption (hash );
131137 if ((getHashTableThreshold () == 0 ) || isExactMode ) {
132- final long thetaLong = getThetaLong ();
133138 //The over-theta and zero test
134- if (HashOperations .continueCondition (thetaLong , hash )) {
139+ if (HashOperations .continueCondition (getThetaLong () , hash )) {
135140 return RejectedOverTheta ; //signal that hash was rejected due to theta or zero.
136141 }
137142 if (propagateToSharedSketch (hash )) {
138- return InsertedCountIncremented ; //not totally correct
143+ return ConcurrentPropagated ;
139144 }
140145 }
141146 final UpdateReturnState state = super .hashUpdate (hash );
142147 if (isOutOfSpace (getRetainedEntries () + 1 )) {
143148 propagateToSharedSketch ();
149+ return ConcurrentPropagated ;
150+ }
151+ if (state == UpdateReturnState .InsertedCountIncremented ) {
152+ return ConcurrentBufferInserted ;
144153 }
145154 return state ;
146155 }
0 commit comments