From ac2540a42c2e95037112e2b3126cebef308c7150 Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 9 Mar 2026 21:55:11 -0700 Subject: [PATCH 01/29] WIP: BloomFilter v2 support. Signed-off-by: MithunR --- src/main/cpp/benchmarks/bloom_filter.cu | 51 ++- src/main/cpp/src/BloomFilterJni.cpp | 7 +- src/main/cpp/src/bloom_filter.cu | 411 +++++++++++------- src/main/cpp/src/bloom_filter.hpp | 64 ++- src/main/cpp/tests/bloom_filter.cu | 180 +++++++- .../nvidia/spark/rapids/jni/BloomFilter.java | 106 +++-- .../spark/rapids/jni/BloomFilterTest.java | 47 +- 7 files changed, 614 insertions(+), 252 deletions(-) diff --git a/src/main/cpp/benchmarks/bloom_filter.cu b/src/main/cpp/benchmarks/bloom_filter.cu index ddbb680167..55daef8e45 100644 --- a/src/main/cpp/benchmarks/bloom_filter.cu +++ b/src/main/cpp/benchmarks/bloom_filter.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,41 @@ #include #include -static void bloom_filter_put(nvbench::state& state) +static void bloom_filter_put_v1(nvbench::state& state) +{ + constexpr int num_rows = 150'000'000; + constexpr int num_hashes = 3; + + cudf::size_type const bloom_filter_bytes = state.get_int64("bloom_filter_bytes"); + cudf::size_type const bloom_filter_longs = bloom_filter_bytes / sizeof(int64_t); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs); + + data_profile_builder builder; + builder.no_validity(); + auto const src = create_random_table({{cudf::type_id::INT64}}, row_count{num_rows}, builder); + auto const input = spark_rapids_jni::xxhash64(*src); + + auto const stream = cudf::get_default_stream(); + state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); + state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync, + [&](nvbench::launch& launch, auto& timer) { + timer.start(); + spark_rapids_jni::bloom_filter_put(*bloom_filter, *input); + stream.synchronize(); + timer.stop(); + }); + + size_t const bytes_read = num_rows * sizeof(int64_t); + size_t const bytes_written = num_rows * sizeof(cudf::bitmask_type) * num_hashes; + auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value"); + state.add_element_count(std::size_t{num_rows}, "Rows Inserted"); + state.add_global_memory_reads(bytes_read, "Bytes read"); + state.add_global_memory_writes(bytes_written, "Bytes written"); + state.add_element_count(static_cast(bytes_written) / time, "Write bytes/sec"); +} + +static void bloom_filter_put_v2(nvbench::state& state) { constexpr int num_rows = 150'000'000; constexpr int num_hashes = 3; @@ -30,9 +64,9 @@ static void bloom_filter_put(nvbench::state& state) // create the bloom filter cudf::size_type const bloom_filter_bytes = state.get_int64("bloom_filter_bytes"); cudf::size_type const bloom_filter_longs = bloom_filter_bytes / sizeof(int64_t); - auto bloom_filter = spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs); - // create a column of hashed values data_profile_builder builder; builder.no_validity(); auto const src = create_random_table({{cudf::type_id::INT64}}, row_count{num_rows}, builder); @@ -57,7 +91,12 @@ static void bloom_filter_put(nvbench::state& state) state.add_element_count(static_cast(bytes_written) / time, "Write bytes/sec"); } -NVBENCH_BENCH(bloom_filter_put) - .set_name("Bloom Filter Put") +NVBENCH_BENCH(bloom_filter_put_v1) + .set_name("Bloom Filter Put V1") + .add_int64_axis("bloom_filter_bytes", + {512 * 1024, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024}); + +NVBENCH_BENCH(bloom_filter_put_v2) + .set_name("Bloom Filter Put V2") .add_int64_axis("bloom_filter_bytes", {512 * 1024, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024}); diff --git a/src/main/cpp/src/BloomFilterJni.cpp b/src/main/cpp/src/BloomFilterJni.cpp index bbbe525a40..ad579f2128 100644 --- a/src/main/cpp/src/BloomFilterJni.cpp +++ b/src/main/cpp/src/BloomFilterJni.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,14 +23,15 @@ extern "C" { JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_creategpu( - JNIEnv* env, jclass, jint numHashes, jlong bloomFilterBits) + JNIEnv* env, jclass, jint version, jint numHashes, jlong bloomFilterBits, jint seed) { JNI_TRY { cudf::jni::auto_set_device(env); int bloom_filter_longs = static_cast((bloomFilterBits + 63) / 64); - auto bloom_filter = spark_rapids_jni::bloom_filter_create(numHashes, bloom_filter_longs); + auto bloom_filter = + spark_rapids_jni::bloom_filter_create(version, numHashes, bloom_filter_longs, seed); return reinterpret_cast(bloom_filter.release()); } JNI_CATCH(env, 0); diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 3a6c965764..57a86eabf9 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ #include #include +#include namespace spark_rapids_jni { @@ -44,27 +45,30 @@ namespace { using bloom_hash_type = spark_rapids_jni::murmur_hash_value_type; -__device__ inline std::pair gpu_get_hash_mask( - bloom_hash_type h, cudf::size_type bloom_filter_bits) +inline int32_t byte_swap_int32(int32_t val) { - // https://github.com/apache/spark/blob/7bfbeb62cb1dc58d81243d22888faa688bad8064/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L94 - auto const index = (h < 0 ? ~h : h) % static_cast(bloom_filter_bits); + return static_cast(bswap_32(static_cast(val))); +} - // spark expects serialized bloom filters to be big endian (64 bit longs), - // so we will produce a big endian buffer. if spark CPU ends up consuming it, it can do so - // directly. the gpu bloom filter implementation will always be handed the same serialized buffer. - auto const word_index = cudf::word_index(index) ^ 0x1; // word-swizzle within 64 bit long - auto const bit_index = - cudf::intra_word_index(index) ^ 0x18; // byte swizzle within the 32 bit word +// Given a non-negative bit position within the bloom filter, compute +// the 32-bit word index and bitmask. Handles big-endian swizzle so +// the GPU buffer is directly compatible with Spark's serialized format. +__device__ inline std::pair gpu_bit_to_word_mask(int64_t bit_pos) +{ + auto const word_index = (bit_pos / 32) ^ 0x1; + auto const bit_index = static_cast(bit_pos % 32) ^ 0x18; - return {word_index, (1 << bit_index)}; + return {word_index, static_cast(1u << bit_index)}; } -template +// V1: combined hash is 32-bit int, loop from 1..num_hashes +// V2: combined hash is 64-bit long, seeded with h1*INT32_MAX, loop from 0..num_hashes-1 +template CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, - cudf::size_type bloom_filter_bits, + int64_t bloom_filter_bits, cudf::column_device_view input, - cudf::size_type num_hashes) + cudf::size_type num_hashes, + int32_t seed) { size_t const tid = threadIdx.x + blockIdx.x * blockDim.x; if (tid >= input.size()) { return; } @@ -73,178 +77,245 @@ CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, if (!input.is_valid(tid)) { return; } } - // https://github.com/apache/spark/blob/7bfbeb62cb1dc58d81243d22888faa688bad8064/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L87 - auto const el = input.element(tid); - bloom_hash_type const h1 = MurmurHash3_32(0)(el); + auto const el = input.element(tid); + // V1 has no seed in the format; use 0. V2 uses the stored seed. + int32_t const hash_seed = (Version == 1) ? 0 : seed; + bloom_hash_type const h1 = MurmurHash3_32(hash_seed)(el); bloom_hash_type const h2 = MurmurHash3_32(h1)(el); - // set a bit in the bloom filter for each hashed value - for (auto idx = 1; idx <= num_hashes; idx++) { - bloom_hash_type combined_hash = h1 + (idx * h2); - - auto const [word_index, mask] = gpu_get_hash_mask(combined_hash, bloom_filter_bits); - atomicOr(bloom_filter + word_index, mask); + if constexpr (Version == 1) { + // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L38 + // This is the original V1 hash algorithm from Spark. + for (auto idx = 1; idx <= num_hashes; idx++) { + bloom_hash_type combined_hash = h1 + (idx * h2); + auto const bit_pos = static_cast( + (combined_hash < 0 ? ~combined_hash : combined_hash) % + static_cast(bloom_filter_bits)); + auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); + atomicOr(bloom_filter + word_index, mask); + } + } else { + // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImplV2.java#L63 + int64_t combined_hash = static_cast(h1) * static_cast(std::numeric_limits::max()); + for (int idx = 0; idx < num_hashes; idx++) { + combined_hash += h2; + int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; + auto const bit_pos = combined_index % bloom_filter_bits; + auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); + atomicOr(bloom_filter + word_index, mask); + } } } +template struct bloom_probe_functor { cudf::bitmask_type const* const bloom_filter; - cudf::size_type const bloom_filter_bits; + int64_t const bloom_filter_bits; cudf::size_type const num_hashes; + int32_t const seed; __device__ bool operator()(int64_t input) const { - // https://github.com/apache/spark/blob/7bfbeb62cb1dc58d81243d22888faa688bad8064/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L110 - // this code could be combined with the very similar code in gpu_bloom_filter_put. i've - // left it this way since the expectation is that we will early out fairly often, whereas - // in the build case we never early out so doing the additional if() return check is pointless. - bloom_hash_type const h1 = MurmurHash3_32(0)(input); + int32_t const hash_seed = (Version == 1) ? 0 : seed; + bloom_hash_type const h1 = MurmurHash3_32(hash_seed)(input); bloom_hash_type const h2 = MurmurHash3_32(h1)(input); - // set a bit in the bloom filter for each hashed value - for (auto idx = 1; idx <= num_hashes; idx++) { - bloom_hash_type combined_hash = h1 + (idx * h2); - auto const [word_index, mask] = gpu_get_hash_mask(combined_hash, bloom_filter_bits); - if (!(bloom_filter[word_index] & mask)) { return false; } + if constexpr (Version == 1) { + for (auto idx = 1; idx <= num_hashes; idx++) { + bloom_hash_type combined_hash = h1 + (idx * h2); + auto const bit_pos = static_cast( + (combined_hash < 0 ? ~combined_hash : combined_hash) % + static_cast(bloom_filter_bits)); + auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); + if (!(bloom_filter[word_index] & mask)) { return false; } + } + } else { + int64_t combined_hash = + static_cast(h1) * static_cast(std::numeric_limits::max()); + for (int idx = 0; idx < num_hashes; idx++) { + combined_hash += h2; + int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; + auto const bit_pos = combined_index % bloom_filter_bits; + auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); + if (!(bloom_filter[word_index] & mask)) { return false; } + } } return true; } }; -constexpr int spark_bloom_filter_version = 1; - -bloom_filter_header byte_swap_header(bloom_filter_header const& header) -{ - return {static_cast(bswap_32(static_cast(header.version))), - static_cast(bswap_32(static_cast(header.num_hashes))), - static_cast(bswap_32(static_cast(header.num_longs)))}; -} - -/* - Pack a bloom_filter_header (passed as little endian) into a bloom filter buffer. -*/ void pack_bloom_filter_header(cudf::device_span buf, bloom_filter_header const& header, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + int32_t seed) { - // swizzle to big endian - bloom_filter_header header_swizzled = byte_swap_header(header); - - // header goes at the top of the buffer - cudaMemcpyAsync( - buf.data(), &header_swizzled, bloom_filter_header_size, cudaMemcpyHostToDevice, stream); + if (header.version == bloom_filter_version_1) { + bloom_filter_header_v1 raw = {byte_swap_int32(header.version), + byte_swap_int32(header.num_hashes), + byte_swap_int32(header.num_longs)}; + cudaMemcpyAsync( + buf.data(), &raw, bloom_filter_header_v1_size_bytes, cudaMemcpyHostToDevice, stream); + } else { + bloom_filter_header_v2 raw = {byte_swap_int32(header.version), + byte_swap_int32(header.num_hashes), + byte_swap_int32(seed), + byte_swap_int32(header.num_longs)}; + cudaMemcpyAsync( + buf.data(), &raw, bloom_filter_header_v2_size_bytes, cudaMemcpyHostToDevice, stream); + } } /* - Unpack bloom filter information from a bloom filter buffer. returns the header, a span - representing the bloom filter bits and the number of bloom filter bits. + Unpack bloom filter information from a Spark-format bloom filter buffer (big-endian). + Accepts both V1 and V2; the version is read from the first 4 bytes. + + @return A std::tuple of four elements: + - Element 0 (bloom_filter_header): Decoded header with version, num_hashes, and num_longs. + Does not include seed; that is returned separately as element 3. + - Element 1 (cudf::device_span): Device span over the bloom filter bit + array. Length is header.num_longs * 2 (number of 32-bit words). The data start + immediately after the version-specific header in the buffer. + - Element 2 (int64_t): Total number of bits in the bloom filter, i.e. header.num_longs * 64. + - Element 3 (int32_t): Hash seed used when building/probing the filter. Zero for V1; + for V2, the value stored in the serialized header. */ -std::tuple, int> unpack_bloom_filter( - cudf::device_span bloom_filter, rmm::cuda_stream_view stream) +std::tuple, int64_t, int32_t> +unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_view stream) { - CUDF_EXPECTS(bloom_filter.size() >= bloom_filter_header_size, + CUDF_EXPECTS(bloom_filter.size() >= static_cast(bloom_filter_header_v1_size_bytes), "Encountered truncated bloom filter"); - bloom_filter_header header_swizzled; - cudaMemcpyAsync(&header_swizzled, - bloom_filter.data(), - bloom_filter_header_size, - cudaMemcpyDeviceToHost, - stream); + int32_t raw_ints[4] = {}; + auto const read_size = + std::min(bloom_filter.size(), static_cast(bloom_filter_header_v2_size_bytes)); + cudaMemcpyAsync(raw_ints, bloom_filter.data(), read_size, cudaMemcpyDeviceToHost, stream); stream.synchronize(); - // swizzle to little endian. - bloom_filter_header header = byte_swap_header(header_swizzled); + int const version = byte_swap_int32(raw_ints[0]); + CUDF_EXPECTS(version == bloom_filter_version_1 || version == bloom_filter_version_2, + "Unexpected bloom filter version"); + + auto const hdr_size = bloom_filter_header_size_for_version(version); + CUDF_EXPECTS(bloom_filter.size() >= static_cast(hdr_size), + "Encountered truncated bloom filter header"); - auto const bloom_filter_bits = header.num_longs * 64; + bloom_filter_header header; + header.version = version; + header.num_hashes = byte_swap_int32(raw_ints[1]); + header.num_longs = (version == bloom_filter_version_1) ? byte_swap_int32(raw_ints[2]) + : byte_swap_int32(raw_ints[3]); + + int32_t seed = 0; + if (version == bloom_filter_version_2) { + CUDF_EXPECTS(read_size >= static_cast(bloom_filter_header_v2_size_bytes), + "Encountered truncated V2 bloom filter header"); + seed = byte_swap_int32(raw_ints[2]); + } + + auto const bloom_filter_bits = static_cast(header.num_longs) * 64; auto const num_bitmask_words = static_cast(header.num_longs) * 2; - CUDF_EXPECTS(header.version == 1, "Unexpected bloom filter version"); CUDF_EXPECTS(bloom_filter_bits > 0, "Invalid empty bloom filter size"); CUDF_EXPECTS(num_bitmask_words == cudf::num_bitmask_words(bloom_filter_bits), "Bloom filter bit/length mismatch"); return {header, - {reinterpret_cast(bloom_filter.data() + bloom_filter_header_size), + {reinterpret_cast(bloom_filter.data() + hdr_size), num_bitmask_words}, - bloom_filter_bits}; + bloom_filter_bits, + seed}; } -/* - Unpack bloom filter information a from column_view that wraps a single bloom filter buffer. - returns the header, a span representing the bloom filter bits and the number of bloom filter bits. -*/ -std::tuple, int> unpack_bloom_filter( - cudf::column_view const& bloom_filter, rmm::cuda_stream_view stream) +std::tuple, int64_t, int32_t> +unpack_bloom_filter(cudf::column_view const& bloom_filter, rmm::cuda_stream_view stream) { - // the const_cast is necessary because list_scalar does not provide a mutable_view() function. return unpack_bloom_filter( cudf::device_span{const_cast(bloom_filter.data()), static_cast(bloom_filter.size())}, stream); } +std::tuple, int64_t, int32_t> +unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_view stream) +{ + return unpack_bloom_filter( + cudf::device_span{const_cast(bloom_filter.data()), + bloom_filter.size()}, + stream); +} + +/* + Device functor used by bloom_filter_merge to verify every filter in the list has the same + header. raw_header holds the reference header in big-endian form (as in the serialized buffer). +*/ struct bloom_filter_same { - bloom_filter_header header; + /// Reference header: big-endian int32s. + /// V1 uses [0..2]: version, num_hashes, num_longs. + /// V2 uses [0..3]: version, num_hashes, seed, num_longs. + int32_t raw_header[4]; + int header_field_count; cudf::detail::lists_column_device_view ldv; cudf::size_type stride; bool __device__ operator()(cudf::size_type i) { - bloom_filter_header const* a = - reinterpret_cast(ldv.child().data() + stride * i); - return (a->version == header.version) && (a->num_hashes == header.num_hashes) && - (a->num_longs == header.num_longs); + auto const* a = reinterpret_cast(ldv.child().data() + stride * i); + for (int j = 0; j < header_field_count; j++) { + if (a[j] != raw_header[j]) return false; + } + return true; } }; /* Returns a pair indicating: - - size of the bloom filter bits - - total size of the bloom filter buffer (header + bits) + - first: size in bytes of the bloom filter bit array (num_longs * 8). + - second: total size in bytes of the serialized bloom filter buffer (header + bit array). + Uses the version-specific header size. */ -std::pair get_bloom_filter_stride(int bloom_filter_longs) +std::pair get_bloom_filter_stride(int version, int bloom_filter_longs) { - auto const bloom_filter_size = (bloom_filter_longs * sizeof(int64_t)); - auto const buf_size = bloom_filter_header_size + bloom_filter_size; + auto const bloom_filter_size = static_cast(bloom_filter_longs) * sizeof(int64_t); + auto const hdr_size = bloom_filter_header_size_for_version(version); + auto const buf_size = hdr_size + bloom_filter_size; return {bloom_filter_size, buf_size}; } } // anonymous namespace /* - Creates a new bloom filter. The bloom filter is stored using a cudf list_scalar with a specific - structure. - - The data type is int8, representing a generic buffer - - The first 12 bytes of the buffer are a bloom_filter_header - - The remaining bytes are the bloom filter buffer itself. The length of the remaining bytes must - be bloom_filter_header.num_longs * 8 - - All of the data in the buffer is stored in big-endian format. unpack_bloom_filter() unpacks - this into a usable form, and pack_bloom_filter_header packs new data into the output (big-endian) - form. + Creates a new bloom filter. The result is stored in a cudf list_scalar with a single + UINT8 buffer of the following form (all header and bit data in big-endian for Spark): + + - V1: first 12 bytes = bloom_filter_header_v1 (version, num_hashes, num_longs). + - V2: first 16 bytes = bloom_filter_header_v2 (version, num_hashes, seed, num_longs). + - Remaining bytes: bloom_filter_longs * 8, the bit array. Initialized to zero. + + unpack_bloom_filter() reads this layout; pack_bloom_filter_header() writes the header. */ -std::unique_ptr bloom_filter_create(int num_hashes, +std::unique_ptr bloom_filter_create(int version, + int num_hashes, int bloom_filter_longs, + int seed, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(bloom_filter_longs); + CUDF_EXPECTS(version == bloom_filter_version_1 || version == bloom_filter_version_2, + "Bloom filter version must be 1 or 2"); + + auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(version, bloom_filter_longs); + auto const hdr_size = bloom_filter_header_size_for_version(version); - // build the packed bloom filter buffer ------------------ rmm::device_buffer buf{static_cast(buf_size), stream, mr}; - // pack the header - bloom_filter_header header{spark_bloom_filter_version, num_hashes, bloom_filter_longs}; + bloom_filter_header header{version, num_hashes, bloom_filter_longs}; pack_bloom_filter_header( - {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream); - // memset the bloom filter bits to 0. + {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream, + (version == bloom_filter_version_1 ? 0 : seed)); - CUDF_CUDA_TRY(cudaMemsetAsync(reinterpret_cast(buf.data()) + bloom_filter_header_size, - 0, - bloom_filter_size, - stream)); + CUDF_CUDA_TRY(cudaMemsetAsync( + reinterpret_cast(buf.data()) + hdr_size, 0, bloom_filter_size, stream)); - // create the 1-row list column and move it into a scalar. return std::make_unique( cudf::column( cudf::data_type{cudf::type_id::UINT8}, buf_size, std::move(buf), rmm::device_buffer{}, 0), @@ -257,21 +328,34 @@ void bloom_filter_put(cudf::list_scalar& bloom_filter, cudf::column_view const& input, rmm::cuda_stream_view stream) { - // unpack the bloom filter - auto [header, buffer, bloom_filter_bits] = unpack_bloom_filter(bloom_filter.view(), stream); - CUDF_EXPECTS(bloom_filter.view().size() == (buffer.size() * 4) + bloom_filter_header_size, - "Encountered invalid/mismatched bloom filter buffer data"); + auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter.view(), stream); + auto const hdr_size = bloom_filter_header_size_for_version(header.version); + CUDF_EXPECTS( + bloom_filter.view().size() == static_cast((buffer.size() * 4) + hdr_size), + "Encountered invalid/mismatched bloom filter buffer data"); constexpr int block_size = 256; auto grid = cudf::detail::grid_1d{input.size(), block_size, 1}; auto d_input = cudf::column_device_view::create(input); - if (input.has_nulls()) { - gpu_bloom_filter_put<<>>( - buffer.data(), bloom_filter_bits, *d_input, header.num_hashes); + auto launch = [&](auto version_tag, auto nullable_tag) { + gpu_bloom_filter_put + <<>>( + buffer.data(), bloom_filter_bits, *d_input, header.num_hashes, seed); + }; + + if (header.version == bloom_filter_version_2) { + if (input.has_nulls()) { + launch(std::integral_constant{}, std::true_type{}); + } else { + launch(std::integral_constant{}, std::false_type{}); + } } else { - gpu_bloom_filter_put<<>>( - buffer.data(), bloom_filter_bits, *d_input, header.num_hashes); + if (input.has_nulls()) { + launch(std::integral_constant{}, std::true_type{}); + } else { + launch(std::integral_constant{}, std::false_type{}); + } } } @@ -279,39 +363,56 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - // unpack the bloom filter cudf::lists_column_view lcv(bloom_filters); - // since the list child column is just a bunch of packed bloom filter buffers one after another, - // we can just pass the base data pointer to unpack the first one. - auto [header, buffer, bloom_filter_bits] = unpack_bloom_filter(lcv.child(), stream); - // NOTE: since this is a column containing multiple bloom filters, the expected total size is the - // size for one bloom filter times the number of rows (bloom_filters.size()) + // The list child column is a concatenation of packed bloom filter buffers (header + bits) + // one after another. Unpack the first buffer to get the header, bit span, bit count, and + // seed; we use these to validate total size and to build the merged output. + auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(lcv.child(), stream); + auto const hdr_size = bloom_filter_header_size_for_version(header.version); CUDF_EXPECTS( - lcv.child().size() == ((buffer.size() * 4) + bloom_filter_header_size) * bloom_filters.size(), + lcv.child().size() == static_cast( + ((buffer.size() * 4) + hdr_size) * bloom_filters.size()), "Encountered invalid/mismatched bloom filter buffer data"); - auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(header.num_longs); + auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(header.version, header.num_longs); - // validate all the bloom filters are the same - auto dv = cudf::column_device_view::create(bloom_filters); - bloom_filter_header header_swizzled = byte_swap_header(header); - CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(cudf::get_default_stream()), - thrust::make_counting_iterator(1), - thrust::make_counting_iterator(bloom_filters.size()), - bloom_filter_same{header_swizzled, *dv, buf_size}), - "Mismatch of bloom filter parameters"); + int32_t raw_hdr[4] = {}; + int header_field_count = 0; + if (header.version == bloom_filter_version_1) { + raw_hdr[0] = byte_swap_int32(header.version); + raw_hdr[1] = byte_swap_int32(header.num_hashes); + raw_hdr[2] = byte_swap_int32(header.num_longs); + header_field_count = 3; + } else { + raw_hdr[0] = byte_swap_int32(header.version); + raw_hdr[1] = byte_swap_int32(header.num_hashes); + raw_hdr[2] = byte_swap_int32(seed); + raw_hdr[3] = byte_swap_int32(header.num_longs); + header_field_count = 4; + } + + auto dv = cudf::column_device_view::create(bloom_filters); + CUDF_EXPECTS( + thrust::all_of( + rmm::exec_policy(cudf::get_default_stream()), + thrust::make_counting_iterator(1), + thrust::make_counting_iterator(bloom_filters.size()), + bloom_filter_same{ + {raw_hdr[0], raw_hdr[1], raw_hdr[2], raw_hdr[3]}, + header_field_count, + *dv, + static_cast(buf_size)}), + "Mismatch of bloom filter parameters"); - // build the packed bloom filter buffer ------------------ rmm::device_buffer buf{static_cast(buf_size), stream, mr}; pack_bloom_filter_header( - {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream); + {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream, seed); - auto src = lcv.child().data() + bloom_filter_header_size; + auto src = lcv.child().data() + hdr_size; auto dst = reinterpret_cast(reinterpret_cast(buf.data()) + - bloom_filter_header_size); + hdr_size); - // bitwise-or all the bloom filters together cudf::size_type num_words = header.num_longs * 2; thrust::transform( rmm::exec_policy(stream), @@ -342,12 +443,12 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - // unpack the bloom filter - auto [header, buffer, bloom_filter_bits] = unpack_bloom_filter(bloom_filter, stream); - CUDF_EXPECTS(bloom_filter.size() == (buffer.size() * 4) + bloom_filter_header_size, - "Encountered invalid/mismatched bloom filter buffer data"); + auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter, stream); + auto const hdr_size = bloom_filter_header_size_for_version(header.version); + CUDF_EXPECTS( + bloom_filter.size() == static_cast((buffer.size() * 4) + hdr_size), + "Encountered invalid/mismatched bloom filter buffer data"); - // duplicate input mask auto out = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::BOOL8}, input.size(), cudf::copy_bitmask(input), @@ -355,13 +456,21 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, stream, mr); - thrust::transform( - rmm::exec_policy(stream), - input.begin(), - input.end(), - out->mutable_view().begin(), - bloom_probe_functor{ - buffer.data(), static_cast(bloom_filter_bits), header.num_hashes}); + if (header.version == bloom_filter_version_2) { + thrust::transform(rmm::exec_policy(stream), + input.begin(), + input.end(), + out->mutable_view().begin(), + bloom_probe_functor<2>{buffer.data(), bloom_filter_bits, + header.num_hashes, seed}); + } else { + thrust::transform(rmm::exec_policy(stream), + input.begin(), + input.end(), + out->mutable_view().begin(), + bloom_probe_functor<1>{buffer.data(), bloom_filter_bits, + header.num_hashes, seed}); + } return out; } diff --git a/src/main/cpp/src/bloom_filter.hpp b/src/main/cpp/src/bloom_filter.hpp index fd305c191c..7aee087f81 100644 --- a/src/main/cpp/src/bloom_filter.hpp +++ b/src/main/cpp/src/bloom_filter.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,30 +25,70 @@ namespace spark_rapids_jni { -// included only for testing purposes +constexpr int bloom_filter_version_1 = 1; +constexpr int bloom_filter_version_2 = 2; + +// V1 header: [version, num_hashes, num_longs] — 12 bytes +struct bloom_filter_header_v1 { + int version; + int num_hashes; + int num_longs; +}; +constexpr int bloom_filter_header_v1_size_bytes = sizeof(bloom_filter_header_v1); + +// V2 header: [version, num_hashes, seed, num_longs] — 16 bytes +struct bloom_filter_header_v2 { + int version; + int num_hashes; + int seed; + int num_longs; +}; +constexpr int bloom_filter_header_v2_size_bytes = sizeof(bloom_filter_header_v2); + +// Unified header used internally after unpacking from either format. +// Seed is not stored here; for V2 it is returned separately from unpack_bloom_filter. struct bloom_filter_header { int version; int num_hashes; int num_longs; }; -constexpr int bloom_filter_header_size = sizeof(bloom_filter_header); + +inline int bloom_filter_header_size_for_version(int version) +{ + return version == bloom_filter_version_2 ? bloom_filter_header_v2_size_bytes + : bloom_filter_header_v1_size_bytes; +} /** - * @brief Create an empty bloom filter of the specified size in (64 bit) longs with using - * the specified number of hashes to be used when operating on the filter. + * @brief Create an empty bloom filter of the specified size and parameters. * - * @param num_hashes The number of hashes to use. - * @param bloom_filter_longs Size of the bloom filter in bits. - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned bloom filter's memory. - * @returns An list_scalar wrapping a packed Spark bloom_filter. + * The bloom filter is stored in a cudf list_scalar as a single byte buffer. The buffer + * layout is Spark-compatible: a version-specific header (big-endian) followed by the + * bit array. V1 header is 12 bytes (version, num_hashes, num_longs). V2 header is 16 bytes + * (version, num_hashes, seed, num_longs). The remainder of the buffer is + * bloom_filter_longs * 8 bytes of bit data, also written in big-endian order for Spark + * interchange. * + * @param version Bloom filter format version: 1 or 2 (e.g. bloom_filter_version_1, + * bloom_filter_version_2). V2 uses 64-bit hash indexing and supports a configurable + * seed for better distribution on large filters. + * @param num_hashes Number of bit positions set (and checked) per key. Derived from two + * underlying hashes; higher values reduce false positives but increase work per + * put/probe. + * @param bloom_filter_longs Size of the bit array in 64-bit longs; total bits = + * bloom_filter_longs * 64. + * @param seed Hash seed. Used only for V2; ignored for V1 (V1 always uses seed 0). + * @param stream CUDA stream for device memory operations and kernel launches. + * @param mr Device memory resource for allocating the bloom filter buffer. + * @returns A list_scalar wrapping the packed Spark-format bloom filter (header + bits). */ std::unique_ptr bloom_filter_create( + int version, int num_hashes, int bloom_filter_longs, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref()); + int seed = 0, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref()); /** * @brief Inserts input values into a bloom filter. diff --git a/src/main/cpp/tests/bloom_filter.cu b/src/main/cpp/tests/bloom_filter.cu index 1ba3520f0f..d242d95139 100644 --- a/src/main/cpp/tests/bloom_filter.cu +++ b/src/main/cpp/tests/bloom_filter.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,21 +31,26 @@ struct is_zero { __device__ bool operator()(cudf::bitmask_type w) { return w == 0; } }; -TEST_F(BloomFilterTest, Initialization) +// ======================== V1 Tests ======================== + +TEST_F(BloomFilterTest, InitializationV1) { constexpr int num_hashes = 3; std::vector expected{1, 2, 3}; for (size_t idx = 0; idx < expected.size(); idx++) { - auto bloom_filter = - spark_rapids_jni::bloom_filter_create(num_hashes, expected[idx], cudf::get_default_stream()); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, expected[idx], 0, + cudf::get_default_stream()); auto const bloom_filter_size = expected[idx] * sizeof(int64_t); CUDF_EXPECTS( - bloom_filter->view().size() == spark_rapids_jni::bloom_filter_header_size + bloom_filter_size, + bloom_filter->view().size() == + spark_rapids_jni::bloom_filter_header_v1_size_bytes + bloom_filter_size, "Bloom filter not of expected size"); - auto bytes = (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_size; + auto bytes = + (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_v1_size_bytes; CUDF_EXPECTS( thrust::all_of( rmm::exec_policy(cudf::get_default_stream()), bytes, bytes + bloom_filter_size, is_zero{}), @@ -53,13 +58,14 @@ TEST_F(BloomFilterTest, Initialization) } } -TEST_F(BloomFilterTest, BuildAndProbe) +TEST_F(BloomFilterTest, BuildAndProbeV1) { auto stream = cudf::get_default_stream(); constexpr int bloom_filter_longs = (1024 * 1024); constexpr int num_hashes = 3; - auto bloom_filter = spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); @@ -73,13 +79,14 @@ TEST_F(BloomFilterTest, BuildAndProbe) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } -TEST_F(BloomFilterTest, BuildWithNullsAndProbe) +TEST_F(BloomFilterTest, BuildWithNullsAndProbeV1) { auto stream = cudf::get_default_stream(); constexpr int bloom_filter_longs = (1024 * 1024); constexpr int num_hashes = 3; - auto bloom_filter = spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); cudf::test::fixed_width_column_wrapper input{{20, 80, 100, 99, 47, -9, 234000000}, {0, 1, 1, 1, 0, 1, 1}}; @@ -94,18 +101,18 @@ TEST_F(BloomFilterTest, BuildWithNullsAndProbe) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } -TEST_F(BloomFilterTest, BuildAndProbeWithNulls) +TEST_F(BloomFilterTest, BuildAndProbeWithNullsV1) { auto stream = cudf::get_default_stream(); constexpr int bloom_filter_longs = (1024 * 1024); constexpr int num_hashes = 3; cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; - auto bloom_filter = spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); - // probe cudf::test::fixed_width_column_wrapper probe{ {20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3}, {0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1}}; cudf::test::fixed_width_column_wrapper expected{{1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0}, @@ -117,11 +124,10 @@ TEST_F(BloomFilterTest, BuildAndProbeWithNulls) struct bloom_filter_stride_transform { int const stride; - cudf::size_type __device__ operator()(cudf::size_type i) { return i * stride; } }; -TEST_F(BloomFilterTest, ProbeMerged) +TEST_F(BloomFilterTest, ProbeMergedV1) { auto stream = cudf::get_default_stream(); constexpr int bloom_filter_longs = (1024 * 1024); @@ -129,20 +135,20 @@ TEST_F(BloomFilterTest, ProbeMerged) // column a cudf::test::fixed_width_column_wrapper col_a{20, 80, 100, 99, 47, -9, 234000000}; - auto bloom_filter_a = - spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter_a = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); spark_rapids_jni::bloom_filter_put(*bloom_filter_a, col_a, stream); // column b cudf::test::fixed_width_column_wrapper col_b{100, 200, 300, 400}; - auto bloom_filter_b = - spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter_b = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); spark_rapids_jni::bloom_filter_put(*bloom_filter_b, col_b, stream); // column c cudf::test::fixed_width_column_wrapper col_c{-100, -200, -300, -400}; - auto bloom_filter_c = - spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter_c = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); spark_rapids_jni::bloom_filter_put(*bloom_filter_c, col_c, stream); // pre-merge the individual bloom filters. the merge function expects the inputs to be a single @@ -170,3 +176,135 @@ TEST_F(BloomFilterTest, ProbeMerged) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } + +// ======================== V2 Tests ======================== + +TEST_F(BloomFilterTest, InitializationV2) +{ + constexpr int num_hashes = 3; + constexpr int seed = 42; + std::vector expected{1, 2, 3}; + + for (size_t idx = 0; idx < expected.size(); idx++) { + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, expected[idx], seed, + cudf::get_default_stream()); + + auto const bloom_filter_size = expected[idx] * sizeof(int64_t); + CUDF_EXPECTS( + bloom_filter->view().size() == + spark_rapids_jni::bloom_filter_header_v2_size_bytes + bloom_filter_size, + "Bloom filter not of expected size"); + + auto bytes = + (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_v2_size_bytes; + CUDF_EXPECTS( + thrust::all_of( + rmm::exec_policy(cudf::get_default_stream()), bytes, bytes + bloom_filter_size, is_zero{}), + "Bloom filter not initialized to 0"); + } +} + +TEST_F(BloomFilterTest, BuildAndProbeV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe_in{20, 80, 100, 99, 47, -9, 234000000}; + cudf::test::fixed_width_column_wrapper expected_in{1, 1, 1, 1, 1, 1, 1}; + auto result = spark_rapids_jni::bloom_filter_probe(probe_in, *bloom_filter, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_in, *result); +} + +TEST_F(BloomFilterTest, BuildWithNullsAndProbeV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + cudf::test::fixed_width_column_wrapper input{{20, 80, 100, 99, 47, -9, 234000000}, + {0, 1, 1, 1, 0, 1, 1}}; + + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe_inserted{80, 100, 99, -9, 234000000}; + cudf::test::fixed_width_column_wrapper expected_inserted{1, 1, 1, 1, 1}; + auto result = spark_rapids_jni::bloom_filter_probe(probe_inserted, *bloom_filter, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_inserted, *result); +} + +TEST_F(BloomFilterTest, ProbeMergedV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + cudf::test::fixed_width_column_wrapper col_a{20, 80, 100, 99, 47, -9, 234000000}; + auto bloom_filter_a = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + spark_rapids_jni::bloom_filter_put(*bloom_filter_a, col_a, stream); + + cudf::test::fixed_width_column_wrapper col_b{100, 200, 300, 400}; + auto bloom_filter_b = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + spark_rapids_jni::bloom_filter_put(*bloom_filter_b, col_b, stream); + + cudf::test::fixed_width_column_wrapper col_c{-100, -200, -300, -400}; + auto bloom_filter_c = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + spark_rapids_jni::bloom_filter_put(*bloom_filter_c, col_c, stream); + + std::vector cols( + {bloom_filter_a->view(), bloom_filter_b->view(), bloom_filter_c->view()}); + auto premerge_children = cudf::concatenate(cols); + auto premerge_offsets = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::INT32}, 4); + thrust::transform(rmm::exec_policy(cudf::get_default_stream()), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + 4, + premerge_offsets->mutable_view().begin(), + bloom_filter_stride_transform{bloom_filter_a->view().size()}); + auto premerged = cudf::make_lists_column( + 3, std::move(premerge_offsets), std::move(premerge_children), 0, rmm::device_buffer{}); + + auto bloom_filter_merged = spark_rapids_jni::bloom_filter_merge(*premerged); + + cudf::test::fixed_width_column_wrapper probe_all{ + 20, 80, 100, 99, 47, -9, 234000000, 200, 300, 400, -100, -200, -300, -400}; + cudf::test::fixed_width_column_wrapper expected_all{ + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; + auto result = spark_rapids_jni::bloom_filter_probe(probe_all, *bloom_filter_merged, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_all, *result); +} + +TEST_F(BloomFilterTest, V2WithSeed) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bf_seed0 = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + auto bf_seed42 = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 42, stream); + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + spark_rapids_jni::bloom_filter_put(*bf_seed0, input, stream); + spark_rapids_jni::bloom_filter_put(*bf_seed42, input, stream); + + cudf::test::fixed_width_column_wrapper probe_in{20, 80, 100, 99, 47, -9, 234000000}; + cudf::test::fixed_width_column_wrapper expected_in{1, 1, 1, 1, 1, 1, 1}; + + auto r0 = spark_rapids_jni::bloom_filter_probe(probe_in, *bf_seed0, stream); + auto r42 = spark_rapids_jni::bloom_filter_probe(probe_in, *bf_seed42, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_in, *r0); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_in, *r42); +} diff --git a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java index 6a676a54bb..f28fc3d392 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,77 +24,95 @@ import ai.rapids.cudf.NativeDepsLoader; public class BloomFilter { + public static final int VERSION_1 = 1; + public static final int VERSION_2 = 2; + public static final int DEFAULT_SEED = 0; + static { NativeDepsLoader.loadNativeDeps(); } /** - * Create a bloom filter with the specified number of hashes and bloom filter bits. - * @param numHashes The number of hashes to use when inserting values into the bloom filter or - * when probing. - * @param bloomFilterBits Size of the bloom filter in bits. - * @return a Scalar object which encapsulates the bloom filter. + * Create a V2 bloom filter with the specified number of hashes and bloom filter bits, + * using the default seed (0). + * + * @param numHashes Number of bit positions set (and checked) per key. Higher values reduce + * false positives but increase work per put/probe. + * @param bloomFilterBits Total size of the bloom filter in bits (will be rounded up to a + * multiple of 64). + * @return A Scalar wrapping the GPU bloom filter (Spark V2 format). + */ + public static Scalar create(int numHashes, long bloomFilterBits) { + return create(VERSION_2, numHashes, bloomFilterBits, DEFAULT_SEED); + } + + /** + * Create a bloom filter with the specified version, number of hashes, bloom filter bits, + * and hash seed. + * + * @param version Bloom filter format: {@link #VERSION_1} or {@link #VERSION_2}. V2 uses + * 64-bit hash indexing and supports a configurable seed. + * @param numHashes Number of bit positions set (and checked) per key. Higher values reduce + * false positives but increase work per put/probe. + * @param bloomFilterBits Total size of the bloom filter in bits (will be rounded up to a + * multiple of 64). + * @param seed Hash seed. Used only for V2; ignored for V1. + * @return A Scalar wrapping the GPU bloom filter (Spark format). */ - public static Scalar create(int numHashes, long bloomFilterBits){ - if(numHashes <= 0){ + public static Scalar create(int version, int numHashes, long bloomFilterBits, int seed) { + if (version != VERSION_1 && version != VERSION_2) { + throw new IllegalArgumentException("Bloom filter version must be 1 or 2"); + } + if (numHashes <= 0) { throw new IllegalArgumentException("Bloom filters must have a positive hash count"); } - if(bloomFilterBits <= 0){ + if (bloomFilterBits <= 0) { throw new IllegalArgumentException("Bloom filters must have a positive number of bits"); } - return new Scalar(DType.LIST, creategpu(numHashes, bloomFilterBits)); + return new Scalar(DType.LIST, creategpu(version, numHashes, bloomFilterBits, seed)); } - /** - * Insert a column of longs into a bloom filter. - * @param bloomFilter The bloom filter to which values will be inserted. - * @param cv The column containing the values to add. - */ - public static void put(Scalar bloomFilter, ColumnVector cv){ + public static void put(Scalar bloomFilter, ColumnVector cv) { put(bloomFilter.getScalarHandle(), cv.getNativeView()); } - /** - * Merge one or more bloom filters into a new bloom filter. - * @param bloomFilters A ColumnVector containing a bloom filter per row. - * @return A new bloom filter containing the merged inputs. - */ - public static Scalar merge(ColumnVector bloomFilters){ + public static Scalar merge(ColumnVector bloomFilters) { return new Scalar(DType.LIST, merge(bloomFilters.getNativeView())); } /** - * Probe a bloom filter with a column of longs. Returns a column of booleans. For - * each row in the output; a value of true indicates that the corresponding input value - * -may- be in the set of values used to build the bloom filter; a value of false indicates - * that the corresponding input value is conclusively not in the set of values used to build - * the bloom filter. - * @param bloomFilter The bloom filter to be probed. - * @param cv The column containing the values to check. - * @return A boolean column indicating the results of the probe. + * Probe a bloom filter with a column of longs. For each row, true means the value may be + * in the set used to build the filter; false means it is definitely not in the set. + * + * @param bloomFilter The bloom filter to probe (a Scalar wrapping the GPU filter). + * @param cv Column of int64 values to check for membership. + * @return A boolean column with the same row count as cv; true for possible membership, + * false for definite non-membership. Nulls in cv are preserved in the output. */ - public static ColumnVector probe(Scalar bloomFilter, ColumnVector cv){ + public static ColumnVector probe(Scalar bloomFilter, ColumnVector cv) { return new ColumnVector(probe(bloomFilter.getScalarHandle(), cv.getNativeView())); } /** - * Probe a bloom filter with a column of longs. Returns a column of booleans. For - * each row in the output; a value of true indicates that the corresponding input value - * -may- be in the set of values used to build the bloom filter; a value of false indicates - * that the corresponding input value is conclusively not in the set of values used to build - * the bloom filter. - * @param bloomFilter The bloom filter to be probed. This buffer is expected to be the - * fully packed Spark bloom filter, including header. - * @param cv The column containing the values to check. - * @return A boolean column indicating the results of the probe. + * Probe a bloom filter with a column of longs. For each row, true means the value may be + * in the set used to build the filter; false means it is definitely not in the set. + * Use this overload when the filter is in a device buffer (e.g. Spark serialized form). + * + * @param bloomFilter Device buffer containing the packed bloom filter including header. + * @param cv Column of int64 values to check for membership. + * @return A boolean column with the same row count as cv; true for possible membership, + * false for definite non-membership. Nulls in cv are preserved in the output. */ - public static ColumnVector probe(BaseDeviceMemoryBuffer bloomFilter, ColumnVector cv){ - return new ColumnVector(probebuffer(bloomFilter.getAddress(), bloomFilter.getLength(), cv.getNativeView())); + public static ColumnVector probe(BaseDeviceMemoryBuffer bloomFilter, ColumnVector cv) { + return new ColumnVector( + probebuffer(bloomFilter.getAddress(), bloomFilter.getLength(), cv.getNativeView())); } - private static native long creategpu(int numHashes, long bloomFilterBits) throws CudfException; + private static native long creategpu(int version, int numHashes, long bloomFilterBits, int seed) + throws CudfException; private static native int put(long bloomFilter, long cv) throws CudfException; private static native long merge(long bloomFilters) throws CudfException; private static native long probe(long bloomFilter, long cv) throws CudfException; - private static native long probebuffer(long bloomFilter, long bloomFilterSize, long cv) throws CudfException; + private static native long probebuffer(long bloomFilter, long bloomFilterSize, long cv) + throws CudfException; } diff --git a/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java b/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java index 05ac497e72..ce9d99b731 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,10 @@ package com.nvidia.spark.rapids.jni; -import com.nvidia.spark.rapids.jni.BloomFilter; - import ai.rapids.cudf.AssertUtils; import ai.rapids.cudf.ColumnVector; -import ai.rapids.cudf.Cuda; import ai.rapids.cudf.CudfException; import ai.rapids.cudf.Scalar; -import ai.rapids.cudf.DeviceMemoryBuffer; import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.Test; @@ -153,22 +149,43 @@ void testBuildTrivialMergeProbe(){ } @Test - void testBuildExpectedFailures(){ + void testBuildAndProbeV1() { + int numHashes = 3; + long bloomFilterBits = 4 * 1024 * 1024; + + try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); + Scalar bloomFilter = BloomFilter.create(BloomFilter.VERSION_1, numHashes, bloomFilterBits, 0)) { + BloomFilter.put(bloomFilter, input); + try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); + ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter, probe)) { + AssertUtils.assertColumnsAreEqual(expected, result); + } + } + } + + @Test + void testBuildExpectedFailures() { // bloom filter with no hashes assertThrows(IllegalArgumentException.class, () -> { - try (Scalar bloomFilter = BloomFilter.create(0, 64)){} + try (Scalar bloomFilter = BloomFilter.create(BloomFilter.VERSION_1, 0, 64, 0)) {} }); // bloom filter with no size assertThrows(IllegalArgumentException.class, () -> { - try (Scalar bloomFilter = BloomFilter.create(3, 0)){} + try (Scalar bloomFilter = BloomFilter.create(BloomFilter.VERSION_1, 3, 0, 0)) {} }); - + + // invalid version + assertThrows(IllegalArgumentException.class, () -> { + try (Scalar bloomFilter = BloomFilter.create(3, 3, 64, 0)) {} + }); + // merge with mixed hash counts assertThrows(CudfException.class, () -> { - try (Scalar bloomFilterA = BloomFilter.create(3, 1024); - Scalar bloomFilterB = BloomFilter.create(4, 1024); - Scalar bloomFilterC = BloomFilter.create(4, 1024); + try (Scalar bloomFilterA = BloomFilter.create(BloomFilter.VERSION_1, 3, 1024, 0); + Scalar bloomFilterB = BloomFilter.create(BloomFilter.VERSION_1, 4, 1024, 0); + Scalar bloomFilterC = BloomFilter.create(BloomFilter.VERSION_1, 4, 1024, 0); ColumnVector bloomA = ColumnVector.fromScalar(bloomFilterA, 1); ColumnVector bloomB = ColumnVector.fromScalar(bloomFilterB, 1); ColumnVector bloomC = ColumnVector.fromScalar(bloomFilterC, 1); @@ -178,9 +195,9 @@ void testBuildExpectedFailures(){ // merge with mixed hash bit sizes assertThrows(CudfException.class, () -> { - try (Scalar bloomFilterA = BloomFilter.create(3, 1024); - Scalar bloomFilterB = BloomFilter.create(3, 1024); - Scalar bloomFilterC = BloomFilter.create(3, 2048); + try (Scalar bloomFilterA = BloomFilter.create(BloomFilter.VERSION_1, 3, 1024, 0); + Scalar bloomFilterB = BloomFilter.create(BloomFilter.VERSION_1, 3, 1024, 0); + Scalar bloomFilterC = BloomFilter.create(BloomFilter.VERSION_1, 3, 2048, 0); ColumnVector bloomA = ColumnVector.fromScalar(bloomFilterA, 1); ColumnVector bloomB = ColumnVector.fromScalar(bloomFilterB, 1); ColumnVector bloomC = ColumnVector.fromScalar(bloomFilterC, 1); From 7af89f76601c8215abd44938069f39e0b550cc53 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 11 Mar 2026 16:19:15 -0700 Subject: [PATCH 02/29] Code formatting. Signed-off-by: MithunR --- src/main/cpp/benchmarks/bloom_filter.cu | 8 +- src/main/cpp/src/bloom_filter.cu | 129 ++++++++++++------------ src/main/cpp/src/bloom_filter.hpp | 6 +- src/main/cpp/tests/bloom_filter.cu | 32 +++--- 4 files changed, 88 insertions(+), 87 deletions(-) diff --git a/src/main/cpp/benchmarks/bloom_filter.cu b/src/main/cpp/benchmarks/bloom_filter.cu index 55daef8e45..1622c5668a 100644 --- a/src/main/cpp/benchmarks/bloom_filter.cu +++ b/src/main/cpp/benchmarks/bloom_filter.cu @@ -27,9 +27,9 @@ static void bloom_filter_put_v1(nvbench::state& state) constexpr int num_rows = 150'000'000; constexpr int num_hashes = 3; - cudf::size_type const bloom_filter_bytes = state.get_int64("bloom_filter_bytes"); - cudf::size_type const bloom_filter_longs = bloom_filter_bytes / sizeof(int64_t); - auto bloom_filter = spark_rapids_jni::bloom_filter_create( + cudf::size_type const bloom_filter_bytes = state.get_int64("bloom_filter_bytes"); + cudf::size_type const bloom_filter_longs = bloom_filter_bytes / sizeof(int64_t); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs); data_profile_builder builder; @@ -64,7 +64,7 @@ static void bloom_filter_put_v2(nvbench::state& state) // create the bloom filter cudf::size_type const bloom_filter_bytes = state.get_int64("bloom_filter_bytes"); cudf::size_type const bloom_filter_longs = bloom_filter_bytes / sizeof(int64_t); - auto bloom_filter = spark_rapids_jni::bloom_filter_create( + auto bloom_filter = spark_rapids_jni::bloom_filter_create( spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs); data_profile_builder builder; diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 57a86eabf9..fe574f8c59 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -37,6 +37,7 @@ #include #include + #include namespace spark_rapids_jni { @@ -79,7 +80,7 @@ CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, auto const el = input.element(tid); // V1 has no seed in the format; use 0. V2 uses the stored seed. - int32_t const hash_seed = (Version == 1) ? 0 : seed; + int32_t const hash_seed = (Version == 1) ? 0 : seed; bloom_hash_type const h1 = MurmurHash3_32(hash_seed)(el); bloom_hash_type const h2 = MurmurHash3_32(h1)(el); @@ -88,19 +89,20 @@ CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, // This is the original V1 hash algorithm from Spark. for (auto idx = 1; idx <= num_hashes; idx++) { bloom_hash_type combined_hash = h1 + (idx * h2); - auto const bit_pos = static_cast( - (combined_hash < 0 ? ~combined_hash : combined_hash) % - static_cast(bloom_filter_bits)); + auto const bit_pos = + static_cast((combined_hash < 0 ? ~combined_hash : combined_hash) % + static_cast(bloom_filter_bits)); auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); atomicOr(bloom_filter + word_index, mask); } } else { // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImplV2.java#L63 - int64_t combined_hash = static_cast(h1) * static_cast(std::numeric_limits::max()); + int64_t combined_hash = + static_cast(h1) * static_cast(std::numeric_limits::max()); for (int idx = 0; idx < num_hashes; idx++) { combined_hash += h2; - int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; - auto const bit_pos = combined_index % bloom_filter_bits; + int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; + auto const bit_pos = combined_index % bloom_filter_bits; auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); atomicOr(bloom_filter + word_index, mask); } @@ -116,16 +118,16 @@ struct bloom_probe_functor { __device__ bool operator()(int64_t input) const { - int32_t const hash_seed = (Version == 1) ? 0 : seed; + int32_t const hash_seed = (Version == 1) ? 0 : seed; bloom_hash_type const h1 = MurmurHash3_32(hash_seed)(input); bloom_hash_type const h2 = MurmurHash3_32(h1)(input); if constexpr (Version == 1) { for (auto idx = 1; idx <= num_hashes; idx++) { bloom_hash_type combined_hash = h1 + (idx * h2); - auto const bit_pos = static_cast( - (combined_hash < 0 ? ~combined_hash : combined_hash) % - static_cast(bloom_filter_bits)); + auto const bit_pos = + static_cast((combined_hash < 0 ? ~combined_hash : combined_hash) % + static_cast(bloom_filter_bits)); auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); if (!(bloom_filter[word_index] & mask)) { return false; } } @@ -134,8 +136,8 @@ struct bloom_probe_functor { static_cast(h1) * static_cast(std::numeric_limits::max()); for (int idx = 0; idx < num_hashes; idx++) { combined_hash += h2; - int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; - auto const bit_pos = combined_index % bloom_filter_bits; + int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; + auto const bit_pos = combined_index % bloom_filter_bits; auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); if (!(bloom_filter[word_index] & mask)) { return false; } } @@ -219,11 +221,11 @@ unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_vi CUDF_EXPECTS(num_bitmask_words == cudf::num_bitmask_words(bloom_filter_bits), "Bloom filter bit/length mismatch"); - return {header, - {reinterpret_cast(bloom_filter.data() + hdr_size), - num_bitmask_words}, - bloom_filter_bits, - seed}; + return { + header, + {reinterpret_cast(bloom_filter.data() + hdr_size), num_bitmask_words}, + bloom_filter_bits, + seed}; } std::tuple, int64_t, int32_t> @@ -239,8 +241,7 @@ std::tuple, int64_t, unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_view stream) { return unpack_bloom_filter( - cudf::device_span{const_cast(bloom_filter.data()), - bloom_filter.size()}, + cudf::device_span{const_cast(bloom_filter.data()), bloom_filter.size()}, stream); } @@ -249,7 +250,7 @@ unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_str header. raw_header holds the reference header in big-endian form (as in the serialized buffer). */ struct bloom_filter_same { - /// Reference header: big-endian int32s. + /// Reference header: big-endian int32s. /// V1 uses [0..2]: version, num_hashes, num_longs. /// V2 uses [0..3]: version, num_hashes, seed, num_longs. int32_t raw_header[4]; @@ -304,14 +305,15 @@ std::unique_ptr bloom_filter_create(int version, "Bloom filter version must be 1 or 2"); auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(version, bloom_filter_longs); - auto const hdr_size = bloom_filter_header_size_for_version(version); + auto const hdr_size = bloom_filter_header_size_for_version(version); rmm::device_buffer buf{static_cast(buf_size), stream, mr}; bloom_filter_header header{version, num_hashes, bloom_filter_longs}; - pack_bloom_filter_header( - {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream, - (version == bloom_filter_version_1 ? 0 : seed)); + pack_bloom_filter_header({reinterpret_cast(buf.data()), static_cast(buf_size)}, + header, + stream, + (version == bloom_filter_version_1 ? 0 : seed)); CUDF_CUDA_TRY(cudaMemsetAsync( reinterpret_cast(buf.data()) + hdr_size, 0, bloom_filter_size, stream)); @@ -370,48 +372,44 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b // seed; we use these to validate total size and to build the merged output. auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(lcv.child(), stream); auto const hdr_size = bloom_filter_header_size_for_version(header.version); - CUDF_EXPECTS( - lcv.child().size() == static_cast( - ((buffer.size() * 4) + hdr_size) * bloom_filters.size()), - "Encountered invalid/mismatched bloom filter buffer data"); + CUDF_EXPECTS(lcv.child().size() == static_cast(((buffer.size() * 4) + hdr_size) * + bloom_filters.size()), + "Encountered invalid/mismatched bloom filter buffer data"); auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(header.version, header.num_longs); int32_t raw_hdr[4] = {}; int header_field_count = 0; if (header.version == bloom_filter_version_1) { - raw_hdr[0] = byte_swap_int32(header.version); - raw_hdr[1] = byte_swap_int32(header.num_hashes); - raw_hdr[2] = byte_swap_int32(header.num_longs); - header_field_count = 3; + raw_hdr[0] = byte_swap_int32(header.version); + raw_hdr[1] = byte_swap_int32(header.num_hashes); + raw_hdr[2] = byte_swap_int32(header.num_longs); + header_field_count = 3; } else { - raw_hdr[0] = byte_swap_int32(header.version); - raw_hdr[1] = byte_swap_int32(header.num_hashes); - raw_hdr[2] = byte_swap_int32(seed); - raw_hdr[3] = byte_swap_int32(header.num_longs); - header_field_count = 4; + raw_hdr[0] = byte_swap_int32(header.version); + raw_hdr[1] = byte_swap_int32(header.num_hashes); + raw_hdr[2] = byte_swap_int32(seed); + raw_hdr[3] = byte_swap_int32(header.num_longs); + header_field_count = 4; } auto dv = cudf::column_device_view::create(bloom_filters); - CUDF_EXPECTS( - thrust::all_of( - rmm::exec_policy(cudf::get_default_stream()), - thrust::make_counting_iterator(1), - thrust::make_counting_iterator(bloom_filters.size()), - bloom_filter_same{ - {raw_hdr[0], raw_hdr[1], raw_hdr[2], raw_hdr[3]}, - header_field_count, - *dv, - static_cast(buf_size)}), - "Mismatch of bloom filter parameters"); + CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(cudf::get_default_stream()), + thrust::make_counting_iterator(1), + thrust::make_counting_iterator(bloom_filters.size()), + bloom_filter_same{{raw_hdr[0], raw_hdr[1], raw_hdr[2], raw_hdr[3]}, + header_field_count, + *dv, + static_cast(buf_size)}), + "Mismatch of bloom filter parameters"); rmm::device_buffer buf{static_cast(buf_size), stream, mr}; pack_bloom_filter_header( {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream, seed); auto src = lcv.child().data() + hdr_size; - auto dst = reinterpret_cast(reinterpret_cast(buf.data()) + - hdr_size); + auto dst = + reinterpret_cast(reinterpret_cast(buf.data()) + hdr_size); cudf::size_type num_words = header.num_longs * 2; thrust::transform( @@ -445,9 +443,8 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, { auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter, stream); auto const hdr_size = bloom_filter_header_size_for_version(header.version); - CUDF_EXPECTS( - bloom_filter.size() == static_cast((buffer.size() * 4) + hdr_size), - "Encountered invalid/mismatched bloom filter buffer data"); + CUDF_EXPECTS(bloom_filter.size() == static_cast((buffer.size() * 4) + hdr_size), + "Encountered invalid/mismatched bloom filter buffer data"); auto out = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::BOOL8}, input.size(), @@ -457,19 +454,19 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, mr); if (header.version == bloom_filter_version_2) { - thrust::transform(rmm::exec_policy(stream), - input.begin(), - input.end(), - out->mutable_view().begin(), - bloom_probe_functor<2>{buffer.data(), bloom_filter_bits, - header.num_hashes, seed}); + thrust::transform( + rmm::exec_policy(stream), + input.begin(), + input.end(), + out->mutable_view().begin(), + bloom_probe_functor<2>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); } else { - thrust::transform(rmm::exec_policy(stream), - input.begin(), - input.end(), - out->mutable_view().begin(), - bloom_probe_functor<1>{buffer.data(), bloom_filter_bits, - header.num_hashes, seed}); + thrust::transform( + rmm::exec_policy(stream), + input.begin(), + input.end(), + out->mutable_view().begin(), + bloom_probe_functor<1>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); } return out; diff --git a/src/main/cpp/src/bloom_filter.hpp b/src/main/cpp/src/bloom_filter.hpp index 7aee087f81..1a361cd62e 100644 --- a/src/main/cpp/src/bloom_filter.hpp +++ b/src/main/cpp/src/bloom_filter.hpp @@ -86,9 +86,9 @@ std::unique_ptr bloom_filter_create( int version, int num_hashes, int bloom_filter_longs, - int seed = 0, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref()); + int seed = 0, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref()); /** * @brief Inserts input values into a bloom filter. diff --git a/src/main/cpp/tests/bloom_filter.cu b/src/main/cpp/tests/bloom_filter.cu index d242d95139..26bdc3f17f 100644 --- a/src/main/cpp/tests/bloom_filter.cu +++ b/src/main/cpp/tests/bloom_filter.cu @@ -39,15 +39,17 @@ TEST_F(BloomFilterTest, InitializationV1) std::vector expected{1, 2, 3}; for (size_t idx = 0; idx < expected.size(); idx++) { - auto bloom_filter = spark_rapids_jni::bloom_filter_create( - spark_rapids_jni::bloom_filter_version_1, num_hashes, expected[idx], 0, - cudf::get_default_stream()); + auto bloom_filter = + spark_rapids_jni::bloom_filter_create(spark_rapids_jni::bloom_filter_version_1, + num_hashes, + expected[idx], + 0, + cudf::get_default_stream()); auto const bloom_filter_size = expected[idx] * sizeof(int64_t); - CUDF_EXPECTS( - bloom_filter->view().size() == - spark_rapids_jni::bloom_filter_header_v1_size_bytes + bloom_filter_size, - "Bloom filter not of expected size"); + CUDF_EXPECTS(bloom_filter->view().size() == + spark_rapids_jni::bloom_filter_header_v1_size_bytes + bloom_filter_size, + "Bloom filter not of expected size"); auto bytes = (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_v1_size_bytes; @@ -186,15 +188,17 @@ TEST_F(BloomFilterTest, InitializationV2) std::vector expected{1, 2, 3}; for (size_t idx = 0; idx < expected.size(); idx++) { - auto bloom_filter = spark_rapids_jni::bloom_filter_create( - spark_rapids_jni::bloom_filter_version_2, num_hashes, expected[idx], seed, - cudf::get_default_stream()); + auto bloom_filter = + spark_rapids_jni::bloom_filter_create(spark_rapids_jni::bloom_filter_version_2, + num_hashes, + expected[idx], + seed, + cudf::get_default_stream()); auto const bloom_filter_size = expected[idx] * sizeof(int64_t); - CUDF_EXPECTS( - bloom_filter->view().size() == - spark_rapids_jni::bloom_filter_header_v2_size_bytes + bloom_filter_size, - "Bloom filter not of expected size"); + CUDF_EXPECTS(bloom_filter->view().size() == + spark_rapids_jni::bloom_filter_header_v2_size_bytes + bloom_filter_size, + "Bloom filter not of expected size"); auto bytes = (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_v2_size_bytes; From bface5fbe2c9f7570eef88cc46434d9c807ae24a Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 12 Mar 2026 12:13:30 -0700 Subject: [PATCH 03/29] Removed redundant CUDF_EXPECTS. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index fe574f8c59..747cd05f73 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -209,8 +209,8 @@ unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_vi int32_t seed = 0; if (version == bloom_filter_version_2) { - CUDF_EXPECTS(read_size >= static_cast(bloom_filter_header_v2_size_bytes), - "Encountered truncated V2 bloom filter header"); + // Safe: the header-size check above guarantees bloom_filter.size() >= v2 header size, + // so read_size (= min(bloom_filter.size(), v2 header size)) == v2 header size. seed = byte_swap_int32(raw_ints[2]); } From aa9b576020c93e4e38ae802f3bd3b73589c8f6c4 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 12 Mar 2026 14:29:26 -0700 Subject: [PATCH 04/29] Review fixes: 1. Checked for narrowing for num_longs. 2. Removed unused create function with the seemingly changed default format version. Signed-off-by: MithunR --- src/main/cpp/src/BloomFilterJni.cpp | 10 +++++++++- .../com/nvidia/spark/rapids/jni/BloomFilter.java | 14 -------------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/src/main/cpp/src/BloomFilterJni.cpp b/src/main/cpp/src/BloomFilterJni.cpp index ad579f2128..8c250ec116 100644 --- a/src/main/cpp/src/BloomFilterJni.cpp +++ b/src/main/cpp/src/BloomFilterJni.cpp @@ -20,6 +20,8 @@ #include "jni_utils.hpp" #include "utilities.hpp" +#include + extern "C" { JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_creategpu( @@ -29,7 +31,13 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_creategpu( { cudf::jni::auto_set_device(env); - int bloom_filter_longs = static_cast((bloomFilterBits + 63) / 64); + jlong bloom_filter_longs_long = (bloomFilterBits + 63) / 64; + JNI_ARG_CHECK( + env, + bloom_filter_longs_long >= 0 && bloom_filter_longs_long <= std::numeric_limits::max(), + "bloom filter bit count overflows int when converted to longs", + 0); + int bloom_filter_longs = static_cast(bloom_filter_longs_long); auto bloom_filter = spark_rapids_jni::bloom_filter_create(version, numHashes, bloom_filter_longs, seed); return reinterpret_cast(bloom_filter.release()); diff --git a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java index f28fc3d392..621c264f18 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java @@ -32,20 +32,6 @@ public class BloomFilter { NativeDepsLoader.loadNativeDeps(); } - /** - * Create a V2 bloom filter with the specified number of hashes and bloom filter bits, - * using the default seed (0). - * - * @param numHashes Number of bit positions set (and checked) per key. Higher values reduce - * false positives but increase work per put/probe. - * @param bloomFilterBits Total size of the bloom filter in bits (will be rounded up to a - * multiple of 64). - * @return A Scalar wrapping the GPU bloom filter (Spark V2 format). - */ - public static Scalar create(int numHashes, long bloomFilterBits) { - return create(VERSION_2, numHashes, bloomFilterBits, DEFAULT_SEED); - } - /** * Create a bloom filter with the specified version, number of hashes, bloom filter bits, * and hash seed. From 8c2c1014034fd2d51d03edeb81becb0800327817 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 12 Mar 2026 16:02:19 -0700 Subject: [PATCH 05/29] Java tests for V2 format. Signed-off-by: MithunR --- .../spark/rapids/jni/BloomFilterTest.java | 150 ++++++++++-------- 1 file changed, 84 insertions(+), 66 deletions(-) diff --git a/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java b/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java index ce9d99b731..e844883952 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java @@ -23,88 +23,90 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class BloomFilterTest { - @Test - void testBuildAndProbe(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildAndProbe(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ - + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, input); - try(ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); - ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); - ColumnVector result = BloomFilter.probe(bloomFilter, probe)){ + try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); + ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter, probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } } - @Test - void testBuildAndProbeBuffer(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildAndProbeBuffer(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ - + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, input); - - try(ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); - ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); - ColumnVector result = BloomFilter.probe(bloomFilter.getListAsColumnView().getData(), probe)){ + try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); + ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter.getListAsColumnView().getData(), probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } } - @Test - void testBuildWithNullsAndProbe(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildWithNullsAndProbe(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector input = ColumnVector.fromBoxedLongs(null, 80L, 100L, null, 47L, -9L, 234000000L); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ - + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, input); - try(ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); - ColumnVector expected = ColumnVector.fromBooleans(false, true, true, false, true, true, true, false, false, false, false); - ColumnVector result = BloomFilter.probe(bloomFilter, probe)){ + try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); + ColumnVector expected = ColumnVector.fromBooleans(false, true, true, false, true, true, true, false, false, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter, probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } } - @Test - void testBuildAndProbeWithNulls(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildAndProbeWithNulls(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ - + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, input); - try(ColumnVector probe = ColumnVector.fromBoxedLongs(null, null, null, 99L, 47L, -9L, 234000000L, null, null, 2L, 3L); - ColumnVector expected = ColumnVector.fromBoxedBooleans(null, null, null, true, true, true, true, null, null, false, false); - ColumnVector result = BloomFilter.probe(bloomFilter, probe)){ + try (ColumnVector probe = ColumnVector.fromBoxedLongs(null, null, null, 99L, 47L, -9L, 234000000L, null, null, 2L, 3L); + ColumnVector expected = ColumnVector.fromBoxedBooleans(null, null, null, true, true, true, true, null, null, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter, probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } } - - @Test - void testBuildMergeProbe(){ + + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildMergeProbe(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector colA = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); ColumnVector colB = ColumnVector.fromLongs(100, 200, 300, 400); ColumnVector colC = ColumnVector.fromLongs(-100, -200, -300, -400); - Scalar bloomFilterA = BloomFilter.create(numHashes, bloomFilterBits); - Scalar bloomFilterB = BloomFilter.create(numHashes, bloomFilterBits); - Scalar bloomFilterC = BloomFilter.create(numHashes, bloomFilterBits)){ + Scalar bloomFilterA = BloomFilter.create(version, numHashes, bloomFilterBits, 0); + Scalar bloomFilterB = BloomFilter.create(version, numHashes, bloomFilterBits, 0); + Scalar bloomFilterC = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilterA, colA); BloomFilter.put(bloomFilterB, colB); @@ -119,8 +121,8 @@ void testBuildMergeProbe(){ 65535, 0, -100, -200, -300, -400); ColumnVector expected = ColumnVector.fromBooleans(true, true, true, false, false, true, false, false, true, true, true, true); - Scalar merged = BloomFilter.merge(premerge); - ColumnVector result = BloomFilter.probe(merged, probe)) { + Scalar merged = BloomFilter.merge(premerge); + ColumnVector result = BloomFilter.probe(merged, probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } @@ -128,33 +130,35 @@ void testBuildMergeProbe(){ } } - @Test - void testBuildTrivialMergeProbe(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildTrivialMergeProbe(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector colA = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, colA); - try(ColumnVector premerge = ColumnVector.fromScalar(bloomFilter, 1); - ColumnVector probe = ColumnVector.fromLongs(-9, 200, 300, 6000, -2546, 99, 65535, 0, -100, -200, -300, -400); - ColumnVector expected = ColumnVector.fromBooleans(true, false, false, false, false, true, false, false, false, false, false, false); - Scalar merged = BloomFilter.merge(premerge); - ColumnVector result = BloomFilter.probe(merged, probe)){ - AssertUtils.assertColumnsAreEqual(expected, result); + try (ColumnVector premerge = ColumnVector.fromScalar(bloomFilter, 1); + ColumnVector probe = ColumnVector.fromLongs(-9, 200, 300, 6000, -2546, 99, 65535, 0, -100, -200, -300, -400); + ColumnVector expected = ColumnVector.fromBooleans(true, false, false, false, false, true, false, false, false, false, false, false); + Scalar merged = BloomFilter.merge(premerge); + ColumnVector result = BloomFilter.probe(merged, probe)) { + AssertUtils.assertColumnsAreEqual(expected, result); } } } @Test - void testBuildAndProbeV1() { + void testBuildAndProbeV2WithSeed() { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; + int seed = 42; try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(BloomFilter.VERSION_1, numHashes, bloomFilterBits, 0)) { + Scalar bloomFilter = BloomFilter.create(BloomFilter.VERSION_2, numHashes, bloomFilterBits, seed)) { BloomFilter.put(bloomFilter, input); try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); @@ -164,45 +168,59 @@ void testBuildAndProbeV1() { } } - @Test - void testBuildExpectedFailures() { + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildExpectedFailures(int version) { // bloom filter with no hashes assertThrows(IllegalArgumentException.class, () -> { - try (Scalar bloomFilter = BloomFilter.create(BloomFilter.VERSION_1, 0, 64, 0)) {} + try (Scalar bloomFilter = BloomFilter.create(version, 0, 64, 0)) {} }); // bloom filter with no size assertThrows(IllegalArgumentException.class, () -> { - try (Scalar bloomFilter = BloomFilter.create(BloomFilter.VERSION_1, 3, 0, 0)) {} - }); - - // invalid version - assertThrows(IllegalArgumentException.class, () -> { - try (Scalar bloomFilter = BloomFilter.create(3, 3, 64, 0)) {} + try (Scalar bloomFilter = BloomFilter.create(version, 3, 0, 0)) {} }); // merge with mixed hash counts assertThrows(CudfException.class, () -> { - try (Scalar bloomFilterA = BloomFilter.create(BloomFilter.VERSION_1, 3, 1024, 0); - Scalar bloomFilterB = BloomFilter.create(BloomFilter.VERSION_1, 4, 1024, 0); - Scalar bloomFilterC = BloomFilter.create(BloomFilter.VERSION_1, 4, 1024, 0); + try (Scalar bloomFilterA = BloomFilter.create(version, 3, 1024, 0); + Scalar bloomFilterB = BloomFilter.create(version, 4, 1024, 0); + Scalar bloomFilterC = BloomFilter.create(version, 4, 1024, 0); ColumnVector bloomA = ColumnVector.fromScalar(bloomFilterA, 1); ColumnVector bloomB = ColumnVector.fromScalar(bloomFilterB, 1); ColumnVector bloomC = ColumnVector.fromScalar(bloomFilterC, 1); ColumnVector premerge = ColumnVector.concatenate(bloomA, bloomB, bloomC); - Scalar merged = BloomFilter.merge(premerge)){} + Scalar merged = BloomFilter.merge(premerge)) {} }); - // merge with mixed hash bit sizes + // merge with mixed bit sizes assertThrows(CudfException.class, () -> { - try (Scalar bloomFilterA = BloomFilter.create(BloomFilter.VERSION_1, 3, 1024, 0); - Scalar bloomFilterB = BloomFilter.create(BloomFilter.VERSION_1, 3, 1024, 0); - Scalar bloomFilterC = BloomFilter.create(BloomFilter.VERSION_1, 3, 2048, 0); + try (Scalar bloomFilterA = BloomFilter.create(version, 3, 1024, 0); + Scalar bloomFilterB = BloomFilter.create(version, 3, 1024, 0); + Scalar bloomFilterC = BloomFilter.create(version, 3, 2048, 0); ColumnVector bloomA = ColumnVector.fromScalar(bloomFilterA, 1); ColumnVector bloomB = ColumnVector.fromScalar(bloomFilterB, 1); ColumnVector bloomC = ColumnVector.fromScalar(bloomFilterC, 1); ColumnVector premerge = ColumnVector.concatenate(bloomA, bloomB, bloomC); - Scalar merged = BloomFilter.merge(premerge)){} + Scalar merged = BloomFilter.merge(premerge)) {} + }); + } + + @Test + void testBuildExpectedFailuresVersionIndependent() { + // invalid version + assertThrows(IllegalArgumentException.class, () -> { + try (Scalar bloomFilter = BloomFilter.create(3, 3, 64, 0)) {} + }); + + // merge with mixed versions (V1 + V2) + assertThrows(CudfException.class, () -> { + try (Scalar bloomFilterA = BloomFilter.create(BloomFilter.VERSION_1, 3, 1024, 0); + Scalar bloomFilterB = BloomFilter.create(BloomFilter.VERSION_2, 3, 1024, 0); + ColumnVector bloomA = ColumnVector.fromScalar(bloomFilterA, 1); + ColumnVector bloomB = ColumnVector.fromScalar(bloomFilterB, 1); + ColumnVector premerge = ColumnVector.concatenate(bloomA, bloomB); + Scalar merged = BloomFilter.merge(premerge)) {} }); } } From 2e63ec831f2f11c9ea2a68611b21fbb346531308 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 13 Mar 2026 11:20:48 -0700 Subject: [PATCH 06/29] Better overflow checking. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 747cd05f73..d441ff644c 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -274,12 +274,14 @@ struct bloom_filter_same { - second: total size in bytes of the serialized bloom filter buffer (header + bit array). Uses the version-specific header size. */ -std::pair get_bloom_filter_stride(int version, int bloom_filter_longs) +std::tuple get_bloom_filter_stride(int version, int bloom_filter_longs) { auto const bloom_filter_size = static_cast(bloom_filter_longs) * sizeof(int64_t); auto const hdr_size = bloom_filter_header_size_for_version(version); auto const buf_size = hdr_size + bloom_filter_size; - return {bloom_filter_size, buf_size}; + CUDF_EXPECTS(buf_size <= std::numeric_limits::max(), + "Bloom filter buffer size exceeds int32 range"); + return {static_cast(bloom_filter_size), static_cast(buf_size)}; } } // anonymous namespace @@ -333,7 +335,7 @@ void bloom_filter_put(cudf::list_scalar& bloom_filter, auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter.view(), stream); auto const hdr_size = bloom_filter_header_size_for_version(header.version); CUDF_EXPECTS( - bloom_filter.view().size() == static_cast((buffer.size() * 4) + hdr_size), + static_cast(bloom_filter.view().size()) == (buffer.size() * 4) + hdr_size, "Encountered invalid/mismatched bloom filter buffer data"); constexpr int block_size = 256; @@ -400,7 +402,7 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b bloom_filter_same{{raw_hdr[0], raw_hdr[1], raw_hdr[2], raw_hdr[3]}, header_field_count, *dv, - static_cast(buf_size)}), + buf_size}), "Mismatch of bloom filter parameters"); rmm::device_buffer buf{static_cast(buf_size), stream, mr}; From d83b2c9ede5ade5693f9d30d149014ed666188b6 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 13 Mar 2026 11:35:21 -0700 Subject: [PATCH 07/29] Formatting. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index d441ff644c..9f008b8e26 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -334,9 +334,8 @@ void bloom_filter_put(cudf::list_scalar& bloom_filter, { auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter.view(), stream); auto const hdr_size = bloom_filter_header_size_for_version(header.version); - CUDF_EXPECTS( - static_cast(bloom_filter.view().size()) == (buffer.size() * 4) + hdr_size, - "Encountered invalid/mismatched bloom filter buffer data"); + CUDF_EXPECTS(static_cast(bloom_filter.view().size()) == (buffer.size() * 4) + hdr_size, + "Encountered invalid/mismatched bloom filter buffer data"); constexpr int block_size = 256; auto grid = cudf::detail::grid_1d{input.size(), block_size, 1}; @@ -396,14 +395,14 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b } auto dv = cudf::column_device_view::create(bloom_filters); - CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(cudf::get_default_stream()), - thrust::make_counting_iterator(1), - thrust::make_counting_iterator(bloom_filters.size()), - bloom_filter_same{{raw_hdr[0], raw_hdr[1], raw_hdr[2], raw_hdr[3]}, - header_field_count, - *dv, - buf_size}), - "Mismatch of bloom filter parameters"); + CUDF_EXPECTS( + thrust::all_of( + rmm::exec_policy(cudf::get_default_stream()), + thrust::make_counting_iterator(1), + thrust::make_counting_iterator(bloom_filters.size()), + bloom_filter_same{ + {raw_hdr[0], raw_hdr[1], raw_hdr[2], raw_hdr[3]}, header_field_count, *dv, buf_size}), + "Mismatch of bloom filter parameters"); rmm::device_buffer buf{static_cast(buf_size), stream, mr}; pack_bloom_filter_header( From 095dd88e5ae95a8f5cf753650e39951d6619f7ef Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 13 Mar 2026 14:30:50 -0700 Subject: [PATCH 08/29] Review: Better int width checks. Plus, added deprecated create() function back. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 7 ++++--- .../com/nvidia/spark/rapids/jni/BloomFilter.java | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 9f008b8e26..ec71a2a90d 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -373,9 +373,10 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b // seed; we use these to validate total size and to build the merged output. auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(lcv.child(), stream); auto const hdr_size = bloom_filter_header_size_for_version(header.version); - CUDF_EXPECTS(lcv.child().size() == static_cast(((buffer.size() * 4) + hdr_size) * - bloom_filters.size()), - "Encountered invalid/mismatched bloom filter buffer data"); + CUDF_EXPECTS( + static_cast(lcv.child().size()) == (buffer.size() * 4 + static_cast(hdr_size)) * + static_cast(bloom_filters.size()), + "Encountered invalid/mismatched bloom filter buffer data"); auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(header.version, header.num_longs); diff --git a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java index 621c264f18..926a80705b 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java @@ -32,6 +32,21 @@ public class BloomFilter { NativeDepsLoader.loadNativeDeps(); } + /** + * Create a V1 bloom filter with the specified number of hashes and bloom filter bits, + * using the default seed. + * + * @deprecated Use {@link #create(int, int, long, int)} instead. + * @param numHashes Number of bit positions set (and checked) per key. + * @param bloomFilterBits Total size of the bloom filter in bits (will be rounded up to a + * multiple of 64). + * @return A Scalar wrapping the GPU bloom filter (Spark format). + */ + @Deprecated + public static Scalar create(int numHashes, long bloomFilterBits) { + return create(VERSION_1, numHashes, bloomFilterBits, DEFAULT_SEED); + } + /** * Create a bloom filter with the specified version, number of hashes, bloom filter bits, * and hash seed. From be2a1202afb337cee91396671e1195b18eaea710 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 13 Mar 2026 16:48:31 -0700 Subject: [PATCH 09/29] Better overflow checking. Documented impedance mismatch. Signed-off-by: MithunR --- src/main/cpp/src/BloomFilterJni.cpp | 20 +++++++++++++------- src/main/cpp/src/bloom_filter.cu | 2 ++ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/src/main/cpp/src/BloomFilterJni.cpp b/src/main/cpp/src/BloomFilterJni.cpp index 8c250ec116..9a721ef1b0 100644 --- a/src/main/cpp/src/BloomFilterJni.cpp +++ b/src/main/cpp/src/BloomFilterJni.cpp @@ -31,13 +31,19 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_creategpu( { cudf::jni::auto_set_device(env); - jlong bloom_filter_longs_long = (bloomFilterBits + 63) / 64; - JNI_ARG_CHECK( - env, - bloom_filter_longs_long >= 0 && bloom_filter_longs_long <= std::numeric_limits::max(), - "bloom filter bit count overflows int when converted to longs", - 0); - int bloom_filter_longs = static_cast(bloom_filter_longs_long); + // TODO (future): There is an impedance mismatch between the C++ and Java APIs. + // This seems to have been introduced in https://github.com/NVIDIA/spark-rapids-jni/pull/1303. + // The Java API accepts a long for the bloom filter bit count, but the C++ API accepts an int. + // This means that the Java API can represent a bloom filter bit count that is too large to + // be represented as an int in the C++ API. + // We should fix this by changing the C++ API to accept a long for the bloom filter bit count. + // We will address this in a future PR. For now, we add error checking to avoid overflow. + JNI_ARG_CHECK(env, + bloomFilterBits >= 0 && bloomFilterBits <= std::numeric_limits::max() - 63, + "bloom filter bit count overflows int when converted to longs", + 0); + auto const bloom_filter_longs_long = (bloomFilterBits + 63) / 64; + auto const bloom_filter_longs = static_cast(bloom_filter_longs_long); auto bloom_filter = spark_rapids_jni::bloom_filter_create(version, numHashes, bloom_filter_longs, seed); return reinterpret_cast(bloom_filter.release()); diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index ec71a2a90d..88eeaa5ff8 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -306,6 +306,8 @@ std::unique_ptr bloom_filter_create(int version, CUDF_EXPECTS(version == bloom_filter_version_1 || version == bloom_filter_version_2, "Bloom filter version must be 1 or 2"); + CUDF_EXPECTS(bloom_filter_longs >= 0, "Bloom filter bit count must be non-negative"); + auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(version, bloom_filter_longs); auto const hdr_size = bloom_filter_header_size_for_version(version); From bd90bf401bba5da7af0461ef3d303fd1c6956722 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 13 Mar 2026 20:41:07 -0700 Subject: [PATCH 10/29] Corrected truncation. More error checks. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 88eeaa5ff8..9ae7cb11fb 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -90,8 +90,8 @@ CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, for (auto idx = 1; idx <= num_hashes; idx++) { bloom_hash_type combined_hash = h1 + (idx * h2); auto const bit_pos = - static_cast((combined_hash < 0 ? ~combined_hash : combined_hash) % - static_cast(bloom_filter_bits)); + static_cast(combined_hash < 0 ? ~combined_hash : combined_hash) % + bloom_filter_bits; auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); atomicOr(bloom_filter + word_index, mask); } @@ -126,8 +126,8 @@ struct bloom_probe_functor { for (auto idx = 1; idx <= num_hashes; idx++) { bloom_hash_type combined_hash = h1 + (idx * h2); auto const bit_pos = - static_cast((combined_hash < 0 ? ~combined_hash : combined_hash) % - static_cast(bloom_filter_bits)); + static_cast(combined_hash < 0 ? ~combined_hash : combined_hash) % + bloom_filter_bits; auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); if (!(bloom_filter[word_index] & mask)) { return false; } } @@ -349,17 +349,19 @@ void bloom_filter_put(cudf::list_scalar& bloom_filter, buffer.data(), bloom_filter_bits, *d_input, header.num_hashes, seed); }; - if (header.version == bloom_filter_version_2) { + if (header.version == bloom_filter_version_1) { + CUDF_EXPECTS(bloom_filter_bits <= std::numeric_limits::max(), + "V1 bloom filter bit count exceeds int32 range"); if (input.has_nulls()) { - launch(std::integral_constant{}, std::true_type{}); + launch(std::integral_constant{}, std::true_type{}); } else { - launch(std::integral_constant{}, std::false_type{}); + launch(std::integral_constant{}, std::false_type{}); } } else { if (input.has_nulls()) { - launch(std::integral_constant{}, std::true_type{}); + launch(std::integral_constant{}, std::true_type{}); } else { - launch(std::integral_constant{}, std::false_type{}); + launch(std::integral_constant{}, std::false_type{}); } } } @@ -457,20 +459,22 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, stream, mr); - if (header.version == bloom_filter_version_2) { + if (header.version == bloom_filter_version_1) { + CUDF_EXPECTS(bloom_filter_bits <= std::numeric_limits::max(), + "V1 bloom filter bit count exceeds int32 range"); thrust::transform( rmm::exec_policy(stream), input.begin(), input.end(), out->mutable_view().begin(), - bloom_probe_functor<2>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); + bloom_probe_functor<1>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); } else { thrust::transform( rmm::exec_policy(stream), input.begin(), input.end(), out->mutable_view().begin(), - bloom_probe_functor<1>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); + bloom_probe_functor<2>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); } return out; From 2037b0e4695b9a4e546d18e43208be7f66bd05c2 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 13 Mar 2026 23:28:06 -0700 Subject: [PATCH 11/29] Check that bloom filter bit count is positive. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 9ae7cb11fb..4f7a2e1483 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -306,7 +306,7 @@ std::unique_ptr bloom_filter_create(int version, CUDF_EXPECTS(version == bloom_filter_version_1 || version == bloom_filter_version_2, "Bloom filter version must be 1 or 2"); - CUDF_EXPECTS(bloom_filter_longs >= 0, "Bloom filter bit count must be non-negative"); + CUDF_EXPECTS(bloom_filter_longs > 0, "Bloom filter bit count must be positive"); auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(version, bloom_filter_longs); auto const hdr_size = bloom_filter_header_size_for_version(version); From 3e74747f87a8b9fb3d36270bb2e015f3e6fb9aa3 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Mar 2026 11:11:08 -0700 Subject: [PATCH 12/29] Review: Consolidated commonality. Signed-off-by: MithunR --- src/main/cpp/benchmarks/bloom_filter.cu | 47 +++++++------------------ 1 file changed, 13 insertions(+), 34 deletions(-) diff --git a/src/main/cpp/benchmarks/bloom_filter.cu b/src/main/cpp/benchmarks/bloom_filter.cu index 1622c5668a..2b06b702a5 100644 --- a/src/main/cpp/benchmarks/bloom_filter.cu +++ b/src/main/cpp/benchmarks/bloom_filter.cu @@ -22,15 +22,17 @@ #include #include -static void bloom_filter_put_v1(nvbench::state& state) +namespace { + +void bloom_filter_put_impl(nvbench::state& state, int version) { constexpr int num_rows = 150'000'000; constexpr int num_hashes = 3; cudf::size_type const bloom_filter_bytes = state.get_int64("bloom_filter_bytes"); cudf::size_type const bloom_filter_longs = bloom_filter_bytes / sizeof(int64_t); - auto bloom_filter = spark_rapids_jni::bloom_filter_create( - spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs); + auto bloom_filter = + spark_rapids_jni::bloom_filter_create(version, num_hashes, bloom_filter_longs); data_profile_builder builder; builder.no_validity(); @@ -56,41 +58,18 @@ static void bloom_filter_put_v1(nvbench::state& state) state.add_element_count(static_cast(bytes_written) / time, "Write bytes/sec"); } -static void bloom_filter_put_v2(nvbench::state& state) +void bloom_filter_put_v1(nvbench::state& state) { - constexpr int num_rows = 150'000'000; - constexpr int num_hashes = 3; - - // create the bloom filter - cudf::size_type const bloom_filter_bytes = state.get_int64("bloom_filter_bytes"); - cudf::size_type const bloom_filter_longs = bloom_filter_bytes / sizeof(int64_t); - auto bloom_filter = spark_rapids_jni::bloom_filter_create( - spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs); - - data_profile_builder builder; - builder.no_validity(); - auto const src = create_random_table({{cudf::type_id::INT64}}, row_count{num_rows}, builder); - auto const input = spark_rapids_jni::xxhash64(*src); - - auto const stream = cudf::get_default_stream(); - state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); - state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync, - [&](nvbench::launch& launch, auto& timer) { - timer.start(); - spark_rapids_jni::bloom_filter_put(*bloom_filter, *input); - stream.synchronize(); - timer.stop(); - }); + bloom_filter_put_impl(state, spark_rapids_jni::bloom_filter_version_1); +} - size_t const bytes_read = num_rows * sizeof(int64_t); - size_t const bytes_written = num_rows * sizeof(cudf::bitmask_type) * num_hashes; - auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value"); - state.add_element_count(std::size_t{num_rows}, "Rows Inserted"); - state.add_global_memory_reads(bytes_read, "Bytes read"); - state.add_global_memory_writes(bytes_written, "Bytes written"); - state.add_element_count(static_cast(bytes_written) / time, "Write bytes/sec"); +void bloom_filter_put_v2(nvbench::state& state) +{ + bloom_filter_put_impl(state, spark_rapids_jni::bloom_filter_version_2); } +} // namespace + NVBENCH_BENCH(bloom_filter_put_v1) .set_name("Bloom Filter Put V1") .add_int64_axis("bloom_filter_bytes", From edc63a93501013985945fee89fe7b42489d4bd85 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Mar 2026 11:57:20 -0700 Subject: [PATCH 13/29] Review: Removed unused parameter name. Signed-off-by: MithunR --- src/main/cpp/benchmarks/bloom_filter.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/cpp/benchmarks/bloom_filter.cu b/src/main/cpp/benchmarks/bloom_filter.cu index 2b06b702a5..cae6b8707d 100644 --- a/src/main/cpp/benchmarks/bloom_filter.cu +++ b/src/main/cpp/benchmarks/bloom_filter.cu @@ -42,7 +42,7 @@ void bloom_filter_put_impl(nvbench::state& state, int version) auto const stream = cudf::get_default_stream(); state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync, - [&](nvbench::launch& launch, auto& timer) { + [&](nvbench::launch&, auto& timer) { timer.start(); spark_rapids_jni::bloom_filter_put(*bloom_filter, *input); stream.synchronize(); From 62920482c87792b6bca1321c065ac98f4a614beb Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Mar 2026 13:35:19 -0700 Subject: [PATCH 14/29] Review: cuda::std::. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index d70f2e5526..e4a935f87c 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -33,6 +33,7 @@ #include #include +#include #include #include @@ -53,7 +54,7 @@ inline int32_t byte_swap_int32(int32_t val) // Given a non-negative bit position within the bloom filter, compute // the 32-bit word index and bitmask. Handles big-endian swizzle so // the GPU buffer is directly compatible with Spark's serialized format. -__device__ inline std::pair gpu_bit_to_word_mask(int64_t bit_pos) +__device__ inline cuda::std::pair gpu_bit_to_word_mask(int64_t bit_pos) { auto const word_index = (bit_pos / 32) ^ 0x1; auto const bit_index = static_cast(bit_pos % 32) ^ 0x18; @@ -97,7 +98,7 @@ CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, } else { // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImplV2.java#L63 int64_t combined_hash = - static_cast(h1) * static_cast(std::numeric_limits::max()); + static_cast(h1) * static_cast(cuda::std::numeric_limits::max()); for (int idx = 0; idx < num_hashes; idx++) { combined_hash += h2; int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; @@ -132,7 +133,7 @@ struct bloom_probe_functor { } } else { int64_t combined_hash = - static_cast(h1) * static_cast(std::numeric_limits::max()); + static_cast(h1) * static_cast(cuda::std::numeric_limits::max()); for (int idx = 0; idx < num_hashes; idx++) { combined_hash += h2; int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; From a773edd5987687d470614581b7e64adaec365c93 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Mar 2026 14:21:59 -0700 Subject: [PATCH 15/29] Review: CUDA_TRY. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index e4a935f87c..ecdd05b6dd 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -155,15 +156,15 @@ void pack_bloom_filter_header(cudf::device_span buf, bloom_filter_header_v1 raw = {byte_swap_int32(header.version), byte_swap_int32(header.num_hashes), byte_swap_int32(header.num_longs)}; - cudaMemcpyAsync( - buf.data(), &raw, bloom_filter_header_v1_size_bytes, cudaMemcpyHostToDevice, stream); + CUDF_CUDA_TRY(cudaMemcpyAsync( + buf.data(), &raw, bloom_filter_header_v1_size_bytes, cudaMemcpyHostToDevice, stream)); } else { bloom_filter_header_v2 raw = {byte_swap_int32(header.version), byte_swap_int32(header.num_hashes), byte_swap_int32(seed), byte_swap_int32(header.num_longs)}; - cudaMemcpyAsync( - buf.data(), &raw, bloom_filter_header_v2_size_bytes, cudaMemcpyHostToDevice, stream); + CUDF_CUDA_TRY(cudaMemcpyAsync( + buf.data(), &raw, bloom_filter_header_v2_size_bytes, cudaMemcpyHostToDevice, stream)); } } @@ -190,7 +191,9 @@ unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_vi int32_t raw_ints[4] = {}; auto const read_size = std::min(bloom_filter.size(), static_cast(bloom_filter_header_v2_size_bytes)); - cudaMemcpyAsync(raw_ints, bloom_filter.data(), read_size, cudaMemcpyDeviceToHost, stream); + + CUDF_CUDA_TRY( + cudaMemcpyAsync(raw_ints, bloom_filter.data(), read_size, cudaMemcpyDeviceToHost, stream)); stream.synchronize(); int const version = byte_swap_int32(raw_ints[0]); From 110fc0ec294dbe8791e1b721835fac52e88a343b Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Mar 2026 14:36:19 -0700 Subject: [PATCH 16/29] Review: exec_policy_nosync. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index ecdd05b6dd..d0c0b9b3dd 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -405,7 +405,7 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b auto dv = cudf::column_device_view::create(bloom_filters); CUDF_EXPECTS( thrust::all_of( - rmm::exec_policy(cudf::get_default_stream()), + rmm::exec_policy_nosync(stream), thrust::make_counting_iterator(1), thrust::make_counting_iterator(bloom_filters.size()), bloom_filter_same{ @@ -422,7 +422,7 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b cudf::size_type num_words = header.num_longs * 2; thrust::transform( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), thrust::make_counting_iterator(0), thrust::make_counting_iterator(0) + num_words, dst, @@ -466,14 +466,14 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, CUDF_EXPECTS(bloom_filter_bits <= std::numeric_limits::max(), "V1 bloom filter bit count exceeds int32 range"); thrust::transform( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), input.begin(), input.end(), out->mutable_view().begin(), bloom_probe_functor<1>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); } else { thrust::transform( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), input.begin(), input.end(), out->mutable_view().begin(), From d48c048341d41e07316964ce068627489e714c4e Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Mar 2026 14:52:06 -0700 Subject: [PATCH 17/29] Review: atomic_refs. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index d0c0b9b3dd..bfa12e6652 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -33,6 +33,7 @@ #include #include +#include #include #include #include @@ -94,7 +95,8 @@ CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, static_cast(combined_hash < 0 ? ~combined_hash : combined_hash) % bloom_filter_bits; auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); - atomicOr(bloom_filter + word_index, mask); + cuda::atomic_ref ref(bloom_filter[word_index]); + ref.fetch_or(mask, cuda::memory_order_relaxed); } } else { // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImplV2.java#L63 @@ -105,7 +107,8 @@ CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; auto const bit_pos = combined_index % bloom_filter_bits; auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); - atomicOr(bloom_filter + word_index, mask); + cuda::atomic_ref ref(bloom_filter[word_index]); + ref.fetch_or(mask, cuda::memory_order_relaxed); } } } From 5eb90eade98918ecf3be9ff9bee640c3197df6cd Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Mar 2026 16:04:54 -0700 Subject: [PATCH 18/29] Review: Test for all absent. Signed-off-by: MithunR --- src/main/cpp/tests/bloom_filter.cu | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/main/cpp/tests/bloom_filter.cu b/src/main/cpp/tests/bloom_filter.cu index 26bdc3f17f..5f0f164c3b 100644 --- a/src/main/cpp/tests/bloom_filter.cu +++ b/src/main/cpp/tests/bloom_filter.cu @@ -179,6 +179,25 @@ TEST_F(BloomFilterTest, ProbeMergedV1) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } +TEST_F(BloomFilterTest, ProbeAllAbsentV1) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe{-10, 1, 2, 3, 5000, 999999, -77777}; + cudf::test::fixed_width_column_wrapper expected{0, 0, 0, 0, 0, 0, 0}; + auto result = spark_rapids_jni::bloom_filter_probe(probe, *bloom_filter, stream); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + // ======================== V2 Tests ======================== TEST_F(BloomFilterTest, InitializationV2) @@ -289,6 +308,25 @@ TEST_F(BloomFilterTest, ProbeMergedV2) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_all, *result); } +TEST_F(BloomFilterTest, ProbeAllAbsentV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe{-10, 1, 2, 3, 5000, 999999, -77777}; + cudf::test::fixed_width_column_wrapper expected{0, 0, 0, 0, 0, 0, 0}; + auto result = spark_rapids_jni::bloom_filter_probe(probe, *bloom_filter, stream); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + TEST_F(BloomFilterTest, V2WithSeed) { auto stream = cudf::get_default_stream(); From dc224c97907e913934a8fad8b5661583a3e610cc Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 19 Mar 2026 16:39:18 -0700 Subject: [PATCH 19/29] Review: Removed tautological check. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index bfa12e6652..7e98916184 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -224,8 +224,6 @@ unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_vi auto const num_bitmask_words = static_cast(header.num_longs) * 2; CUDF_EXPECTS(bloom_filter_bits > 0, "Invalid empty bloom filter size"); - CUDF_EXPECTS(num_bitmask_words == cudf::num_bitmask_words(bloom_filter_bits), - "Bloom filter bit/length mismatch"); return { header, From 5d7d5e18841e006ef1deaf0226370cc77f642b81 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 10:27:23 -0700 Subject: [PATCH 20/29] Review: Fixed range checks for bit counts. Signed-off-by: MithunR --- src/main/cpp/src/BloomFilterJni.cpp | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/cpp/src/BloomFilterJni.cpp b/src/main/cpp/src/BloomFilterJni.cpp index 9a721ef1b0..d88a92c93a 100644 --- a/src/main/cpp/src/BloomFilterJni.cpp +++ b/src/main/cpp/src/BloomFilterJni.cpp @@ -31,19 +31,19 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_creategpu( { cudf::jni::auto_set_device(env); - // TODO (future): There is an impedance mismatch between the C++ and Java APIs. - // This seems to have been introduced in https://github.com/NVIDIA/spark-rapids-jni/pull/1303. - // The Java API accepts a long for the bloom filter bit count, but the C++ API accepts an int. - // This means that the Java API can represent a bloom filter bit count that is too large to - // be represented as an int in the C++ API. - // We should fix this by changing the C++ API to accept a long for the bloom filter bit count. - // We will address this in a future PR. For now, we add error checking to avoid overflow. - JNI_ARG_CHECK(env, - bloomFilterBits >= 0 && bloomFilterBits <= std::numeric_limits::max() - 63, - "bloom filter bit count overflows int when converted to longs", - 0); - auto const bloom_filter_longs_long = (bloomFilterBits + 63) / 64; - auto const bloom_filter_longs = static_cast(bloom_filter_longs_long); + // Per the Spark implementation, according to the BitArray class, + // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java#L34 + // the number of longs representing the bit array can only be Integer.MAX_VALUE, at the most. + // (This is presumably because the BitArray is indexed with an int32_t.) + // This implies that the maximum supported bloom filter bit count is Integer.MAX_VALUE * 64. + + JNI_ARG_CHECK( + env, + bloomFilterBits >= 0 && + bloomFilterBits <= static_cast(std::numeric_limits::max()) * 64, + "bloom filter bit count exceeds maximum supported size", + 0); + auto const bloom_filter_longs = static_cast((bloomFilterBits + 63) / 64); auto bloom_filter = spark_rapids_jni::bloom_filter_create(version, numHashes, bloom_filter_longs, seed); return reinterpret_cast(bloom_filter.release()); From 27e338897124fe52aeba0034384d8aba8d5da085 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 20:33:55 -0700 Subject: [PATCH 21/29] Review: Use stream, mr. Fix stale comment. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 7e98916184..4d17a2a634 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -273,7 +273,7 @@ struct bloom_filter_same { }; /* - Returns a pair indicating: + Returns a std::tuple indicating: - first: size in bytes of the bloom filter bit array (num_longs * 8). - second: total size in bytes of the serialized bloom filter buffer (header + bit array). Uses the version-specific header size. @@ -458,7 +458,7 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, auto out = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::BOOL8}, input.size(), - cudf::copy_bitmask(input), + cudf::copy_bitmask(input, stream, mr), input.null_count(), stream, mr); From 317e800e0909ccef4f759e9741e00ecf81797e24 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 20:39:08 -0700 Subject: [PATCH 22/29] Review: Reduce const-casts. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 38 ++++++++++++++------------------ 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 4d17a2a634..e5c5b1269e 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -178,15 +178,15 @@ void pack_bloom_filter_header(cudf::device_span buf, @return A std::tuple of four elements: - Element 0 (bloom_filter_header): Decoded header with version, num_hashes, and num_longs. Does not include seed; that is returned separately as element 3. - - Element 1 (cudf::device_span): Device span over the bloom filter bit - array. Length is header.num_longs * 2 (number of 32-bit words). The data start + - Element 1 (cudf::device_span): Device span over the bloom filter + bit array. Length is header.num_longs * 2 (number of 32-bit words). The data start immediately after the version-specific header in the buffer. - Element 2 (int64_t): Total number of bits in the bloom filter, i.e. header.num_longs * 64. - Element 3 (int32_t): Hash seed used when building/probing the filter. Zero for V1; for V2, the value stored in the serialized header. */ -std::tuple, int64_t, int32_t> -unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_view stream) +std::tuple, int64_t, int32_t> +unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_view stream) { CUDF_EXPECTS(bloom_filter.size() >= static_cast(bloom_filter_header_v1_size_bytes), "Encountered truncated bloom filter"); @@ -225,27 +225,19 @@ unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_vi CUDF_EXPECTS(bloom_filter_bits > 0, "Invalid empty bloom filter size"); - return { - header, - {reinterpret_cast(bloom_filter.data() + hdr_size), num_bitmask_words}, - bloom_filter_bits, - seed}; + return {header, + {reinterpret_cast(bloom_filter.data() + hdr_size), + num_bitmask_words}, + bloom_filter_bits, + seed}; } -std::tuple, int64_t, int32_t> +std::tuple, int64_t, int32_t> unpack_bloom_filter(cudf::column_view const& bloom_filter, rmm::cuda_stream_view stream) { return unpack_bloom_filter( - cudf::device_span{const_cast(bloom_filter.data()), - static_cast(bloom_filter.size())}, - stream); -} - -std::tuple, int64_t, int32_t> -unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_view stream) -{ - return unpack_bloom_filter( - cudf::device_span{const_cast(bloom_filter.data()), bloom_filter.size()}, + cudf::device_span{bloom_filter.data(), + static_cast(bloom_filter.size())}, stream); } @@ -343,6 +335,10 @@ void bloom_filter_put(cudf::list_scalar& bloom_filter, CUDF_EXPECTS(static_cast(bloom_filter.view().size()) == (buffer.size() * 4) + hdr_size, "Encountered invalid/mismatched bloom filter buffer data"); + // bloom_filter is non-const, so mutable access to the underlying data is valid. + // list_scalar::view() returns a const column_view, requiring const_cast here. + auto* mutable_buffer = const_cast(buffer.data()); + constexpr int block_size = 256; auto grid = cudf::detail::grid_1d{input.size(), block_size, 1}; auto d_input = cudf::column_device_view::create(input); @@ -350,7 +346,7 @@ void bloom_filter_put(cudf::list_scalar& bloom_filter, auto launch = [&](auto version_tag, auto nullable_tag) { gpu_bloom_filter_put <<>>( - buffer.data(), bloom_filter_bits, *d_input, header.num_hashes, seed); + mutable_buffer, bloom_filter_bits, *d_input, header.num_hashes, seed); }; if (header.version == bloom_filter_version_1) { From 81893fbc3ac6c90887fc541579f2959e8d5f653b Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 20:55:51 -0700 Subject: [PATCH 23/29] Review: Protect against empty bloom filter bits. Signed-off-by: MithunR --- src/main/cpp/src/BloomFilterJni.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/cpp/src/BloomFilterJni.cpp b/src/main/cpp/src/BloomFilterJni.cpp index d88a92c93a..8344da5b6b 100644 --- a/src/main/cpp/src/BloomFilterJni.cpp +++ b/src/main/cpp/src/BloomFilterJni.cpp @@ -39,9 +39,10 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_creategpu( JNI_ARG_CHECK( env, - bloomFilterBits >= 0 && + bloomFilterBits > 0 && bloomFilterBits <= static_cast(std::numeric_limits::max()) * 64, - "bloom filter bit count exceeds maximum supported size", + "bloom filter bit count must be positive and less than or equal to the maximum supported " + "size", 0); auto const bloom_filter_longs = static_cast((bloomFilterBits + 63) / 64); auto bloom_filter = From 4995300025b0a2664928a4c6d5b6473261d91d8d Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 21:05:12 -0700 Subject: [PATCH 24/29] Review: include . Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/cpp/src/bloom_filter.hpp b/src/main/cpp/src/bloom_filter.hpp index 1a361cd62e..fb9213f2fc 100644 --- a/src/main/cpp/src/bloom_filter.hpp +++ b/src/main/cpp/src/bloom_filter.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include From 090714c1c4e15a54d18b553c7a9be9dbb96df364 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 21:14:16 -0700 Subject: [PATCH 25/29] Review: static_asserts for struct sizes. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/cpp/src/bloom_filter.hpp b/src/main/cpp/src/bloom_filter.hpp index fb9213f2fc..d6f413ca89 100644 --- a/src/main/cpp/src/bloom_filter.hpp +++ b/src/main/cpp/src/bloom_filter.hpp @@ -36,6 +36,7 @@ struct bloom_filter_header_v1 { int num_longs; }; constexpr int bloom_filter_header_v1_size_bytes = sizeof(bloom_filter_header_v1); +static_assert(bloom_filter_header_v1_size_bytes == 12, "V1 header size must be 12 bytes"); // V2 header: [version, num_hashes, seed, num_longs] — 16 bytes struct bloom_filter_header_v2 { @@ -45,6 +46,7 @@ struct bloom_filter_header_v2 { int num_longs; }; constexpr int bloom_filter_header_v2_size_bytes = sizeof(bloom_filter_header_v2); +static_assert(bloom_filter_header_v2_size_bytes == 16, "V2 header size must be 16 bytes"); // Unified header used internally after unpacking from either format. // Seed is not stored here; for V2 it is returned separately from unpack_bloom_filter. From 92db59f35957c79343c954b7375300fc627c20d2 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 21:47:07 -0700 Subject: [PATCH 26/29] Review: Added test BuildAndProbeWithNullsV2. Signed-off-by: MithunR --- src/main/cpp/tests/bloom_filter.cu | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/main/cpp/tests/bloom_filter.cu b/src/main/cpp/tests/bloom_filter.cu index 5f0f164c3b..522ac80545 100644 --- a/src/main/cpp/tests/bloom_filter.cu +++ b/src/main/cpp/tests/bloom_filter.cu @@ -265,6 +265,27 @@ TEST_F(BloomFilterTest, BuildWithNullsAndProbeV2) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_inserted, *result); } +TEST_F(BloomFilterTest, BuildAndProbeWithNullsV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe{ + {20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3}, {0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1}}; + cudf::test::fixed_width_column_wrapper expected{{1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0}, + {0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1}}; + auto result = spark_rapids_jni::bloom_filter_probe(probe, *bloom_filter, stream); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected, *result); +} + TEST_F(BloomFilterTest, ProbeMergedV2) { auto stream = cudf::get_default_stream(); From 5f8d7114fc92efbcb0d7c17ead7799e78dc203fa Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 22:07:42 -0700 Subject: [PATCH 27/29] Review: SRJ_FUNC_RANGE. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index e5c5b1269e..0539d67447 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -16,6 +16,7 @@ #include "bloom_filter.hpp" #include "hash/murmur_hash.cuh" +#include "nvtx_ranges.hpp" #include #include @@ -299,6 +300,7 @@ std::unique_ptr bloom_filter_create(int version, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + SRJ_FUNC_RANGE(); CUDF_EXPECTS(version == bloom_filter_version_1 || version == bloom_filter_version_2, "Bloom filter version must be 1 or 2"); @@ -330,6 +332,7 @@ void bloom_filter_put(cudf::list_scalar& bloom_filter, cudf::column_view const& input, rmm::cuda_stream_view stream) { + SRJ_FUNC_RANGE(); auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter.view(), stream); auto const hdr_size = bloom_filter_header_size_for_version(header.version); CUDF_EXPECTS(static_cast(bloom_filter.view().size()) == (buffer.size() * 4) + hdr_size, @@ -370,6 +373,7 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + SRJ_FUNC_RANGE(); cudf::lists_column_view lcv(bloom_filters); // The list child column is a concatenation of packed bloom filter buffers (header + bits) @@ -447,6 +451,7 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + SRJ_FUNC_RANGE(); auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter, stream); auto const hdr_size = bloom_filter_header_size_for_version(header.version); CUDF_EXPECTS(bloom_filter.size() == static_cast((buffer.size() * 4) + hdr_size), @@ -485,6 +490,7 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + SRJ_FUNC_RANGE(); return bloom_filter_probe(input, bloom_filter.view(), stream, mr); } From cd79867a87f30b78a72d77a1e0fc15133067bed8 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 20 Mar 2026 22:23:23 -0700 Subject: [PATCH 28/29] Review: Left todo for pinned memory use. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 0539d67447..501986c2ff 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -196,6 +196,8 @@ unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_str auto const read_size = std::min(bloom_filter.size(), static_cast(bloom_filter_header_v2_size_bytes)); + // TODO (future): Consider using pinned host memory for cudaMemcpyAsync. + // Refer to https://github.com/NVIDIA/spark-rapids-jni/issues/4407. CUDF_CUDA_TRY( cudaMemcpyAsync(raw_ints, bloom_filter.data(), read_size, cudaMemcpyDeviceToHost, stream)); stream.synchronize(); From 8480b6ffe9331665bffccac928888a187fd1bf4b Mon Sep 17 00:00:00 2001 From: MithunR Date: Mon, 23 Mar 2026 13:06:04 -0700 Subject: [PATCH 29/29] Review: exec_policy_nosync, test changes. Signed-off-by: MithunR --- src/main/cpp/src/bloom_filter.cu | 2 +- src/main/cpp/tests/bloom_filter.cu | 20 +++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 501986c2ff..6d328fc556 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -305,7 +305,7 @@ std::unique_ptr bloom_filter_create(int version, SRJ_FUNC_RANGE(); CUDF_EXPECTS(version == bloom_filter_version_1 || version == bloom_filter_version_2, "Bloom filter version must be 1 or 2"); - + CUDF_EXPECTS(num_hashes > 0, "Bloom filter hash count must be positive"); CUDF_EXPECTS(bloom_filter_longs > 0, "Bloom filter bit count must be positive"); auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(version, bloom_filter_longs); diff --git a/src/main/cpp/tests/bloom_filter.cu b/src/main/cpp/tests/bloom_filter.cu index 522ac80545..8f33082d83 100644 --- a/src/main/cpp/tests/bloom_filter.cu +++ b/src/main/cpp/tests/bloom_filter.cu @@ -53,10 +53,11 @@ TEST_F(BloomFilterTest, InitializationV1) auto bytes = (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_v1_size_bytes; - CUDF_EXPECTS( - thrust::all_of( - rmm::exec_policy(cudf::get_default_stream()), bytes, bytes + bloom_filter_size, is_zero{}), - "Bloom filter not initialized to 0"); + CUDF_EXPECTS(thrust::all_of(rmm::exec_policy_nosync(cudf::get_default_stream()), + bytes, + bytes + bloom_filter_size, + is_zero{}), + "Bloom filter not initialized to 0"); } } @@ -159,7 +160,7 @@ TEST_F(BloomFilterTest, ProbeMergedV1) {bloom_filter_a->view(), bloom_filter_b->view(), bloom_filter_c->view()}); auto premerge_children = cudf::concatenate(cols); auto premerge_offsets = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::INT32}, 4); - thrust::transform(rmm::exec_policy(cudf::get_default_stream()), + thrust::transform(rmm::exec_policy_nosync(cudf::get_default_stream()), thrust::make_counting_iterator(0), thrust::make_counting_iterator(0) + 4, premerge_offsets->mutable_view().begin(), @@ -259,10 +260,11 @@ TEST_F(BloomFilterTest, BuildWithNullsAndProbeV2) spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); - cudf::test::fixed_width_column_wrapper probe_inserted{80, 100, 99, -9, 234000000}; - cudf::test::fixed_width_column_wrapper expected_inserted{1, 1, 1, 1, 1}; - auto result = spark_rapids_jni::bloom_filter_probe(probe_inserted, *bloom_filter, stream); - CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_inserted, *result); + cudf::test::fixed_width_column_wrapper probe{ + 20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3}; + cudf::test::fixed_width_column_wrapper expected{0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0}; + auto result = spark_rapids_jni::bloom_filter_probe(probe, *bloom_filter, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } TEST_F(BloomFilterTest, BuildAndProbeWithNullsV2)