Skip to content

Commit fdf68b8

Browse files
authored
Merge pull request #185 from DataSketches/kll-floats-sketch
KllFloatsSketch
2 parents d809fff + fa30b83 commit fdf68b8

14 files changed

Lines changed: 1742 additions & 106 deletions

src/main/java/com/yahoo/sketches/ByteArrayUtil.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22

33
public final class ByteArrayUtil {
44

5+
/**
6+
* Put a given short value into a given byte array at a given offset.
7+
* Assumes little-endian byte order.
8+
* @param bytes destination byte array
9+
* @param offset destination offset
10+
* @param value source value
11+
*/
12+
public static void putShort(final byte[] bytes, final int offset, final short value) {
13+
for (int i = 0; i < Short.BYTES; i++) {
14+
bytes[offset + i] = (byte) ((value >>> (8 * i)) & 0xff);
15+
}
16+
}
17+
518
/**
619
* Put a given integer value into a given byte array at a given offset.
720
* Assumes little-endian byte order.

src/main/java/com/yahoo/sketches/Family.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,12 @@ public enum Family {
111111
/**
112112
* The VarOpt family of sketches. (Not part of TSF.)
113113
*/
114-
VAROPT_UNION(14, "VAROPT_UNION", 1, 4);
114+
VAROPT_UNION(14, "VAROPT_UNION", 1, 4),
115115

116+
/**
117+
* KLL quanliles sketch
118+
*/
119+
KLL(15, "KLL", 1, 2);
116120

117121
private static final Map<Integer, Family> lookupID = new HashMap<Integer, Family>();
118122
private static final Map<String, Family> lookupFamName = new HashMap<String, Family>();
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.yahoo.sketches;
2+
3+
/**
4+
* Common static methods for quantiles sketches
5+
*/
6+
public class QuantilesHelper {
7+
8+
/**
9+
* Convert the weights into totals of the weights preceding each item
10+
* @param array of weights
11+
* @return total weight
12+
*/
13+
public static long convertToPrecedingCummulative(final long[] array) {
14+
long subtotal = 0;
15+
for (int i = 0; i < array.length; i++) {
16+
final long newSubtotal = subtotal + array[i];
17+
array[i] = subtotal;
18+
subtotal = newSubtotal;
19+
}
20+
return subtotal;
21+
}
22+
23+
/**
24+
* Returns the zero-based index (position) of a value in the hypothetical sorted stream of
25+
* values of size n.
26+
* @param phi the fractional position where: 0 &le; &#966; &le; 1.0.
27+
* @param n the size of the stream
28+
* @return the index, a value between 0 and n-1.
29+
*/
30+
public static long posOfPhi(final double phi, final long n) {
31+
final long pos = (long) Math.floor(phi * n);
32+
return (pos == n) ? n - 1 : pos;
33+
}
34+
35+
/**
36+
* This is written in terms of a plain array to facilitate testing.
37+
* @param arr the chunk containing the position
38+
* @param pos the position
39+
* @return the index of the chunk containing the position
40+
*/
41+
public static int chunkContainingPos(final long[] arr, final long pos) {
42+
final int nominalLength = arr.length - 1; /* remember, arr contains an "extra" position */
43+
assert nominalLength > 0;
44+
final long n = arr[nominalLength];
45+
assert 0 <= pos;
46+
assert pos < n;
47+
final int l = 0;
48+
final int r = nominalLength;
49+
// the following three asserts should probably be retained since they ensure
50+
// that the necessary invariants hold at the beginning of the search
51+
assert l < r;
52+
assert arr[l] <= pos;
53+
assert pos < arr[r];
54+
return searchForChunkContainingPos(arr, pos, l, r);
55+
}
56+
57+
// Let m_i denote the minimum position of the length=n "full" sorted sequence
58+
// that is represented in slot i of the length = n "chunked" sorted sequence.
59+
//
60+
// Note that m_i is the same thing as auxCumWtsArr_[i]
61+
//
62+
// Then the answer to a positional query 0 <= q < n is l, where 0 <= l < len,
63+
// A) m_l <= q
64+
// B) q < m_r
65+
// C) l+1 = r
66+
//
67+
// A) and B) provide the invariants for our binary search.
68+
// Observe that they are satisfied by the initial conditions: l = 0 and r = len.
69+
private static int searchForChunkContainingPos(final long[] arr, final long pos, final int l, final int r) {
70+
// the following three asserts can probably go away eventually, since it is fairly clear
71+
// that if these invariants hold at the beginning of the search, they will be maintained
72+
assert l < r;
73+
assert arr[l] <= pos;
74+
assert pos < arr[r];
75+
if (l + 1 == r) {
76+
return l;
77+
}
78+
final int m = l + (r - l) / 2;
79+
if (arr[m] <= pos) {
80+
return (searchForChunkContainingPos(arr, pos, m, r));
81+
}
82+
return (searchForChunkContainingPos(arr, pos, l, m));
83+
}
84+
85+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2018, Yahoo! Inc. Licensed under the terms of the
3+
* Apache License 2.0. See LICENSE file at the project root for terms.
4+
*/
5+
6+
package com.yahoo.sketches.kll;
7+
8+
import java.util.Arrays;
9+
10+
import com.yahoo.sketches.QuantilesHelper;
11+
12+
/**
13+
* Data structure for answering quantile queries based on the samples from KllSketch
14+
* @author Kevin Lang
15+
* @author Alexander Saydakov
16+
*/
17+
final class KllFloatsQuantileCalculator {
18+
19+
private long n_;
20+
private float[] items_;
21+
private long[] weights_;
22+
private int[] levels_;
23+
private int numLevels_;
24+
25+
// assumes that all levels are sorted including level 0
26+
KllFloatsQuantileCalculator(final float[] items, final int[] levels, final int numLevels,
27+
final long n) {
28+
n_ = n;
29+
final int numItems = levels[numLevels] - levels[0];
30+
items_ = new float[numItems];
31+
weights_ = new long[numItems + 1]; // one more is intentional
32+
levels_ = new int[numLevels + 1];
33+
populateFromSketch(items, levels, numLevels, numItems);
34+
blockyTandemMergeSort(items_, weights_, levels_, numLevels_);
35+
QuantilesHelper.convertToPrecedingCummulative(weights_);
36+
}
37+
38+
float getQuantile(final double phi) {
39+
final long pos = QuantilesHelper.posOfPhi(phi, n_);
40+
return approximatelyAnswerPositonalQuery(pos);
41+
}
42+
43+
private float approximatelyAnswerPositonalQuery(final long pos) {
44+
assert pos >= 0;
45+
assert pos < n_;
46+
final int index = QuantilesHelper.chunkContainingPos(weights_, pos);
47+
return items_[index];
48+
}
49+
50+
private void populateFromSketch(final float[] srcItems, final int[] srcLevels,
51+
final int numLevels, final int numItems) {
52+
final int offset = srcLevels[0];
53+
System.arraycopy(srcItems, offset, items_, 0, numItems);
54+
int srcLevel = 0;
55+
int dstLevel = 0;
56+
int weight = 1;
57+
while (srcLevel < numLevels) {
58+
final int fromIndex = srcLevels[srcLevel] - offset;
59+
final int toIndex = srcLevels[srcLevel + 1] - offset; // exclusive
60+
if (fromIndex < toIndex) { // skip empty levels
61+
Arrays.fill(weights_, fromIndex, toIndex, weight);
62+
levels_[dstLevel] = fromIndex;
63+
levels_[dstLevel + 1] = toIndex;
64+
dstLevel++;
65+
}
66+
srcLevel++;
67+
weight *= 2;
68+
}
69+
weights_[numItems] = 0;
70+
numLevels_ = dstLevel;
71+
}
72+
73+
private static void blockyTandemMergeSort(final float[] items, final long[] weights,
74+
final int[] levels, final int numLevels) {
75+
if (numLevels == 1) { return; }
76+
77+
// duplicate the input in preparation for the "ping-pong" copy reduction strategy.
78+
final float[] itemsTmp = Arrays.copyOf(items, items.length);
79+
final long[] weightsTmp = Arrays.copyOf(weights, items.length); // don't need the extra one here
80+
81+
blockyTandemMergeSortRecursion(itemsTmp, weightsTmp, items, weights, levels, 0, numLevels);
82+
}
83+
84+
private static void blockyTandemMergeSortRecursion(final float[] itemsSrc, final long[] weightsSrc,
85+
final float[] itemsDst, final long[] weightsDst, final int[] levels, final int startingLevel,
86+
final int numLevels) {
87+
if (numLevels == 1) { return; }
88+
final int numLevels1 = numLevels / 2;
89+
final int numLevels2 = numLevels - numLevels1;
90+
assert numLevels1 >= 1;
91+
assert numLevels2 >= numLevels1;
92+
final int startingLevel1 = startingLevel;
93+
final int startingLevel2 = startingLevel + numLevels1;
94+
// swap roles of src and dst
95+
blockyTandemMergeSortRecursion(itemsDst, weightsDst, itemsSrc, weightsSrc, levels,
96+
startingLevel1, numLevels1);
97+
blockyTandemMergeSortRecursion(itemsDst, weightsDst, itemsSrc, weightsSrc, levels,
98+
startingLevel2, numLevels2);
99+
tandemMerge(itemsSrc, weightsSrc, itemsDst, weightsDst, levels, startingLevel1, numLevels1,
100+
startingLevel2, numLevels2);
101+
}
102+
103+
private static void tandemMerge(final float[] itemsSrc, final long[] weightsSrc,
104+
final float[] itemsDst, final long[] weightsDst,
105+
final int[] levelStarts, final int startingLevel1, final int numLevels1,
106+
final int startingLevel2, final int numLevels2) {
107+
final int fromIndex1 = levelStarts[startingLevel1];
108+
final int toIndex1 = levelStarts[startingLevel1 + numLevels1]; // exclusive
109+
final int fromIndex2 = levelStarts[startingLevel2];
110+
final int toIndex2 = levelStarts[startingLevel2 + numLevels2]; // exclusive
111+
int iSrc1 = fromIndex1;
112+
int iSrc2 = fromIndex2;
113+
int iDst = fromIndex1;
114+
115+
while ((iSrc1 < toIndex1) && (iSrc2 < toIndex2)) {
116+
if (itemsSrc[iSrc1] < itemsSrc[iSrc2]) {
117+
itemsDst[iDst] = itemsSrc[iSrc1];
118+
weightsDst[iDst] = weightsSrc[iSrc1];
119+
iSrc1++;
120+
} else {
121+
itemsDst[iDst] = itemsSrc[iSrc2];
122+
weightsDst[iDst] = weightsSrc[iSrc2];
123+
iSrc2++;
124+
}
125+
iDst++;
126+
}
127+
if (iSrc1 < toIndex1) {
128+
System.arraycopy(itemsSrc, iSrc1, itemsDst, iDst, toIndex1 - iSrc1);
129+
System.arraycopy(weightsSrc, iSrc1, weightsDst, iDst, toIndex1 - iSrc1);
130+
} else if (iSrc2 < toIndex2) {
131+
System.arraycopy(itemsSrc, iSrc2, itemsDst, iDst, toIndex2 - iSrc2);
132+
System.arraycopy(weightsSrc, iSrc2, weightsDst, iDst, toIndex2 - iSrc2);
133+
}
134+
}
135+
136+
}

0 commit comments

Comments
 (0)