BloomFilter v2 support for Spark's bloom-filter based joins #4360
mythrocks merged 30 commits into NVIDIA:release/26.04
Conversation
Signed-off-by: MithunR <mithunr@nvidia.com>
Greptile Summary: This PR adds V2 bloom-filter support matching Apache Spark 4.1.1's bloom-filter implementation.
Confidence Score: 4/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Java as BloomFilter.java
    participant JNI as BloomFilterJni.cpp
    participant CU as bloom_filter.cu (CPU)
    participant GPU as GPU Kernels
    Note over Java,GPU: Create
    Java->>JNI: creategpu(version, numHashes, bloomFilterBits, seed)
    JNI->>JNI: Validate bits range (≤ INT32_MAX * 64)
    JNI->>CU: bloom_filter_create(version, numHashes, longs, seed)
    CU->>CU: get_bloom_filter_stride(version, longs) → validates ≤ INT32_MAX
    CU->>CU: pack_bloom_filter_header (V1: 12B, V2: 16B, big-endian)
    CU-->>Java: list_scalar (header + zeroed bit array)
    Note over Java,GPU: Put
    Java->>JNI: put(bloomFilter, cv)
    JNI->>CU: bloom_filter_put(scalar, column)
    CU->>CU: unpack_bloom_filter → (header, buffer, bits, seed)
    alt version == 1
        CU->>GPU: gpu_bloom_filter_put<1> (32-bit hash indices)
    else version == 2
        CU->>GPU: gpu_bloom_filter_put<2> (64-bit hash indices, seed)
    end
    GPU->>GPU: atomic OR bits into filter buffer
    Note over Java,GPU: Probe
    Java->>JNI: probe(bloomFilter, cv)
    JNI->>CU: bloom_filter_probe(column, column)
    CU->>CU: unpack_bloom_filter → (header, buffer, bits, seed)
    alt version == 1
        CU->>GPU: thrust::transform + bloom_probe_functor<1>
    else version == 2
        CU->>GPU: thrust::transform + bloom_probe_functor<2>
    end
    GPU-->>Java: bool column (true = may be present)
```
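The `pack_bloom_filter_header` step in the diagram (V1: 12 bytes, V2: 16 bytes, big-endian) can be sketched on the host side as follows. This is a hypothetical stand-in, not the actual spark-rapids-jni code; in particular the field order and the assumption that V2's extra 4 bytes hold a 32-bit seed are illustrative guesses consistent with the 12B/16B sizes above.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Convert a 32-bit value to big-endian byte order on a little-endian host.
inline uint32_t to_big_endian(uint32_t v) {
  return (v >> 24) | ((v >> 8) & 0xff00u) | ((v << 8) & 0xff0000u) | (v << 24);
}

// Pack a bloom-filter header: V1 is 12 bytes (version, numHashes, numLongs);
// V2 is 16 bytes, assumed here to append a 32-bit seed. All fields big-endian.
std::vector<uint8_t> pack_bloom_filter_header(int32_t version, int32_t num_hashes,
                                              int32_t num_longs, int32_t seed) {
  std::vector<uint8_t> out;
  auto append_be = [&](int32_t field) {
    uint32_t const be = to_big_endian(static_cast<uint32_t>(field));
    uint8_t bytes[4];
    std::memcpy(bytes, &be, sizeof(bytes));
    out.insert(out.end(), bytes, bytes + sizeof(bytes));
  };
  append_be(version);
  append_be(num_hashes);
  append_be(num_longs);
  if (version == 2) { append_be(seed); }  // assumed position of the V2 seed
  return out;
}
```

The big-endian layout matches what Java's `DataOutputStream` writes, which is why the GPU side must byte-swap on little-endian hosts.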
Reviews (19): Last reviewed commit: "Review: exec_policy_nosync, test changes..."
Signed-off-by: MithunR <mithunr@nvidia.com>
1. Checked for narrowing for num_longs. 2. Removed unused create function with the seemingly changed default format version. Signed-off-by: MithunR <mithunr@nvidia.com>
@greptile-apps: Please review to see if your prior concerns have been addressed.
Signed-off-by: MithunR <mithunr@nvidia.com>
@greptile-apps: Please check if this change is now acceptable.
Plus, added deprecated create() function back. Signed-off-by: MithunR <mithunr@nvidia.com>
@greptile-apps: How does this change look now?
The changes look great! All the concerns raised in the review have been addressed: ✅ redundant size checks cleaned up. The implementation correctly matches Spark's V2 format (using …).
Signed-off-by: MithunR <mithunr@nvidia.com>
@greptile-apps, how is this change looking now?
This is looking good! All the issues I raised have been addressed:
The one remaining unresolved thread (V1 narrowing cast) shows your fixes are correct: you removed the …
Build
NOTE: release/26.04 has been created from main. Please retarget your PR to release/26.04 if it should be included in the release.
```cpp
if constexpr (Version == 1) {
  // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L38
  // This is the original V1 hash algorithm from Spark.
  for (auto idx = 1; idx <= num_hashes; idx++) {
    bloom_hash_type combined_hash = h1 + (idx * h2);
    auto const bit_pos =
      static_cast<int64_t>(combined_hash < 0 ? ~combined_hash : combined_hash) %
      bloom_filter_bits;
    auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos);
    cuda::atomic_ref<cudf::bitmask_type, cuda::thread_scope_device> ref(bloom_filter[word_index]);
    ref.fetch_or(mask, cuda::memory_order_relaxed);
  }
} else {
  // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImplV2.java#L63
  int64_t combined_hash =
    static_cast<int64_t>(h1) * static_cast<int64_t>(cuda::std::numeric_limits<int32_t>::max());
  for (int idx = 0; idx < num_hashes; idx++) {
    combined_hash += h2;
    int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash;
    auto const bit_pos = combined_index % bloom_filter_bits;
    auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos);
    cuda::atomic_ref<cudf::bitmask_type, cuda::thread_scope_device> ref(bloom_filter[word_index]);
    ref.fetch_or(mask, cuda::memory_order_relaxed);
  }
}
```
This duplicates the block at rows 129-148.
Consider extracting this init into a common function.
I tried in the first pass, and I've just tried it again. The only way I can think of is to move the common part into a `template <typename Visitor> for_each_bit(...)` function, and pass a visitor that:
- either sets the bit value (for the put case), or
- reads the bit value (for the probe case).

Both times, I found that the code got obfuscated for reading. (I can get it short, but not readable.) I'm inclined not to shorten this further, at least for now.
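For reference, a rough host-side sketch of the visitor-based `for_each_bit` idea discussed here might look like the following. This is a plain C++ stand-in with no CUDA atomics, using a simplified 64-bit hash combine; all names and details are illustrative, not the actual kernel code.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

constexpr int64_t bits_per_word = 32;

// Walk the bit positions for one key's hashes and hand each
// (word index, bit mask) pair to the visitor.
template <typename Visitor>
void for_each_bit(int32_t h1, int32_t h2, int num_hashes, int64_t bloom_filter_bits,
                  Visitor&& visit) {
  int64_t combined = static_cast<int64_t>(h1);  // simplified combine for illustration
  for (int idx = 0; idx < num_hashes; ++idx) {
    combined += h2;
    int64_t const bit_pos = (combined < 0 ? ~combined : combined) % bloom_filter_bits;
    visit(bit_pos / bits_per_word, uint32_t{1} << (bit_pos % bits_per_word));
  }
}

// "put" case: the visitor sets each bit.
void put(std::vector<uint32_t>& filter, int32_t h1, int32_t h2, int num_hashes) {
  for_each_bit(h1, h2, num_hashes, filter.size() * bits_per_word,
               [&](int64_t word, uint32_t mask) { filter[word] |= mask; });
}

// "probe" case: the visitor reads each bit; present only if all are set.
bool probe(std::vector<uint32_t> const& filter, int32_t h1, int32_t h2, int num_hashes) {
  bool all_set = true;
  for_each_bit(h1, h2, num_hashes, filter.size() * bits_per_word,
               [&](int64_t word, uint32_t mask) { all_set = all_set && (filter[word] & mask); });
  return all_set;
}
```

As the comment thread notes, folding the put/probe control flow into a visitor is compact but harder to read than two explicit loops, which is a fair reason to keep the duplication.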
```cpp
cudaMemcpyAsync(
  buf.data(), &header_swizzled, bloom_filter_header_size, cudaMemcpyHostToDevice, stream);
// ...
if (header.version == bloom_filter_version_1) {
  bloom_filter_header_v1 raw = {byte_swap_int32(header.version),
```
Can we use pinned memory for the host buffer? Here, and anywhere else that does memcpy H<->D.
Could I make the pinned-memory change in a follow-up?
Good idea, by the way. I hadn't considered this.
I've taken a follow-up for this: #4407.
ttnghia left a comment
There is also a convention-violation problem: All five public API functions are missing SRJ_FUNC_RANGE().
Signed-off-by: MithunR <mithunr@nvidia.com>
Build
I've sorted this out as well. @ttnghia: Do take another look when you have a moment.
Signed-off-by: MithunR <mithunr@nvidia.com>
Build
amahussein left a comment
Thanks @mythrocks for putting up the fix.
Regarding the follow-up #4407, it would be great to get a rough estimate of the effort needed to figure out the answer, and of the trade-offs of using pinned memory.
BloomFilter performance is critical at the moment.
Fixes #14462.

### Description

This change addresses the build breakage in `spark-rapids` caused by the deprecation of `spark-rapids-jni`'s `BloomFilter.create(int,int)`, introduced in NVIDIA/spark-rapids-jni#4360. This is a stop-gap solution that only restores prior behaviour, i.e. support for the BloomFilter v1 binary format. Actual support for the BloomFilter v2 format will follow in #14406.

### Checklists

- [ ] This PR has added documentation for new or modified features or behaviors.
- [ ] This PR has added new tests or modified existing tests to cover new code paths. (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
- [ ] Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

Signed-off-by: MithunR <mithunr@nvidia.com>
Description
This commit adds support for the v2 format of the BloomFilters used in Apache Spark 4.1.1 for joins (via apache/spark@a08d8b0).
Background
The v1 format used INT32s for bit-index calculation; when the number of items in the bloom filter approaches INT_MAX, one sees a higher rate of collisions. The v2 format uses INT64 values for bit-index calculations, allowing the full bit space to be addressed. Apparently, this reduces the false-positive rate for large filters.
Before the fix in this PR, `spark-rapids-jni` supported only the v1 bloom filter format. Testing `spark-rapids` on Apache Spark 4.1.1 revealed failures in mixed-mode execution, where bloom filters built on the CPU were probed on the GPU (which assumed the v1 format). The changes here should allow a reduced false-positive rate for bloom filters built on join keys with high cardinality (approaching INT_MAX). Note also that support for the v1 format is retained, for backward compatibility.
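To make the v1/v2 difference concrete, here is a small host-side illustration. These functions are simplified stand-ins for the kernel code (Java-style wrapping arithmetic stands in for the v1 path, and the seed is omitted): the point is only that a v1 bit position, computed from a 32-bit combined hash, can never exceed INT32_MAX even when the filter has more bits, while the v2 64-bit combine can address the full bit space.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// V1-style index: combine the two 32-bit hash halves in wrapping 32-bit
// arithmetic (done in unsigned to avoid signed-overflow UB), as Java does.
int64_t v1_bit_pos(int32_t h1, int32_t h2, int32_t idx, int64_t bloom_filter_bits) {
  int32_t const combined = static_cast<int32_t>(
      static_cast<uint32_t>(h1) + static_cast<uint32_t>(idx) * static_cast<uint32_t>(h2));
  return static_cast<int64_t>(combined < 0 ? ~combined : combined) % bloom_filter_bits;
}

// V2-style index: combine the halves in 64-bit arithmetic, starting from
// h1 * INT32_MAX and adding h2 once per hash, as in the V2 block above.
int64_t v2_bit_pos(int32_t h1, int32_t h2, int32_t idx, int64_t bloom_filter_bits) {
  int64_t combined = static_cast<int64_t>(h1) *
                     static_cast<int64_t>(std::numeric_limits<int32_t>::max());
  for (int32_t i = 0; i < idx; ++i) { combined += h2; }
  return (combined < 0 ? ~combined : combined) % bloom_filter_bits;
}
```

With an 8-Gibit (2^33-bit) filter, `v1_bit_pos` is always below 2^31, leaving the upper bits of the filter unused, whereas `v2_bit_pos` can land above 2^31.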