BloomFilter v2 support [databricks] #14406
Conversation
Fixes NVIDIA#14148. This commit adds support for the new BloomFilter v2 format that was added in Apache Spark 4.1.1 (via apache/spark@a08d8b0).

The v1 format used INT32s for bit index calculation. When the number of items in the bloom filter approaches INT_MAX, one sees a higher rate of collisions. The v2 format uses INT64 values for bit index calculations, allowing the full bit space to be addressed. Apparently, this reduces the false positive rates for large filters.

Before the fix in this PR was applied to spark-rapids, only certain bloom filter join tests would fail against Apache Spark 4.1.1; specifically:
1. `test_bloom_filter_join_cpu_build`, where the bloom filter is built on CPU and then probed on GPU. This failed because the CPU would produce a v2 filter that couldn't be treated as a v1 format on GPU.
2. `test_bloom_filter_join_split_cpu_build`, where the bloom filter is partially aggregated on CPU, then merged on GPU. Again, the GPU-side merging expected v1 format, while the CPU produced v2.

Note that `test_bloom_filter_join_cpu_probe` and `test_bloom_filter_join` did not actually fail on 4.1.1. That is because:
1. `test_bloom_filter_join_cpu_probe` tests CPU probing, which supports v1 and v2 flexibly.
2. `test_bloom_filter_join` tests the build and probe jointly, either both on CPU or both on GPU. The CPU ran v2 format, the GPU ran v1. Both produce the same query results, albeit with different formats.

The fix in this commit allows the v1 and v2 formats to be jointly supported on GPU, depending on the Spark version.

Signed-off-by: MithunR <mithunr@nvidia.com>
Greptile Summary

This PR adds GPU-side support for Apache Spark 4.1.1's BloomFilter V2 format (64-bit index calculations), fixing broken bloom filter join tests on Spark 4.1.1+ while keeping full backward compatibility with the V1 format used by earlier Spark versions.
Confidence Score: 5/5

Safe to merge; the implementation is correct and previously flagged concerns have been resolved. All P0/P1 issues raised in earlier review rounds (the shim-based default version, ARM-pattern resource management) are confirmed fixed. One note on test coverage: in tests/src/test/spark411/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala, only a literal V2 unit test is present; other build/probe scenarios are integration-test-only for Spark 4.1.1.

Important Files Changed
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[CPU builds BloomFilter\nSpark serialises V1 or V2\nbig-endian bytes] --> B[GPU receives\ndevice buffer]
    B --> C[GpuBloomFilter.deserialize]
    C --> D[readVersionFromDevice\nsliceWithCopy 0..4\ncopyToHost\nInteger.reverseBytes]
    D --> E{version?}
    E -->|1| F[headerSize = 12\nHEADER_SIZE_V1]
    E -->|2| G[headerSize = 16\nHEADER_SIZE_V2]
    E -->|other| H[IllegalArgumentException]
    F --> I[Validate totalLen >= headerSize\nValidate bitBufferLen % 8 == 0]
    G --> I
    I --> J[DeviceMemoryBuffer.allocate\ncopyFromDeviceBufferAsync\nnew GpuBloomFilter]
    subgraph Build path
        K[BloomFilterShims.convertToGpuImpl\nreads BloomFilterConstantsShims\n.BLOOM_FILTER_FORMAT_VERSION] --> L[GpuBloomFilterAggregate\nversion=1 spark330\nversion=2 spark411]
        L --> M[GpuBloomFilterUpdate\nBloomFilter.create version numHashes numBits seed]
    end
```
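The version-sniffing step in the flowchart can be sketched in plain Java. This is a hedged illustration, not the plugin's actual code: the real path slices the first four bytes on the device, copies them to host, and byte-swaps with `Integer.reverseBytes`; here a big-endian `ByteBuffer` read achieves the same decoding, and the header sizes (12 for v1, 16 for v2) come from the flowchart.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class BloomHeaderSniff {
    static final int HEADER_SIZE_V1 = 12; // per the flowchart: v1 header is 12 bytes
    static final int HEADER_SIZE_V2 = 16; // v2 header grows to 16 bytes

    // Spark serialises the filter with a big-endian INT32 version word first.
    // On a little-endian host, reading the raw int requires a byte swap
    // (Integer.reverseBytes); a BIG_ENDIAN ByteBuffer read is equivalent.
    static int headerSize(byte[] serialized) {
        if (serialized.length < 4) {
            throw new IllegalArgumentException("Buffer too small to hold a version word");
        }
        int version = ByteBuffer.wrap(serialized, 0, 4)
                                .order(ByteOrder.BIG_ENDIAN)
                                .getInt();
        switch (version) {
            case 1: return HEADER_SIZE_V1;
            case 2: return HEADER_SIZE_V2;
            default: throw new IllegalArgumentException(
                "Unsupported BloomFilter version: " + version);
        }
    }
}
```

Any version word other than 1 or 2 is rejected up front, mirroring the `IllegalArgumentException` branch in the flowchart.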
Reviews (6): Last reviewed commit: "Merge remote-tracking branch 'origin/rel..."
...plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterConstantsShims.scala
.../src/main/spark330/scala/org/apache/spark/sql/rapids/aggregate/GpuBloomFilterAggregate.scala
@greptile-apps: Have all the concerns from the review been addressed in the latest version?

Yes, all concerns from the review have been addressed. There were two issues raised; both review threads are marked as resolved, and the developer has confirmed the fixes in their responses.
amahussein
left a comment
Thanks @mythrocks for the fix!
I believe the V1-to-V2 change is a correctness bug fix for extreme-scale bloom filters, not a performance improvement on typical workloads like NDS/TPCDS.
We would only see a difference if the bloom filters are large (hundreds of MB) and hold billions of items.
The dependency PR (NVIDIA/spark-rapids-jni#4360) has been merged. The CI will fail until 4360 finds its way into a published snapshot. That shouldn't prevent us from reviewing this change.

My understanding is that this is a performance issue, not a correctness bug fix. The v1 format will have a higher false-positive rate with large data sets, but should produce correct results. Am I missing something?
Build

New 26.04 nightly JNI is just available, re-kick the build to include DB runtime validations #14462

build

early termination of CI spark400db173
@mythrocks @jihoonson why are we merging something without any performance tests? When are the performance tests going to be completed? Also, the spark-rapids build is broken due to the deprecation of the BloomFilter.create method in jni.
@jihoonson I don't remember Spark's PR providing any information about the performance difference from that fix.
We need a separate benchmark for bloom filters. CC: @abellina
Fixes NVIDIA#14462. This change addresses the build breakage in `spark-rapids` from the deprecation of the `spark-rapids-jni` `BloomFilter.create(int,int)` method, introduced in NVIDIA/spark-rapids-jni#4360. This is a stop-gap solution that only restores prior behaviour, i.e. support for the BloomFilter v1 binary format. Actual support for the BloomFilter v2 format will follow in NVIDIA#14406. Signed-off-by: MithunR <mithunr@nvidia.com>
That is addressed in #14468.
It is not my intention to break the build, or to merge changes that weren't perf tested. The JNI benchmark tests were updated in NVIDIA/spark-rapids-jni#4360 to include the v2 format. I intended to document this here after I was done testing. I have been running tests on the plugin change for a while now. The tests are custom and local; I didn't think NDS addresses this change. The test involves an inner join of a fact table against a dimension table, where:
The tests force the bloom-join optimization on. My findings have been:
The above should address @jihoonson and @amahussein's question: the change from V1 to V2 was a performance issue for Apache Spark. It is a functionality issue for spark-rapids.
#14468. I hadn't expected turbulence on #14406 in Databricks.
...plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterConstantsShims.scala
.../src/main/spark330/scala/org/apache/spark/sql/rapids/aggregate/GpuBloomFilterAggregate.scala
Fixes #14462.

### Description

This change addresses the build breakage in `spark-rapids` from the deprecation of the `spark-rapids-jni` `BloomFilter.create(int,int)` method, introduced in NVIDIA/spark-rapids-jni#4360. This is a stop-gap solution that only restores prior behaviour, i.e. support for the BloomFilter v1 binary format. Actual support for the BloomFilter v2 format will follow in #14406.

### Checklists

- [ ] This PR has added documentation for new or modified features or behaviors.
- [ ] This PR has added new tests or modified existing tests to cover new code paths. (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
- [ ] Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

Signed-off-by: MithunR <mithunr@nvidia.com>
Build
@abellina It is important to track the performance change of each PR, but I don't think the performance testing must be done in the same PR. There are often cases where you want to get the code merged first and then do performance testing, such as when 1) the feature is too big to fit in a single PR, or 2) you don't have enough time to run performance tests for some reason, such as code freeze. In any case, we should track an issue to measure the performance of the feature. Our PR template suggests filing a follow-up GitHub issue in this case. In the case of this PR, the performance test must be completed before the release. We will be able to decide whether to keep the feature or not based on the result. This only becomes the case when the author decides to do the performance test later. I'm fine either way, as long as the performance testing is done before the release.
build

I'll sort the test out.
Build
The failing test doesn't seem to have to do with bloom filters.

```
112086:2026-03-26T07:45:35.5084196Z [2026-03-26T07:28:48.027Z] FAILED ../../../../integration_tests/src/main/python/hash_aggregate_test.py::test_hash_grpby_sum[KUDO-{'spark.rapids.sql.variableFloatAgg.enabled': 'true', 'spark.rapids.sql.castStringToFloat.enabled': 'true'}-[('a', Long), ('b', Integer), ('c', Long)]][DATAGEN_SEED=1774509770, TZ=UTC, INJECT_OOM, IGNORE_ORDER, INCOMPAT, APPROXIMATE_FLOAT, ALLOW_NON_GPU(HashAggregateExec,AggregateExpression,UnscaledValue,MakeDecimal,AttributeReference,Alias,Sum,Count,Max,Min,Average,Cast,StddevPop,StddevSamp,VariancePop,VarianceSamp,NormalizeNaNAndZero,GreaterThan,Literal,If,EqualTo,First,SortAggregateExec,Coalesce,IsNull,EqualNullSafe,PivotFirst,GetArrayItem,ShuffleExchangeExec,HashPartitioning)] - py4j.protocol.Py4JJavaError: An error occurred while calling o692.collectToPython.
```
I might give this another crank.
Build
NOTE: release/26.04 has been created from main. Please retarget your PR to release/26.04 if it should be included in the release.
Build

Build
@jihoonson, @amahussein, et al: I have updated the performance-testing section of the description with my tests. I have also rebased this onto the release/26.04 branch. Please let me know if this doesn't look agreeable.
build |
amahussein
left a comment
Thanks @mythrocks !
LGTM
Fixes #14148.
Depends on NVIDIA/spark-rapids-jni#4360.
Description
This commit adds support for the new BloomFilter v2 format that was added in Apache Spark 4.1.1 (via apache/spark@a08d8b0).
Background
The v1 format used INT32s for bit index calculation. When the number of items in the bloom-filter approaches INT_MAX, one sees a higher rate of collisions. The v2 format uses INT64 values for bit index calculations, allowing the full bit space to be addressed. Apparently, this reduces the false positive rates for large filters.
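The effect of 32-bit versus 64-bit index arithmetic can be shown with a toy Java example. This is hypothetical illustration code, not Spark's implementation: once a filter has more than 2^31 bits, truncating a hash to an INT32 folds distinct 64-bit hashes onto the same bit position, which the INT64 path avoids.

```java
public class BitIndexDemo {
    // v1-style: the hash is truncated to 32 bits before the modulo,
    // so at most 2^31-1 distinct bit positions are reachable.
    static long v1Style(long hash, long numBits) {
        int h = (int) hash;                       // 32-bit truncation
        return (h & Integer.MAX_VALUE) % numBits; // clear sign bit, then index
    }

    // v2-style: the full 64-bit hash addresses the whole bit space.
    static long v2Style(long hash, long numBits) {
        return (hash & Long.MAX_VALUE) % numBits;
    }

    public static void main(String[] args) {
        long numBits = 8L * Integer.MAX_VALUE;    // a filter larger than 2^31 bits
        long hash = 0x1_0000_0005L;               // differs from 5 only above bit 31
        // v1-style collides: both hashes land on bit 5.
        System.out.println(v1Style(hash, numBits) == v1Style(5L, numBits)); // true
        // v2-style keeps them apart.
        System.out.println(v2Style(hash, numBits) == v2Style(5L, numBits)); // false
    }
}
```

With billions of items, such folded collisions accumulate and inflate the false-positive rate, which is what the v2 format addresses.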
Before the fix in this current PR was applied to spark-rapids, only certain bloom filter join tests would fail against Apache Spark 4.1.1; specifically:
1. `test_bloom_filter_join_cpu_build`, where the bloom filter is built on CPU and then probed on GPU. This failed because the CPU would produce a v2 filter that couldn't be treated as a v1 format on GPU.
2. `test_bloom_filter_join_split_cpu_build`, where the bloom filter is partially aggregated on CPU, then merged on GPU. Again, the GPU-side merging expected v1 format, while the CPU produced v2.

Note that `test_bloom_filter_join_cpu_probe` and `test_bloom_filter_join` did not actually fail on 4.1.1. That is because:
1. `test_bloom_filter_join_cpu_probe` tests CPU probing, which supports v1 and v2 flexibly.
2. `test_bloom_filter_join` tests the build and probe jointly, either both on CPU or both on GPU. The CPU ran v2 format, the GPU ran v1. Both produce the same query results, albeit with different formats.

Effect
The fix in this commit allows for v1 and v2 formats to be jointly supported on GPU, depending on the Spark version.
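How the per-version choice is made can be sketched as follows. The real selection lives in a Scala shim constant, `BloomFilterConstantsShims.BLOOM_FILTER_FORMAT_VERSION`, compiled once per supported Spark version; the single-class Java model below is an assumption for demonstration only, as is the `formatVersionFor` helper and its string-comparison logic.

```java
public class BloomFilterVersionDemo {
    // Hypothetical stand-in for the per-shim constant: spark411 and later
    // shims emit the v2 (64-bit index) format, while the spark330-based
    // shims keep the legacy v1 format.
    static int formatVersionFor(String shim) {
        return "spark411".compareTo(shim) <= 0 ? 2 : 1;
    }

    public static void main(String[] args) {
        // The build path would pass this version into the JNI-side
        // BloomFilter.create call along with numHashes/numBits/seed.
        System.out.println(formatVersionFor("spark330")); // 1
        System.out.println(formatVersionFor("spark411")); // 2
    }
}
```

Keeping the version in a shim constant means no runtime configuration is needed: each Spark build of the plugin automatically emits the format its CPU counterpart expects.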
Documentation
The change is not strictly user-facing. The bloom filter involved is an implementation detail, constructed in the background, and not exposed to the user. The user should see a performance improvement for joins in the INT_MAX cases, but nothing else. No documentation needs to be updated.
Tests
The existing bloom filter test cases should really cover this. `test_bloom_filter_join_cpu_build` and `test_bloom_filter_join_split_cpu_build` have been re-enabled for Spark version >= 4.1.1.

Performance Tests
This has been tested by performing joins on two synthetically generated tables, as follows:
- `fact` was generated with ~2 billion rows (11GB), consisting of two fields:
  - `id: BIGINT`: 10 distinct values, uniformly distributed (195M rows each), except one value (10) that only has one row.
  - `str: STRING`: Immaterial baggage.
- `dim` was generated with ~2 billion rows (8GB), with exactly one field:
  - `id: BIGINT`: Each row is distinct, approaching `Int.MAX_INT`.

The following query was run:
Predicate pushdown for Parquet was also disabled (via `set spark.sql.parquet.filterPushdown=false`), to avoid obscuring the results. The join predicate was chosen so that the bloom filter created would be quite large. The expected result is that there would be just 1 result row from the join.
The test was run against the `26.02` release version and the `26.04` release candidate, on Spark 4.1.1 and Spark 3.5.x.

It was observed that the `26.04` version performed about identically with `26.02`. The query took about 33 seconds (with the `26.04` "fixed" version, on average, being a few hundred ms faster than `26.02`).

As an aside, tests were also run with the predicate chosen to select `id=10`. Again, it was observed that the fixed version performed about the same as `26.02`, i.e. about 350ms.

Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)