Skip to content

Commit ff07407

Browse files
authored
Merge pull request #498 from apache/weighted_updates_for_kll_items_sketch
Weighted updates for kll items sketch
2 parents b078525 + 44bb7d4 commit ff07407

15 files changed

+278
-79
lines changed

src/main/java/org/apache/datasketches/common/ArrayOfItemsSerDe.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,5 +111,5 @@ public int sizeOf(final T[] items) {
111111
* Returns the concrete class of type T
112112
* @return the concrete class of type T
113113
*/
114-
public abstract Class<?> getClassOfT();
114+
public abstract Class<T> getClassOfT();
115115
}

src/main/java/org/apache/datasketches/kll/KllDoublesHelper.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@
3131

3232
import org.apache.datasketches.memory.WritableMemory;
3333

34+
//
3435
//
3536
/**
3637
* Static methods to support KllDoublesSketch
3738
* @author Kevin Lang
3839
* @author Alexander Saydakov
3940
*/
40-
//
4141
final class KllDoublesHelper {
4242

4343
/**
@@ -47,8 +47,8 @@ final class KllDoublesHelper {
4747
* @param weight the given weight
4848
* @return the Items Array.
4949
*/
50-
static double[] createItemsArray(final double item, final int weight) {
51-
final int itemsArrLen = Integer.bitCount(weight);
50+
static double[] createItemsArray(final double item, final long weight) {
51+
final int itemsArrLen = Long.bitCount(weight);
5252
final double[] itemsArr = new double[itemsArrLen];
5353
Arrays.fill(itemsArr, item);
5454
return itemsArr;
@@ -332,12 +332,13 @@ static void updateDouble(final KllDoublesSketch dblSk, final double item) {
332332
}
333333

334334
//Called from KllDoublesSketch::update with weight
335-
static void updateDouble(final KllDoublesSketch dblSk, final double item, final int weight) {
335+
static void updateDouble(final KllDoublesSketch dblSk, final double item, final long weight) {
336336
if (weight < dblSk.levelsArr[0]) {
337-
for (int i = 0; i < weight; i++) { updateDouble(dblSk, item); }
337+
for (int i = 0; i < (int)weight; i++) { updateDouble(dblSk, item); }
338338
} else {
339339
dblSk.updateMinMax(item);
340340
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);
341+
341342
dblSk.merge(tmpSk);
342343
}
343344
}
@@ -471,7 +472,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif
471472

472473
workLevels[0] = 0;
473474

474-
// Note: the level zero data from "other" was already inserted into "self",
475+
// Note: the level zero data from "other" was already inserted into "self".
475476
// This copies into workbuf.
476477
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
477478
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[0], workBuf, workLevels[0], selfPopZero);
@@ -481,7 +482,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif
481482
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
482483
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
483484
workLevels[lvl + 1] = workLevels[lvl] + selfPop + otherPop;
484-
485+
assert selfPop >= 0 && otherPop >= 0;
485486
if (selfPop == 0 && otherPop == 0) { continue; }
486487
else if (selfPop > 0 && otherPop == 0) {
487488
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[lvl], workBuf, workLevels[lvl], selfPop);

src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -330,11 +330,11 @@ public void update(final double item) {
330330
* @param item the item to be repeated. NaNs are ignored.
331331
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
332332
*/
333-
public void update(final double item, final int weight) {
333+
public void update(final double item, final long weight) {
334334
if (Double.isNaN(item)) { return; } //ignore
335335
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
336-
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
337-
if (weight == 1) { KllDoublesHelper.updateDouble(this, item); }
336+
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
337+
if (weight == 1L) { KllDoublesHelper.updateDouble(this, item); }
338338
else { KllDoublesHelper.updateDouble(this, item, weight); }
339339
kllDoublesSV = null;
340340
}

src/main/java/org/apache/datasketches/kll/KllFloatsHelper.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@
3131

3232
import org.apache.datasketches.memory.WritableMemory;
3333

34+
//
3435
//
3536
/**
3637
* Static methods to support KllFloatsSketch
3738
* @author Kevin Lang
3839
* @author Alexander Saydakov
3940
*/
40-
//
4141
final class KllFloatsHelper {
4242

4343
/**
@@ -47,8 +47,8 @@ final class KllFloatsHelper {
4747
* @param weight the given weight
4848
* @return the Items Array.
4949
*/
50-
static float[] createItemsArray(final float item, final int weight) {
51-
final int itemsArrLen = Integer.bitCount(weight);
50+
static float[] createItemsArray(final float item, final long weight) {
51+
final int itemsArrLen = Long.bitCount(weight);
5252
final float[] itemsArr = new float[itemsArrLen];
5353
Arrays.fill(itemsArr, item);
5454
return itemsArr;
@@ -332,12 +332,13 @@ static void updateFloat(final KllFloatsSketch fltSk, final float item) {
332332
}
333333

334334
//Called from KllFloatsSketch::update with weight
335-
static void updateFloat(final KllFloatsSketch fltSk, final float item, final int weight) {
335+
static void updateFloat(final KllFloatsSketch fltSk, final float item, final long weight) {
336336
if (weight < fltSk.levelsArr[0]) {
337-
for (int i = 0; i < weight; i++) { updateFloat(fltSk, item); }
337+
for (int i = 0; i < (int)weight; i++) { updateFloat(fltSk, item); }
338338
} else {
339339
fltSk.updateMinMax(item);
340340
final KllHeapFloatsSketch tmpSk = new KllHeapFloatsSketch(fltSk.getK(), DEFAULT_M, item, weight);
341+
341342
fltSk.merge(tmpSk);
342343
}
343344
}
@@ -471,7 +472,7 @@ private static void populateFloatWorkArrays(
471472

472473
worklevels[0] = 0;
473474

474-
// Note: the level zero data from "other" was already inserted into "self"
475+
// Note: the level zero data from "other" was already inserted into "self".
475476
// This copies into workbuf.
476477
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
477478
System.arraycopy( myCurFloatItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
@@ -481,14 +482,14 @@ private static void populateFloatWorkArrays(
481482
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
482483
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
483484
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;
484-
485+
assert selfPop >= 0 && otherPop >= 0;
485486
if (selfPop == 0 && otherPop == 0) { continue; }
486487
if (selfPop > 0 && otherPop == 0) {
487488
System.arraycopy(myCurFloatItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
488-
}
489+
}
489490
else if (selfPop == 0 && otherPop > 0) {
490491
System.arraycopy(otherFloatItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
491-
}
492+
}
492493
else if (selfPop > 0 && otherPop > 0) {
493494
mergeSortedFloatArrays(
494495
myCurFloatItemsArr, myCurLevelsArr[lvl], selfPop,

src/main/java/org/apache/datasketches/kll/KllFloatsSketch.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -330,11 +330,11 @@ public void update(final float item) {
330330
* @param item the item to be repeated. NaNs are ignored.
331331
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
332332
*/
333-
public void update(final float item, final int weight) {
333+
public void update(final float item, final long weight) {
334334
if (Float.isNaN(item)) { return; } //ignore
335335
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
336-
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
337-
if (weight == 1) { KllFloatsHelper.updateFloat(this, item); }
336+
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
337+
if (weight == 1L) { KllFloatsHelper.updateFloat(this, item); }
338338
else { KllFloatsHelper.updateFloat(this, item, weight); }
339339
kllFloatsSV = null;
340340
}

src/main/java/org/apache/datasketches/kll/KllHeapDoublesSketch.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
5959
*
6060
* @param k parameter that controls size of the sketch and accuracy of estimates.
6161
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
62-
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
63-
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
6462
* @param m parameter that controls the minimum level width in items. It can be 2, 4, 6 or 8.
6563
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
6664
* experimental as they have not been as well characterized.
@@ -84,7 +82,7 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
8482
/**
8583
* Used for creating a temporary sketch for use with weighted updates.
8684
*/
87-
KllHeapDoublesSketch(final int k, final int m, final double item, final int weight) {
85+
KllHeapDoublesSketch(final int k, final int m, final double item, final long weight) {
8886
super(UPDATABLE);
8987
KllHelper.checkM(m);
9088
KllHelper.checkK(k, m);

src/main/java/org/apache/datasketches/kll/KllHeapFloatsSketch.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
5959
*
6060
* @param k parameter that controls size of the sketch and accuracy of estimates.
6161
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
62-
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
63-
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
6462
* @param m parameter that controls the minimum level width in items. It can be 2, 4, 6 or 8.
6563
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
6664
* experimental as they have not been as well characterized.
@@ -84,7 +82,7 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
8482
/**
8583
* Used for creating a temporary sketch for use with weighted updates.
8684
*/
87-
KllHeapFloatsSketch(final int k, final int m, final float item, final int weight) {
85+
KllHeapFloatsSketch(final int k, final int m, final float item, final long weight) {
8886
super(UPDATABLE);
8987
KllHelper.checkM(m);
9088
KllHelper.checkK(k, m);

src/main/java/org/apache/datasketches/kll/KllHeapItemsSketch.java

+39-11
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_EMPTY;
2626
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_FULL;
2727
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_SINGLE;
28+
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;
2829

2930
import java.lang.reflect.Array;
3031
import java.util.Comparator;
@@ -34,6 +35,14 @@
3435
import org.apache.datasketches.memory.Memory;
3536
import org.apache.datasketches.memory.WritableMemory;
3637

38+
/**
39+
* This class implements an on-heap items KllSketch.
40+
*
41+
* <p>Please refer to the documentation in the package-info:<br>
42+
* {@link org.apache.datasketches.kll}</p>
43+
*
44+
* @author Lee Rhodes, Kevin Lang
45+
*/
3746
@SuppressWarnings("unchecked")
3847
final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
3948
private final int k; // configured size of K.
@@ -46,14 +55,17 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
4655
private Object[] itemsArr;
4756

4857
/**
49-
* Constructs a new empty instance of this sketch on the Java heap.
58+
* New instance heap constructor.
59+
* @param k parameter that controls size of the sketch and accuracy of estimates.
60+
* @param m parameter that controls the minimum level width in items. It can be 2, 4, 6 or 8.
61+
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
62+
* experimental as they have not been as well characterized.
63+
* @param comparator user specified comparator of type T.
64+
* @param serDe serialization / deserialization class
5065
*/
51-
KllHeapItemsSketch(
52-
final int k,
53-
final int m,
54-
final Comparator<? super T> comparator,
66+
KllHeapItemsSketch(final int k, final int m, final Comparator<? super T> comparator,
5567
final ArrayOfItemsSerDe<T> serDe) {
56-
super(SketchStructure.UPDATABLE, comparator, serDe);
68+
super(UPDATABLE, comparator, serDe);
5769
KllHelper.checkM(m);
5870
KllHelper.checkK(k, m);
5971
this.levelsArr = new int[] {k, k};
@@ -69,11 +81,27 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
6981
}
7082

7183
/**
72-
* The Heapify constructor, which constructs an image of this sketch from
73-
* a Memory (or WritableMemory) object that was created by this sketch
74-
* and has a type T consistent with the given comparator and serDe.
75-
* Once the data from the given Memory has been transferred into this heap sketch,
76-
* the reference to the Memory object is no longer retained.
84+
* Used for creating a temporary sketch for use with weighted updates.
85+
*/
86+
KllHeapItemsSketch(final int k, final int m, final T item, final long weight, final Comparator<? super T> comparator,
87+
final ArrayOfItemsSerDe<T> serDe) {
88+
super(UPDATABLE, comparator, serDe);
89+
KllHelper.checkM(m);
90+
KllHelper.checkK(k, m);
91+
this.levelsArr = KllHelper.createLevelsArray(weight);
92+
this.readOnly = false;
93+
this.k = k;
94+
this.m = m;
95+
this.n = weight;
96+
this.minK = k;
97+
this.isLevelZeroSorted = false;
98+
this.minItem = item;
99+
this.maxItem = item;
100+
this.itemsArr = KllItemsHelper.createItemsArray(serDe.getClassOfT(), item, weight);
101+
}
102+
103+
/**
104+
* The Heapify constructor
77105
* @param srcMem the Source Memory image that contains data.
78106
* @param comparator the comparator for this sketch and given Memory.
79107
* @param serDe the serializer / deserializer for this sketch and the given Memory.

src/main/java/org/apache/datasketches/kll/KllHelper.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,11 @@ public static long convertToCumulative(final long[] array) {
157157
* @param weight the given weight
158158
* @return the Levels Array
159159
*/
160-
static int[] createLevelsArray(final int weight) {
161-
final int numLevels = 32 - Integer.numberOfLeadingZeros(weight);
160+
static int[] createLevelsArray(final long weight) {
161+
final int numLevels = 64 - Long.numberOfLeadingZeros(weight);
162+
if (numLevels > 61) {
163+
throw new SketchesArgumentException("The requested weight must not exceed 2^61");
164+
}
162165
final int[] levelsArr = new int[numLevels + 1]; //always one more than numLevels
163166
int itemsArrIndex = 0;
164167
levelsArr[0] = itemsArrIndex;

0 commit comments

Comments
 (0)