Skip to content

Commit 6792a8b

Browse files
author
Lee Rhodes
committed
Changes for union-based merging in QuantilesSketches instead of internal
merging.
1 parent aa1557b commit 6792a8b

12 files changed

Lines changed: 1215 additions & 758 deletions

File tree

src/main/java/com/yahoo/sketches/quantiles/HeapQuantilesSketch.java

Lines changed: 64 additions & 483 deletions
Large diffs are not rendered by default.
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Copyright 2015, Yahoo! Inc.
3+
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
4+
*/
5+
package com.yahoo.sketches.quantiles;
6+
7+
import com.yahoo.sketches.Family;
8+
import com.yahoo.sketches.memory.Memory;
9+
import com.yahoo.sketches.memory.MemoryRequest;
10+
11+
/**
12+
* Union operation for on-heap.
13+
*
14+
* @author Lee Rhodes
15+
*/
16+
@SuppressWarnings("unused")
17+
class HeapUnion extends Union {
18+
private HeapQuantilesSketch gadget_ = null;
19+
20+
HeapUnion() {} //creates a virgin Union object
21+
22+
HeapUnion(QuantilesSketch sketch) {
23+
gadget_ = (HeapQuantilesSketch) sketch;
24+
}
25+
26+
/**
27+
* Heapify the given srcMem into a HeapUnion object.
28+
* @param srcMem the given srcMem.
29+
* A reference to srcMem will not be maintained internally.
30+
*/
31+
HeapUnion(Memory srcMem) {
32+
gadget_ = HeapQuantilesSketch.getInstance(srcMem);
33+
}
34+
35+
@Override
36+
public void update(QuantilesSketch sketchIn) {
37+
gadget_ = unionUpdateLogic(gadget_, (HeapQuantilesSketch)sketchIn);
38+
}
39+
40+
@Override
41+
public void update(Memory srcMem) {
42+
HeapQuantilesSketch that = HeapQuantilesSketch.getInstance(srcMem);
43+
gadget_ = unionUpdateLogic(gadget_, that);
44+
}
45+
46+
@Override
47+
public void update(double dataItem) {
48+
gadget_.update(dataItem);
49+
}
50+
51+
@Override
52+
public QuantilesSketch getResult() {
53+
if (gadget_ == null) return null;
54+
return HeapQuantilesSketch.copy(gadget_);
55+
}
56+
57+
@Override
58+
public void reset() {
59+
gadget_ = null;
60+
}
61+
62+
@Override
63+
public String toString() {
64+
return toString(true, false);
65+
}
66+
67+
@Override
68+
public String toString(boolean sketchSummary, boolean dataDetail) {
69+
return gadget_.toString(sketchSummary, dataDetail);
70+
}
71+
72+
73+
//@formatter:off
74+
@SuppressWarnings("null")
75+
static HeapQuantilesSketch unionUpdateLogic(HeapQuantilesSketch qs1, HeapQuantilesSketch qs2) {
76+
int sw1 = ((qs1 == null)? 0 : qs1.isEmpty()? 4: 8);
77+
sw1 |= ((qs2 == null)? 0 : qs2.isEmpty()? 1: 2);
78+
int outCase = 0; //0=null, 1=NOOP, 2=copy, 3=merge
79+
switch (sw1) {
80+
case 0: outCase = 0; break; //null qs1 = null, qs2 = null
81+
case 1: outCase = 2; break; //copy qs1 = null, qs2 = empty
82+
case 2: outCase = 2; break; //copy qs1 = null, qs2 = valid
83+
case 4: outCase = 1; break; //noop qs1 = empty, qs2 = null
84+
case 5: outCase = 1; break; //noop qs1 = empty, qs2 = empty
85+
case 6: outCase = 3; break; //merge qs1 = empty, qs2 = valid
86+
case 8: outCase = 1; break; //noop qs1 = valid, qs2 = null
87+
case 9: outCase = 1; break; //noop qs1 = valid, qs2 = empty
88+
case 10: outCase = 3; break; //merge qs1 = valid, qs2 = valid
89+
}
90+
switch (outCase) {
91+
case 0: return null;
92+
case 1: return qs1;
93+
case 2: {
94+
return HeapQuantilesSketch.copy(qs2);
95+
}
96+
}
97+
//must merge
98+
if (qs1.getK() <= qs2.getK()) {
99+
HeapUnion.mergeInto(qs2, qs1);
100+
return qs1;
101+
}
102+
103+
//myK > thatK
104+
HeapQuantilesSketch copyQS2 = HeapQuantilesSketch.copy(qs2);
105+
HeapUnion.mergeInto(qs1, copyQS2);
106+
return copyQS2;
107+
}
108+
//@formatter:on
109+
110+
/**
111+
* Merges the source sketch into the target sketch that can have a smaller value of K.
112+
* However, it is required that the ratio of the two K values be a power of 2.
113+
* I.e., source.getK() = target.getK() * 2^(nonnegative integer).
114+
* The source is not modified.
115+
*
116+
* <p>Note: It is easy to prove that the following simplified code which launches multiple waves of
117+
* carry propagation does exactly the same amount of merging work (including the work of
118+
* allocating fresh buffers) as the more complicated and seemingly more efficient approach that
119+
* tracks a single carry propagation wave through both sketches.
120+
*
121+
* <p> This simplified code probably does do slightly more "outer loop" work, but I am pretty
122+
* sure that even that is within a constant factor of the more complicated code, plus the
123+
* total amount of "outer loop" work is at least a factor of K smaller than the total amount of
124+
* merging work, which is identical in the two approaches.
125+
*
126+
* <p>Note: a two-way merge that doesn't modify either of its two inputs could be implemented
127+
* by making a deep copy of the larger sketch and then merging the smaller one into it.
128+
* However, it was decided not to do this.
129+
*
130+
* @param source The source sketch
131+
* @param target The target sketch
132+
*/
133+
134+
static void mergeInto(QuantilesSketch source, QuantilesSketch target) {
135+
136+
HeapQuantilesSketch src = (HeapQuantilesSketch)source;
137+
HeapQuantilesSketch tgt = (HeapQuantilesSketch)target;
138+
int srcK = src.getK();
139+
int tgtK = tgt.getK();
140+
long srcN = src.getN();
141+
long tgtN = tgt.getN();
142+
143+
if (srcK != tgtK) {
144+
Util.downSamplingMergeInto(src, tgt);
145+
return;
146+
}
147+
148+
double[] srcLevels = src.getCombinedBuffer(); // aliasing is a bit dangerous
149+
double[] srcBaseBuffer = srcLevels; // aliasing is a bit dangerous
150+
151+
long nFinal = tgtN + srcN;
152+
153+
for (int i = 0; i < src.getBaseBufferCount(); i++) {
154+
tgt.update(srcBaseBuffer[i]);
155+
}
156+
157+
Util.maybeGrowLevels(nFinal, tgt);
158+
159+
double[] scratchBuf = new double[2*tgtK];
160+
161+
long srcBitPattern = src.getBitPattern();
162+
assert srcBitPattern == (srcN / (2L * srcK));
163+
for (int srcLvl = 0; srcBitPattern != 0L; srcLvl++, srcBitPattern >>>= 1) {
164+
if ((srcBitPattern & 1L) > 0L) {
165+
Util.inPlacePropagateCarry(
166+
srcLvl,
167+
srcLevels, ((2+srcLvl) * tgtK),
168+
scratchBuf, 0,
169+
false, tgt);
170+
// won't update qsTarget.n_ until the very end
171+
}
172+
}
173+
174+
tgt.n_ = nFinal;
175+
176+
assert tgt.getN() / (2*tgtK) == tgt.getBitPattern(); // internal consistency check
177+
178+
double srcMax = src.getMaxValue();
179+
double srcMin = src.getMinValue();
180+
double tgtMax = tgt.getMaxValue();
181+
double tgtMin = tgt.getMinValue();
182+
if (srcMax > tgtMax) { tgt.maxValue_ = srcMax; }
183+
if (srcMin < tgtMin) { tgt.minValue_ = srcMin; }
184+
}
185+
186+
}

0 commit comments

Comments
 (0)