|
25 | 25 | import static org.apache.datasketches.kll.KllSketch.SketchType.ITEMS_SKETCH; |
26 | 26 |
|
27 | 27 | import java.lang.reflect.Array; |
| 28 | +import java.util.Arrays; |
28 | 29 | import java.util.Comparator; |
29 | 30 | import java.util.Objects; |
30 | 31 |
|
31 | 32 | import org.apache.datasketches.common.ArrayOfItemsSerDe; |
32 | 33 | import org.apache.datasketches.common.SketchesArgumentException; |
| 34 | +import org.apache.datasketches.common.Util; |
33 | 35 | import org.apache.datasketches.memory.Memory; |
34 | 36 | import org.apache.datasketches.memory.MemoryRequestServer; |
35 | 37 | import org.apache.datasketches.memory.WritableMemory; |
36 | 38 | import org.apache.datasketches.quantilescommon.GenericPartitionBoundaries; |
37 | 39 | import org.apache.datasketches.quantilescommon.PartitioningFeature; |
38 | 40 | import org.apache.datasketches.quantilescommon.QuantileSearchCriteria; |
39 | | -import org.apache.datasketches.quantilescommon.QuantilesAPI; |
40 | 41 | import org.apache.datasketches.quantilescommon.QuantilesGenericAPI; |
41 | 42 | import org.apache.datasketches.quantilescommon.QuantilesGenericSketchIterator; |
42 | 43 |
|
@@ -154,7 +155,7 @@ public double[] getCDF(final T[] splitPoints, final QuantileSearchCriteria searc |
154 | 155 | @Override |
155 | 156 | public GenericPartitionBoundaries<T> getPartitionBoundaries(final int numEquallySized, |
156 | 157 | final QuantileSearchCriteria searchCrit) { |
157 | | - if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); } |
| 158 | + if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); } |
158 | 159 | refreshSortedView(); |
159 | 160 | return kllItemsSV.getPartitionBoundaries(numEquallySized, searchCrit); |
160 | 161 | } |
@@ -307,13 +308,6 @@ MemoryRequestServer getMemoryRequestServer() { |
307 | 308 | @Override |
308 | 309 | abstract int getMinMaxSizeBytes(); |
309 | 310 |
|
310 | | - private final KllItemsSketchSortedView<T> refreshSortedView() { |
311 | | - final KllItemsSketchSortedView<T> sv = (kllItemsSV == null) |
312 | | - ? kllItemsSV = new KllItemsSketchSortedView<>(this) |
313 | | - : kllItemsSV; |
314 | | - return sv; |
315 | | - } |
316 | | - |
317 | 311 | abstract T[] getRetainedItemsArray(); |
318 | 312 |
|
319 | 313 | @Override |
@@ -374,4 +368,144 @@ void setWritableMemory(final WritableMemory wmem) { |
374 | 368 | throw new SketchesArgumentException(UNSUPPORTED_MSG + "Sketch not writable."); |
375 | 369 | } |
376 | 370 |
|
| 371 | + void updateMinMax(final T item) { |
| 372 | + if (isEmpty()) { |
| 373 | + setMinItem(item); |
| 374 | + setMaxItem(item); |
| 375 | + } else { |
| 376 | + setMinItem(Util.minT(getMinItem(), item, comparator)); |
| 377 | + setMaxItem(Util.maxT(getMaxItem(), item, comparator)); |
| 378 | + } |
| 379 | + } |
| 380 | + |
| 381 | + private final KllItemsSketchSortedView<T> refreshSortedView() { |
| 382 | + if (kllItemsSV == null) { |
| 383 | + final CreateSortedView csv = new CreateSortedView(); |
| 384 | + kllItemsSV = csv.getSV(); |
| 385 | + } |
| 386 | + return kllItemsSV; |
| 387 | + } |
| 388 | + |
| 389 | + private final class CreateSortedView { |
| 390 | + T[] quantiles; |
| 391 | + long[] cumWeights; |
| 392 | + |
| 393 | + KllItemsSketchSortedView<T> getSV() { |
| 394 | + if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); } |
| 395 | + if (getN() == 0) { throw new SketchesArgumentException(EMPTY_MSG); } |
| 396 | + final T[] srcQuantiles = getTotalItemsArray(); |
| 397 | + final int[] srcLevels = levelsArr; |
| 398 | + final int srcNumLevels = getNumLevels(); |
| 399 | + |
| 400 | + if (!isLevelZeroSorted()) { |
| 401 | + Arrays.sort(srcQuantiles, srcLevels[0], srcLevels[1], comparator); |
| 402 | + if (!hasMemory()) { setLevelZeroSorted(true); } |
| 403 | + } |
| 404 | + final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space |
| 405 | + quantiles = (T[]) Array.newInstance(serDe.getClassOfT(), numQuantiles); |
| 406 | + cumWeights = new long[numQuantiles]; |
| 407 | + populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles); |
| 408 | + return new KllItemsSketchSortedView<>(quantiles, cumWeights, getN(), comparator, getMaxItem(), getMinItem()); |
| 409 | + } |
| 410 | + |
| 411 | + private void populateFromSketch(final Object[] srcQuantiles, final int[] srcLevels, |
| 412 | + final int srcNumLevels, final int numItems) { |
| 413 | + final int[] myLevels = new int[srcNumLevels + 1]; |
| 414 | + final int offset = srcLevels[0]; |
| 415 | + System.arraycopy(srcQuantiles, offset, quantiles, 0, numItems); |
| 416 | + int srcLevel = 0; |
| 417 | + int dstLevel = 0; |
| 418 | + long weight = 1; |
| 419 | + while (srcLevel < srcNumLevels) { |
| 420 | + final int fromIndex = srcLevels[srcLevel] - offset; |
| 421 | + final int toIndex = srcLevels[srcLevel + 1] - offset; // exclusive |
| 422 | + if (fromIndex < toIndex) { // if equal, skip empty level |
| 423 | + Arrays.fill(cumWeights, fromIndex, toIndex, weight); |
| 424 | + myLevels[dstLevel] = fromIndex; |
| 425 | + myLevels[dstLevel + 1] = toIndex; |
| 426 | + dstLevel++; |
| 427 | + } |
| 428 | + srcLevel++; |
| 429 | + weight *= 2; |
| 430 | + } |
| 431 | + final int numLevels = dstLevel; |
| 432 | + blockyTandemMergeSort(quantiles, cumWeights, myLevels, numLevels, comparator); //create unit weights |
| 433 | + KllHelper.convertToCumulative(cumWeights); |
| 434 | + } |
| 435 | + } |
| 436 | + |
| 437 | + private static <T> void blockyTandemMergeSort(final Object[] quantiles, final long[] weights, |
| 438 | + final int[] levels, final int numLevels, final Comparator<? super T> comp) { |
| 439 | + if (numLevels == 1) { return; } |
| 440 | + |
| 441 | + // duplicate the input in preparation for the "ping-pong" copy reduction strategy. |
| 442 | + final Object[] quantilesTmp = Arrays.copyOf(quantiles, quantiles.length); |
| 443 | + final long[] weightsTmp = Arrays.copyOf(weights, quantiles.length); // don't need the extra one here |
| 444 | + |
| 445 | + blockyTandemMergeSortRecursion(quantilesTmp, weightsTmp, quantiles, weights, levels, 0, numLevels, comp); |
| 446 | + } |
| 447 | + |
| 448 | + private static <T> void blockyTandemMergeSortRecursion( |
| 449 | + final Object[] quantilesSrc, final long[] weightsSrc, |
| 450 | + final Object[] quantilesDst, final long[] weightsDst, |
| 451 | + final int[] levels, final int startingLevel, final int numLevels, final Comparator<? super T> comp) { |
| 452 | + if (numLevels == 1) { return; } |
| 453 | + final int numLevels1 = numLevels / 2; |
| 454 | + final int numLevels2 = numLevels - numLevels1; |
| 455 | + assert numLevels1 >= 1; |
| 456 | + assert numLevels2 >= numLevels1; |
| 457 | + final int startingLevel1 = startingLevel; |
| 458 | + final int startingLevel2 = startingLevel + numLevels1; |
| 459 | + // swap roles of src and dst |
| 460 | + blockyTandemMergeSortRecursion( |
| 461 | + quantilesDst, weightsDst, |
| 462 | + quantilesSrc, weightsSrc, |
| 463 | + levels, startingLevel1, numLevels1, comp); |
| 464 | + blockyTandemMergeSortRecursion( |
| 465 | + quantilesDst, weightsDst, |
| 466 | + quantilesSrc, weightsSrc, |
| 467 | + levels, startingLevel2, numLevels2, comp); |
| 468 | + tandemMerge( |
| 469 | + quantilesSrc, weightsSrc, |
| 470 | + quantilesDst, weightsDst, |
| 471 | + levels, |
| 472 | + startingLevel1, numLevels1, |
| 473 | + startingLevel2, numLevels2, comp); |
| 474 | + } |
| 475 | + |
| 476 | + private static <T> void tandemMerge( |
| 477 | + final Object[] quantilesSrc, final long[] weightsSrc, |
| 478 | + final Object[] quantilesDst, final long[] weightsDst, |
| 479 | + final int[] levelStarts, |
| 480 | + final int startingLevel1, final int numLevels1, |
| 481 | + final int startingLevel2, final int numLevels2, final Comparator<? super T> comp) { |
| 482 | + final int fromIndex1 = levelStarts[startingLevel1]; |
| 483 | + final int toIndex1 = levelStarts[startingLevel1 + numLevels1]; // exclusive |
| 484 | + final int fromIndex2 = levelStarts[startingLevel2]; |
| 485 | + final int toIndex2 = levelStarts[startingLevel2 + numLevels2]; // exclusive |
| 486 | + int iSrc1 = fromIndex1; |
| 487 | + int iSrc2 = fromIndex2; |
| 488 | + int iDst = fromIndex1; |
| 489 | + |
| 490 | + while (iSrc1 < toIndex1 && iSrc2 < toIndex2) { |
| 491 | + if (Util.lt((T) quantilesSrc[iSrc1], (T) quantilesSrc[iSrc2], comp)) { |
| 492 | + quantilesDst[iDst] = quantilesSrc[iSrc1]; |
| 493 | + weightsDst[iDst] = weightsSrc[iSrc1]; |
| 494 | + iSrc1++; |
| 495 | + } else { |
| 496 | + quantilesDst[iDst] = quantilesSrc[iSrc2]; |
| 497 | + weightsDst[iDst] = weightsSrc[iSrc2]; |
| 498 | + iSrc2++; |
| 499 | + } |
| 500 | + iDst++; |
| 501 | + } |
| 502 | + if (iSrc1 < toIndex1) { |
| 503 | + System.arraycopy(quantilesSrc, iSrc1, quantilesDst, iDst, toIndex1 - iSrc1); |
| 504 | + System.arraycopy(weightsSrc, iSrc1, weightsDst, iDst, toIndex1 - iSrc1); |
| 505 | + } else if (iSrc2 < toIndex2) { |
| 506 | + System.arraycopy(quantilesSrc, iSrc2, quantilesDst, iDst, toIndex2 - iSrc2); |
| 507 | + System.arraycopy(weightsSrc, iSrc2, weightsDst, iDst, toIndex2 - iSrc2); |
| 508 | + } |
| 509 | + } |
| 510 | + |
377 | 511 | } |
0 commit comments