Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions src/main/java/org/apache/datasketches/count/CountMinSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.datasketches.common.Family;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.common.SketchesException;
import org.apache.datasketches.common.Util;
import org.apache.datasketches.hash.MurmurHash3;
import org.apache.datasketches.tuple.Util;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
Copy link
Copy Markdown
Member

@leerho leerho Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not recommend the use of ByteBuffer or any of its subclasses. Since the main branch is targeted for Java 25, I recommend switching to use FFM MemorySegments. Here are my reasons:

  • ByteBuffer off-heap allocations cannot be interactively (dynamically) closed; they are managed by the GC. Thus, out-of-memory scenarios can happen in large systems with millions of sketches because the GC can't keep up with new off-heap allocations.
  • The default endianness of BB is Big-Endian! Even if you intentionally set it to Little-Endian, it will silently switch back to BE after duplicate(), clear(), or slice() operations. To make matters worse, this silent behavior of BB is not documented in the official Javadocs!
  • Managing endianness with BB requires a great deal of care, and it is very easy to forget to constantly switch the endianness back to LE. Thus, I consider its use here in this class dangerous, since there is no explicit setting of endianness (whether to LE or BE!).
  • Endianness is explicitly set with FFM (and with the older datasketches.memory classes) with no silent switching of endianness, thus much safer.
  • Our library is completely LE with one exception in t-Digest for binary compatibility with externally produced t-Digest sketches. Since the introduction of CountMin is new for the Java library it should use LE ordering, since, I assume, there is no history of CountMin sketches being stored.
  • Our Theta, Tuple and CPC sketches must be LE due to their compression algorithms, which rely on LE byte ordering.

Expand All @@ -39,6 +39,9 @@ public class CountMinSketch {
private final long[] sketchArray_;
private long totalWeight_;

// Thread-local ByteBuffer to avoid allocations in hot paths.
// Each thread gets its own 8-byte buffer (the size of one long), created on first access.
// No byte order is set here, so ByteBuffer's default order (big-endian) applies.
private static final ThreadLocal<ByteBuffer> LONG_BUFFER =
ThreadLocal.withInitial(() -> ByteBuffer.allocate(8));

private enum Flag {
IS_EMPTY;
Expand All @@ -57,30 +60,59 @@ int mask() {
* @param seed The base hash seed
*/
CountMinSketch(final byte numHashes, final int numBuckets, final long seed) {
  // Validate every argument up front, before any field is assigned or any array is allocated.
  if (numHashes <= 0) {
    throw new SketchesArgumentException("Number of hash functions must be positive, got: " + numHashes);
  }
  if (numBuckets <= 0) {
    throw new SketchesArgumentException("Number of buckets must be positive, got: " + numBuckets);
  }
  if (numBuckets < 3) {
    // The message reports e / numBuckets as the relative error for the rejected bucket count.
    throw new SketchesArgumentException("Number of buckets must be at least 3 to ensure relative error ≤ 1.0. "
        + "With " + numBuckets + " buckets, relative error would be "
        + String.format("%.3f", Math.exp(1.0) / numBuckets));
  }

  // Multiply in long arithmetic: an int product of numHashes and numBuckets could wrap around
  // and slip past the size checks below.
  final long totalSize = (long) numHashes * (long) numBuckets;

  // Any size above Integer.MAX_VALUE is also rejected by the 2^30 cap below; it is tested first
  // so that an int-overflowing size is reported with its own message.
  if (totalSize > Integer.MAX_VALUE) {
    throw new SketchesArgumentException("Sketch array size would overflow: " + numHashes + " * " + numBuckets
        + " = " + totalSize + " > " + Integer.MAX_VALUE);
  }

  // This check is to ensure later compatibility with a Java implementation whose maximum size can only
  // be 2^31-1. We check only against 2^30 for simplicity.
  if (totalSize >= (1L << 30)) {
    throw new SketchesArgumentException("Sketch would require excessive memory: " + numHashes + " * " + numBuckets
        + " = " + totalSize + " elements (~" + String.format("%.1f", totalSize * 8.0 / (1024 * 1024 * 1024))
        + " GB). " + "Consider reducing numHashes or numBuckets.");
  }

  numHashes_ = numHashes;
  numBuckets_ = numBuckets;
  seed_ = seed;
  hashSeeds_ = new long[numHashes];
  // The cast is safe: totalSize was verified above to be below 2^30.
  sketchArray_ = new long[(int) totalSize];
  totalWeight_ = 0;

  // Derive one seed per hash function from a generator seeded with the base seed, so the same
  // base seed always yields the same sequence of per-hash seeds.
  final Random rand = new Random(seed);
  for (int i = 0; i < numHashes; i++) {
    hashSeeds_[i] = rand.nextLong();
  }
}

/**
 * Encodes a long as its 8-byte big-endian representation.
 *
 * <p>Big-endian is the byte order a default-configured {@code ByteBuffer.putLong} produces, so the
 * resulting bytes are the same as those of a ByteBuffer-based encoding with no explicit order set.
 * A newly allocated array is returned on every call, so the result is never shared with, or
 * overwritten by, a later call.
 *
 * @param value the value to encode
 * @return a new 8-byte array holding {@code value}, most significant byte first
 */
private static byte[] longToBytes(final long value) {
  final byte[] bytes = new byte[Long.BYTES];
  long remaining = value;
  // Fill from the last index backwards: the lowest-order byte goes in the last slot.
  for (int i = Long.BYTES - 1; i >= 0; i--) {
    bytes[i] = (byte) remaining;
    remaining >>>= Byte.SIZE;
  }
  return bytes;
}

private long[] getHashes(byte[] item) {
long[] updateLocations = new long[numHashes_];

Expand Down Expand Up @@ -171,8 +203,7 @@ public static int suggestNumBuckets(double relativeError) {
* @param weight The weight of the item.
*/
public void update(final long item, final long weight) {
  // Encode the long key as bytes and delegate to the byte[] overload exactly once,
  // so the weight is applied a single time per call.
  update(longToBytes(item), weight);
}

/**
Expand Down Expand Up @@ -211,8 +242,7 @@ public void update(final byte[] item, final long weight) {
* @return Estimated frequency.
*/
public long getEstimate(final long item) {
  // Encode the long key as bytes and delegate to the byte[] overload.
  return getEstimate(longToBytes(item));
}

/**
Expand Down Expand Up @@ -241,8 +271,9 @@ public long getEstimate(final byte[] item) {

long[] hashLocations = getHashes(item);
long res = sketchArray_[(int) hashLocations[0]];
for (long h : hashLocations) {
res = Math.min(res, sketchArray_[(int) h]);
// Start from index 1 to avoid processing first element twice
for (int i = 1; i < hashLocations.length; i++) {
res = Math.min(res, sketchArray_[(int) hashLocations[i]]);
}

return res;
Expand All @@ -254,8 +285,7 @@ public long getEstimate(final byte[] item) {
* @return Upper bound of estimated frequency.
*/
public long getUpperBound(final long item) {
  // Encode the long key as bytes and delegate to the byte[] overload.
  return getUpperBound(longToBytes(item));
}

/**
Expand Down Expand Up @@ -291,8 +321,7 @@ public long getUpperBound(final byte[] item) {
* @return Lower bound of estimated frequency.
*/
public long getLowerBound(final long item) {
  // Encode the long key as bytes and delegate to the byte[] overload.
  return getLowerBound(longToBytes(item));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void checkAllFlavorsGo() throws IOException {
int flavorIdx = 0;
for (int n: nArr) {
final byte[] bytes = Files.readAllBytes(goPath.resolve("cpc_n" + n + "_go.sk"));
final CpcSketch sketch = CpcSketch.heapify(Memory.wrap(bytes));
final CpcSketch sketch = CpcSketch.heapify(MemorySegment.ofArray(bytes));
assertEquals(sketch.getFlavor(), flavorArr[flavorIdx++]);
Comment thread
freakyzoidberg marked this conversation as resolved.
assertEquals(sketch.getEstimate(), n, n * 0.02);
}
Expand Down