Skip to content

Commit 60d02de

Browse files
authored
Merge pull request #155 from DataSketches/vo_res_update
union reservoir sketches into varopt
2 parents e22e016 + 5ae2806 commit 60d02de

3 files changed

Lines changed: 234 additions & 8 deletions

File tree

src/main/java/com/yahoo/sketches/sampling/VarOptItemsUnion.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,18 @@ public void update(final Memory mem, final ArrayOfItemsSerDe<T> serDe) {
209209
}
210210
}
211211

212+
/**
213+
* Union a reservoir sketch. The reservoir sample is treated as if all items were added with a
214+
* weight of 1.0.
215+
*
216+
* @param reservoirIn The reservoir sketch to be merged
217+
*/
218+
public void update(final ReservoirItemsSketch<T> reservoirIn) {
219+
if (reservoirIn != null) {
220+
mergeReservoirInto(reservoirIn);
221+
}
222+
}
223+
212224
/**
213225
* Gets the varopt sketch resulting from the union of any input sketches.
214226
*
@@ -390,6 +402,61 @@ private void mergeInto(final VarOptItemsSketch<T> sketch) {
390402
}
391403
}
392404

405+
/**
406+
* Used to merge a reservoir sample into varopt, assuming the reservoir was built with items
407+
* of weight 1.0. Logic is very similar to mergeInto() for a sketch with no heavy items.
408+
* @param reservoir Reservoir sketch to merge into this union
409+
*/
410+
private void mergeReservoirInto(final ReservoirItemsSketch<T> reservoir) {
411+
final long reservoirN = reservoir.getN();
412+
if (reservoirN == 0) {
413+
return;
414+
}
415+
416+
n_ += reservoirN;
417+
418+
final int reservoirK = reservoir.getK();
419+
if (reservoir.getN() <= reservoirK) {
420+
// exact mode, so just insert and be done
421+
for (T item : reservoir.getRawSamplesAsList()) {
422+
gadget_.update(item, 1.0, false);
423+
}
424+
} else {
425+
// sampling mode. We'll replicate a weight-correcting iterator
426+
final double reservoirTau = reservoir.getImplicitSampleWeight();
427+
428+
double cumWeight = 0.0;
429+
final ArrayList<T> samples = reservoir.getRawSamplesAsList();
430+
for (int i = 0; i < reservoirK - 1; ++i) {
431+
gadget_.update(samples.get(i), reservoirTau, true);
432+
cumWeight += reservoirTau;
433+
}
434+
// correct for any numerical discrepancies with the last item
435+
gadget_.update(samples.get(reservoirK - 1), reservoir.getN() - cumWeight, true);
436+
437+
// resolve tau
438+
final double outerTau = getOuterTau();
439+
440+
if (outerTauDenom == 0) {
441+
// detect first estimation mode sketch and grab its tau
442+
outerTauNumer = reservoirN;
443+
outerTauDenom = reservoirK;
444+
} else if (reservoirTau > outerTau) {
445+
// switch to a bigger value of outerTau
446+
outerTauNumer = reservoirN;
447+
outerTauDenom = reservoirK;
448+
} else if (reservoirTau == outerTau) {
449+
// Ok if previous equality test isn't quite perfect. Mistakes in either direction should
450+
// be fairly benign.
451+
// Without conceptually changing outerTau, update number and denominator. In particular,
452+
// add the total weight of the incoming reservoir to the running total.
453+
outerTauNumer += reservoirN;
454+
outerTauDenom += reservoirK;
455+
}
456+
// do nothing if reservoir "tau" is no smaller than outerTau
457+
}
458+
}
459+
393460
/**
394461
* When there are no marked items in H, teh gadget is mathematically equivalent to a valid
395462
* varopt sketch. This method simply returns a copy (without perserving marks).

src/test/java/com/yahoo/sketches/sampling/VarOptItemsSketchTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ public void checkEmptySketch() {
202202
final Memory mem = Memory.wrap(sketchBytes);
203203

204204
// only minPreLongs bytes and should deserialize to empty
205-
assert sketchBytes != null;
206205
assertEquals(sketchBytes.length, Family.VAROPT.getMinPreLongs() << 3);
207206
final ArrayOfStringsSerDe serDe = new ArrayOfStringsSerDe();
208207
final VarOptItemsSketch<String> loadedVis = VarOptItemsSketch.heapify(mem, serDe);

src/test/java/com/yahoo/sketches/sampling/VarOptItemsUnionTest.java

Lines changed: 167 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,12 +187,145 @@ public void unionSmallSamplingSketch() {
187187
assertEquals(result.getTotalWtR(), 96.0, EPS); // n1+n2 light items, ignore the heavy one
188188
}
189189

190+
@Test
191+
public void unionExactReservoirSketch() {
192+
// build a varopt union which contains both heavy and light items, then copy it and
193+
// compare unioning:
194+
// 1. A varopt sketch of items with weight 1.0
195+
// 2. A reservoir sample made of the same input items as above
196+
// and we should find that the resulting unions are equivalent.
197+
198+
final int k = 20;
199+
final long n = 2 * k;
200+
201+
final VarOptItemsSketch<Long> baseVis = VarOptItemsSketch.newInstance(k);
202+
for (long i = 1; i <= n; ++i) {
203+
baseVis.update(-i, i);
204+
}
205+
baseVis.update(-n - 1L, n * n);
206+
baseVis.update(-n - 2L, n * n);
207+
baseVis.update(-n - 3L, n * n);
208+
209+
final VarOptItemsUnion<Long> union1 = VarOptItemsUnion.newInstance(k);
210+
union1.update(baseVis);
211+
212+
final ArrayOfLongsSerDe serDe = new ArrayOfLongsSerDe();
213+
final Memory unionImg = Memory.wrap(union1.toByteArray(serDe));
214+
final VarOptItemsUnion<Long> union2 = VarOptItemsUnion.heapify(unionImg, serDe);
215+
216+
compareUnionsExact(union1, union2); // sanity check
217+
218+
final VarOptItemsSketch<Long> vis = VarOptItemsSketch.newInstance(k);
219+
final ReservoirItemsSketch<Long> ris = ReservoirItemsSketch.newInstance(k);
220+
221+
union2.update((ReservoirItemsSketch<Long>) null);
222+
union2.update(ris); // empty
223+
224+
compareUnionsExact(union1, union2); // union2 should be unchanged
225+
226+
for (long i = 1; i < k - 1; ++i) {
227+
ris.update(i);
228+
vis.update(i, 1.0);
229+
}
230+
231+
union1.update(vis);
232+
union2.update(ris);
233+
234+
compareUnionsEquivalent(union1, union2);
235+
}
236+
237+
@Test
238+
public void unionSamplingReservoirSketch() {
239+
// Like unionExactReservoirSketch, but merge in reservoir first, with reservoir in sampling mode
240+
final int k = 20;
241+
final long n = k * k;
242+
243+
final VarOptItemsUnion<Long> union1 = VarOptItemsUnion.newInstance(k);
244+
final VarOptItemsUnion<Long> union2 = VarOptItemsUnion.newInstance(k);
245+
246+
compareUnionsExact(union1, union2); // sanity check
247+
248+
final VarOptItemsSketch<Long> vis = VarOptItemsSketch.newInstance(k);
249+
final ReservoirItemsSketch<Long> ris = ReservoirItemsSketch.newInstance(k);
250+
251+
for (long i = 1; i < n; ++i) {
252+
ris.update(i);
253+
vis.update(i, 1.0);
254+
}
255+
256+
union1.update(vis);
257+
union2.update(ris);
258+
compareUnionsEquivalent(union1, union2);
259+
260+
// repeat to trigger equal tau scenario
261+
union1.update(vis);
262+
union2.update(ris);
263+
compareUnionsEquivalent(union1, union2);
264+
265+
// create and add a sketch with some heavy items
266+
final VarOptItemsSketch<Long> newVis = VarOptItemsSketch.newInstance(k);
267+
for (long i = 1; i <= n; ++i) {
268+
newVis.update(-i, i);
269+
}
270+
newVis.update(-n - 1L, n * n);
271+
newVis.update(-n - 2L, n * n);
272+
newVis.update(-n - 3L, n * n);
273+
274+
union1.update(newVis);
275+
union2.update(newVis);
276+
compareUnionsEquivalent(union1, union2);
277+
}
278+
279+
@Test
280+
public void unionReservoirVariousTauValues() {
281+
final int k = 20;
282+
final long n = 2 * k;
283+
284+
final VarOptItemsSketch<Long> baseVis = VarOptItemsSketch.newInstance(k);
285+
for (long i = 1; i <= n; ++i) {
286+
baseVis.update(-i, 1.0);
287+
}
288+
289+
final VarOptItemsUnion<Long> union1 = VarOptItemsUnion.newInstance(k);
290+
union1.update(baseVis);
291+
292+
final ArrayOfLongsSerDe serDe = new ArrayOfLongsSerDe();
293+
final Memory unionImg = Memory.wrap(union1.toByteArray(serDe));
294+
final VarOptItemsUnion<Long> union2 = VarOptItemsUnion.heapify(unionImg, serDe);
295+
296+
compareUnionsExact(union1, union2); // sanity check
297+
298+
// reservoir tau will be greater than gadget's tau
299+
VarOptItemsSketch<Long> vis = VarOptItemsSketch.newInstance(k);
300+
ReservoirItemsSketch<Long> ris = ReservoirItemsSketch.newInstance(k);
301+
for (long i = 1; i < 2 * n; ++i) {
302+
ris.update(i);
303+
vis.update(i, 1.0);
304+
}
305+
306+
union1.update(vis);
307+
union2.update(ris);
308+
compareUnionsEquivalent(union1, union2);
309+
310+
// reservoir tau will be smaller than gadget's tau
311+
vis = VarOptItemsSketch.newInstance(k);
312+
ris = ReservoirItemsSketch.newInstance(k);
313+
for (long i = 1; i <= k + 1; ++i) {
314+
ris.update(i);
315+
vis.update(i, 1.0);
316+
}
317+
318+
union1.update(vis);
319+
union2.update(ris);
320+
compareUnionsEquivalent(union1, union2);
321+
}
322+
190323
@Test
191324
public void serializeEmptyUnion() {
192325
final int k = 100;
193326
final VarOptItemsUnion<String> union = VarOptItemsUnion.newInstance(k);
194327
// null inputs to update() should leave the union empty
195-
union.update(null);
328+
union.update((VarOptItemsSketch<String>) null);
196329
union.update(null, new ArrayOfStringsSerDe());
197330

198331
final ArrayOfStringsSerDe serDe = new ArrayOfStringsSerDe();
@@ -225,7 +358,7 @@ public void serializeExactUnion() {
225358
final Memory mem = Memory.wrap(unionBytes);
226359

227360
final VarOptItemsUnion<Long> rebuilt = VarOptItemsUnion.heapify(mem, serDe);
228-
compareUnions(rebuilt, union);
361+
compareUnionsExact(rebuilt, union);
229362

230363
assertEquals(rebuilt.toString(), union.toString());
231364
}
@@ -252,21 +385,48 @@ public void serializeSamplingUnion() {
252385
final Memory mem = Memory.wrap(unionBytes);
253386

254387
final VarOptItemsUnion<Long> rebuilt = VarOptItemsUnion.heapify(mem, serDe);
255-
compareUnions(rebuilt, union);
388+
compareUnionsExact(rebuilt, union);
256389

257390
assertEquals(rebuilt.toString(), union.toString());
258391
}
259392

260-
static <T> void compareUnions(final VarOptItemsUnion<T> u1,
261-
final VarOptItemsUnion<T> u2) {
393+
private static <T> void compareUnionsExact(final VarOptItemsUnion<T> u1,
394+
final VarOptItemsUnion<T> u2) {
262395
assertEquals(u1.getOuterTau(), u2.getOuterTau());
263396

264-
final VarOptItemsSamples<T> s1 = u1.getResult().getSketchSamples();
265-
final VarOptItemsSamples<T> s2 = u2.getResult().getSketchSamples();
397+
final VarOptItemsSketch<T> sketch1 = u1.getResult();
398+
final VarOptItemsSketch<T> sketch2 = u2.getResult();
399+
assertEquals(sketch1.getN(), sketch2.getN());
400+
assertEquals(sketch1.getHRegionCount(), sketch2.getHRegionCount());
401+
assertEquals(sketch1.getRRegionCount(), sketch2.getRRegionCount());
402+
403+
final VarOptItemsSamples<T> s1 = sketch1.getSketchSamples();
404+
final VarOptItemsSamples<T> s2 = sketch2.getSketchSamples();
266405

267406
assertEquals(s1.getNumSamples(), s2.getNumSamples());
407+
assertEquals(s1.weights(), s2.weights());
268408
assertEquals(s1.items(), s2.items());
409+
}
410+
411+
private static <T> void compareUnionsEquivalent(final VarOptItemsUnion<T> u1,
412+
final VarOptItemsUnion<T> u2) {
413+
assertEquals(u1.getOuterTau(), u2.getOuterTau());
414+
415+
final VarOptItemsSketch<T> sketch1 = u1.getResult();
416+
final VarOptItemsSketch<T> sketch2 = u2.getResult();
417+
assertEquals(sketch1.getN(), sketch2.getN());
418+
assertEquals(sketch1.getHRegionCount(), sketch2.getHRegionCount());
419+
assertEquals(sketch1.getRRegionCount(), sketch2.getRRegionCount());
420+
421+
final VarOptItemsSamples<T> s1 = sketch1.getSketchSamples();
422+
final VarOptItemsSamples<T> s2 = sketch2.getSketchSamples();
423+
424+
assertEquals(s1.getNumSamples(), s2.getNumSamples());
269425
assertEquals(s1.weights(), s2.weights());
426+
// only compare exact items; others can differ as long as weights match
427+
for (int i = 0; i < sketch1.getHRegionCount(); ++i) {
428+
assertEquals(s1.items(i), s2.items(i));
429+
}
270430
}
271431

272432
/**

0 commit comments

Comments
 (0)