diff --git a/src/main/cpp/benchmarks/bloom_filter.cu b/src/main/cpp/benchmarks/bloom_filter.cu index ddbb680167..cae6b8707d 100644 --- a/src/main/cpp/benchmarks/bloom_filter.cu +++ b/src/main/cpp/benchmarks/bloom_filter.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,17 +22,18 @@ #include #include -static void bloom_filter_put(nvbench::state& state) +namespace { + +void bloom_filter_put_impl(nvbench::state& state, int version) { constexpr int num_rows = 150'000'000; constexpr int num_hashes = 3; - // create the bloom filter cudf::size_type const bloom_filter_bytes = state.get_int64("bloom_filter_bytes"); cudf::size_type const bloom_filter_longs = bloom_filter_bytes / sizeof(int64_t); - auto bloom_filter = spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs); + auto bloom_filter = + spark_rapids_jni::bloom_filter_create(version, num_hashes, bloom_filter_longs); - // create a column of hashed values data_profile_builder builder; builder.no_validity(); auto const src = create_random_table({{cudf::type_id::INT64}}, row_count{num_rows}, builder); @@ -41,7 +42,7 @@ static void bloom_filter_put(nvbench::state& state) auto const stream = cudf::get_default_stream(); state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value())); state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync, - [&](nvbench::launch& launch, auto& timer) { + [&](nvbench::launch&, auto& timer) { timer.start(); spark_rapids_jni::bloom_filter_put(*bloom_filter, *input); stream.synchronize(); @@ -57,7 +58,24 @@ static void bloom_filter_put(nvbench::state& state) state.add_element_count(static_cast(bytes_written) / time, "Write bytes/sec"); } -NVBENCH_BENCH(bloom_filter_put) - .set_name("Bloom Filter Put") +void bloom_filter_put_v1(nvbench::state& state) +{ + bloom_filter_put_impl(state, spark_rapids_jni::bloom_filter_version_1); +} + +void bloom_filter_put_v2(nvbench::state& state) +{ + bloom_filter_put_impl(state, spark_rapids_jni::bloom_filter_version_2); +} + +} // namespace + +NVBENCH_BENCH(bloom_filter_put_v1) + .set_name("Bloom Filter Put V1") + .add_int64_axis("bloom_filter_bytes", + {512 * 1024, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024}); + +NVBENCH_BENCH(bloom_filter_put_v2) + .set_name("Bloom Filter Put V2") .add_int64_axis("bloom_filter_bytes", {512 * 1024, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024}); diff --git a/src/main/cpp/src/BloomFilterJni.cpp b/src/main/cpp/src/BloomFilterJni.cpp index bbbe525a40..8344da5b6b 100644 --- a/src/main/cpp/src/BloomFilterJni.cpp +++ b/src/main/cpp/src/BloomFilterJni.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,17 +20,33 @@ #include "jni_utils.hpp" #include "utilities.hpp" +#include + extern "C" { JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_BloomFilter_creategpu( - JNIEnv* env, jclass, jint numHashes, jlong bloomFilterBits) + JNIEnv* env, jclass, jint version, jint numHashes, jlong bloomFilterBits, jint seed) { JNI_TRY { cudf::jni::auto_set_device(env); - int bloom_filter_longs = static_cast((bloomFilterBits + 63) / 64); - auto bloom_filter = spark_rapids_jni::bloom_filter_create(numHashes, bloom_filter_longs); + // Per the Spark implementation, according to the BitArray class, + // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java#L34 + // the number of longs representing the bit array can only be Integer.MAX_VALUE, at the most. + // (This is presumably because the BitArray is indexed with an int32_t.) + // This implies that the maximum supported bloom filter bit count is Integer.MAX_VALUE * 64. + + JNI_ARG_CHECK( + env, + bloomFilterBits > 0 && + bloomFilterBits <= static_cast(std::numeric_limits::max()) * 64, + "bloom filter bit count must be positive and less than or equal to the maximum supported " + "size", + 0); + auto const bloom_filter_longs = static_cast((bloomFilterBits + 63) / 64); + auto bloom_filter = + spark_rapids_jni::bloom_filter_create(version, numHashes, bloom_filter_longs, seed); return reinterpret_cast(bloom_filter.release()); } JNI_CATCH(env, 0); diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 741f9753e1..6d328fc556 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -16,6 +16,7 @@ #include "bloom_filter.hpp" #include "hash/murmur_hash.cuh" +#include "nvtx_ranges.hpp" #include #include @@ -27,43 +28,51 @@ #include #include #include +#include #include #include #include +#include #include +#include #include #include +#include + namespace spark_rapids_jni { namespace { using bloom_hash_type = spark_rapids_jni::murmur_hash_value_type; -__device__ inline std::pair gpu_get_hash_mask( - bloom_hash_type h, cudf::size_type bloom_filter_bits) +inline int32_t byte_swap_int32(int32_t val) { - // https://github.com/apache/spark/blob/7bfbeb62cb1dc58d81243d22888faa688bad8064/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L94 - auto const index = (h < 0 ? ~h : h) % static_cast(bloom_filter_bits); + return static_cast(bswap_32(static_cast(val))); +} - // spark expects serialized bloom filters to be big endian (64 bit longs), - // so we will produce a big endian buffer. if spark CPU ends up consuming it, it can do so - // directly. the gpu bloom filter implementation will always be handed the same serialized buffer. - auto const word_index = cudf::word_index(index) ^ 0x1; // word-swizzle within 64 bit long - auto const bit_index = - cudf::intra_word_index(index) ^ 0x18; // byte swizzle within the 32 bit word +// Given a non-negative bit position within the bloom filter, compute +// the 32-bit word index and bitmask. Handles big-endian swizzle so +// the GPU buffer is directly compatible with Spark's serialized format. +__device__ inline cuda::std::pair gpu_bit_to_word_mask(int64_t bit_pos) +{ + auto const word_index = (bit_pos / 32) ^ 0x1; + auto const bit_index = static_cast(bit_pos % 32) ^ 0x18; - return {word_index, (1 << bit_index)}; + return {word_index, static_cast(1u << bit_index)}; } -template +// V1: combined hash is 32-bit int, loop from 1..num_hashes +// V2: combined hash is 64-bit long, seeded with h1*INT32_MAX, loop from 0..num_hashes-1 +template CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, - cudf::size_type bloom_filter_bits, + int64_t bloom_filter_bits, cudf::column_device_view input, - cudf::size_type num_hashes) + cudf::size_type num_hashes, + int32_t seed) { size_t const tid = threadIdx.x + blockIdx.x * blockDim.x; if (tid >= input.size()) { return; } @@ -72,178 +81,247 @@ CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, if (!input.is_valid(tid)) { return; } } - // https://github.com/apache/spark/blob/7bfbeb62cb1dc58d81243d22888faa688bad8064/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L87 - auto const el = input.element(tid); - bloom_hash_type const h1 = MurmurHash3_32(0)(el); + auto const el = input.element(tid); + // V1 has no seed in the format; use 0. V2 uses the stored seed. + int32_t const hash_seed = (Version == 1) ? 0 : seed; + bloom_hash_type const h1 = MurmurHash3_32(hash_seed)(el); bloom_hash_type const h2 = MurmurHash3_32(h1)(el); - // set a bit in the bloom filter for each hashed value - for (auto idx = 1; idx <= num_hashes; idx++) { - bloom_hash_type combined_hash = h1 + (idx * h2); - - auto const [word_index, mask] = gpu_get_hash_mask(combined_hash, bloom_filter_bits); - atomicOr(bloom_filter + word_index, mask); + if constexpr (Version == 1) { + // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L38 + // This is the original V1 hash algorithm from Spark. + for (auto idx = 1; idx <= num_hashes; idx++) { + bloom_hash_type combined_hash = h1 + (idx * h2); + auto const bit_pos = + static_cast(combined_hash < 0 ? ~combined_hash : combined_hash) % + bloom_filter_bits; + auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); + cuda::atomic_ref ref(bloom_filter[word_index]); + ref.fetch_or(mask, cuda::memory_order_relaxed); + } + } else { + // https://github.com/apache/spark/blob/5075ea6a85f3f1689766cf08a7d5b2ce500be1fb/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImplV2.java#L63 + int64_t combined_hash = + static_cast(h1) * static_cast(cuda::std::numeric_limits::max()); + for (int idx = 0; idx < num_hashes; idx++) { + combined_hash += h2; + int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; + auto const bit_pos = combined_index % bloom_filter_bits; + auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); + cuda::atomic_ref ref(bloom_filter[word_index]); + ref.fetch_or(mask, cuda::memory_order_relaxed); + } } } +template struct bloom_probe_functor { cudf::bitmask_type const* const bloom_filter; - cudf::size_type const bloom_filter_bits; + int64_t const bloom_filter_bits; cudf::size_type const num_hashes; + int32_t const seed; __device__ bool operator()(int64_t input) const { - // https://github.com/apache/spark/blob/7bfbeb62cb1dc58d81243d22888faa688bad8064/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java#L110 - // this code could be combined with the very similar code in gpu_bloom_filter_put. i've - // left it this way since the expectation is that we will early out fairly often, whereas - // in the build case we never early out so doing the additional if() return check is pointless. - bloom_hash_type const h1 = MurmurHash3_32(0)(input); + int32_t const hash_seed = (Version == 1) ? 0 : seed; + bloom_hash_type const h1 = MurmurHash3_32(hash_seed)(input); bloom_hash_type const h2 = MurmurHash3_32(h1)(input); - // set a bit in the bloom filter for each hashed value - for (auto idx = 1; idx <= num_hashes; idx++) { - bloom_hash_type combined_hash = h1 + (idx * h2); - auto const [word_index, mask] = gpu_get_hash_mask(combined_hash, bloom_filter_bits); - if (!(bloom_filter[word_index] & mask)) { return false; } + if constexpr (Version == 1) { + for (auto idx = 1; idx <= num_hashes; idx++) { + bloom_hash_type combined_hash = h1 + (idx * h2); + auto const bit_pos = + static_cast(combined_hash < 0 ? ~combined_hash : combined_hash) % + bloom_filter_bits; + auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); + if (!(bloom_filter[word_index] & mask)) { return false; } + } + } else { + int64_t combined_hash = + static_cast(h1) * static_cast(cuda::std::numeric_limits::max()); + for (int idx = 0; idx < num_hashes; idx++) { + combined_hash += h2; + int64_t combined_index = combined_hash < 0 ? ~combined_hash : combined_hash; + auto const bit_pos = combined_index % bloom_filter_bits; + auto const [word_index, mask] = gpu_bit_to_word_mask(bit_pos); + if (!(bloom_filter[word_index] & mask)) { return false; } + } } return true; } }; -constexpr int spark_bloom_filter_version = 1; - -bloom_filter_header byte_swap_header(bloom_filter_header const& header) -{ - return {static_cast(bswap_32(static_cast(header.version))), - static_cast(bswap_32(static_cast(header.num_hashes))), - static_cast(bswap_32(static_cast(header.num_longs)))}; -} - -/* - Pack a bloom_filter_header (passed as little endian) into a bloom filter buffer. -*/ void pack_bloom_filter_header(cudf::device_span buf, bloom_filter_header const& header, - rmm::cuda_stream_view stream) + rmm::cuda_stream_view stream, + int32_t seed) { - // swizzle to big endian - bloom_filter_header header_swizzled = byte_swap_header(header); - - // header goes at the top of the buffer - cudaMemcpyAsync( - buf.data(), &header_swizzled, bloom_filter_header_size, cudaMemcpyHostToDevice, stream); + if (header.version == bloom_filter_version_1) { + bloom_filter_header_v1 raw = {byte_swap_int32(header.version), + byte_swap_int32(header.num_hashes), + byte_swap_int32(header.num_longs)}; + CUDF_CUDA_TRY(cudaMemcpyAsync( + buf.data(), &raw, bloom_filter_header_v1_size_bytes, cudaMemcpyHostToDevice, stream)); + } else { + bloom_filter_header_v2 raw = {byte_swap_int32(header.version), + byte_swap_int32(header.num_hashes), + byte_swap_int32(seed), + byte_swap_int32(header.num_longs)}; + CUDF_CUDA_TRY(cudaMemcpyAsync( + buf.data(), &raw, bloom_filter_header_v2_size_bytes, cudaMemcpyHostToDevice, stream)); + } } /* - Unpack bloom filter information from a bloom filter buffer. returns the header, a span - representing the bloom filter bits and the number of bloom filter bits. + Unpack bloom filter information from a Spark-format bloom filter buffer (big-endian). + Accepts both V1 and V2; the version is read from the first 4 bytes. + + @return A std::tuple of four elements: + - Element 0 (bloom_filter_header): Decoded header with version, num_hashes, and num_longs. + Does not include seed; that is returned separately as element 3. + - Element 1 (cudf::device_span): Device span over the bloom filter + bit array. Length is header.num_longs * 2 (number of 32-bit words). The data start + immediately after the version-specific header in the buffer. + - Element 2 (int64_t): Total number of bits in the bloom filter, i.e. header.num_longs * 64. + - Element 3 (int32_t): Hash seed used when building/probing the filter. Zero for V1; + for V2, the value stored in the serialized header. */ -std::tuple, int> unpack_bloom_filter( - cudf::device_span bloom_filter, rmm::cuda_stream_view stream) +std::tuple, int64_t, int32_t> +unpack_bloom_filter(cudf::device_span bloom_filter, rmm::cuda_stream_view stream) { - CUDF_EXPECTS(bloom_filter.size() >= bloom_filter_header_size, + CUDF_EXPECTS(bloom_filter.size() >= static_cast(bloom_filter_header_v1_size_bytes), "Encountered truncated bloom filter"); - bloom_filter_header header_swizzled; - cudaMemcpyAsync(&header_swizzled, - bloom_filter.data(), - bloom_filter_header_size, - cudaMemcpyDeviceToHost, - stream); + int32_t raw_ints[4] = {}; + auto const read_size = + std::min(bloom_filter.size(), static_cast(bloom_filter_header_v2_size_bytes)); + + // TODO (future): Consider using pinned host memory for cudaMemcpyAsync. + // Refer to https://github.com/NVIDIA/spark-rapids-jni/issues/4407. + CUDF_CUDA_TRY( + cudaMemcpyAsync(raw_ints, bloom_filter.data(), read_size, cudaMemcpyDeviceToHost, stream)); stream.synchronize(); - // swizzle to little endian. - bloom_filter_header header = byte_swap_header(header_swizzled); + int const version = byte_swap_int32(raw_ints[0]); + CUDF_EXPECTS(version == bloom_filter_version_1 || version == bloom_filter_version_2, + "Unexpected bloom filter version"); - auto const bloom_filter_bits = header.num_longs * 64; + auto const hdr_size = bloom_filter_header_size_for_version(version); + CUDF_EXPECTS(bloom_filter.size() >= static_cast(hdr_size), + "Encountered truncated bloom filter header"); + + bloom_filter_header header; + header.version = version; + header.num_hashes = byte_swap_int32(raw_ints[1]); + header.num_longs = (version == bloom_filter_version_1) ? byte_swap_int32(raw_ints[2]) + : byte_swap_int32(raw_ints[3]); + + int32_t seed = 0; + if (version == bloom_filter_version_2) { + // Safe: the header-size check above guarantees bloom_filter.size() >= v2 header size, + // so read_size (= min(bloom_filter.size(), v2 header size)) == v2 header size. + seed = byte_swap_int32(raw_ints[2]); + } + + auto const bloom_filter_bits = static_cast(header.num_longs) * 64; auto const num_bitmask_words = static_cast(header.num_longs) * 2; - CUDF_EXPECTS(header.version == 1, "Unexpected bloom filter version"); CUDF_EXPECTS(bloom_filter_bits > 0, "Invalid empty bloom filter size"); - CUDF_EXPECTS(num_bitmask_words == cudf::num_bitmask_words(bloom_filter_bits), - "Bloom filter bit/length mismatch"); return {header, - {reinterpret_cast(bloom_filter.data() + bloom_filter_header_size), + {reinterpret_cast(bloom_filter.data() + hdr_size), num_bitmask_words}, - bloom_filter_bits}; + bloom_filter_bits, + seed}; } -/* - Unpack bloom filter information a from column_view that wraps a single bloom filter buffer. - returns the header, a span representing the bloom filter bits and the number of bloom filter bits. -*/ -std::tuple, int> unpack_bloom_filter( - cudf::column_view const& bloom_filter, rmm::cuda_stream_view stream) +std::tuple, int64_t, int32_t> +unpack_bloom_filter(cudf::column_view const& bloom_filter, rmm::cuda_stream_view stream) { - // the const_cast is necessary because list_scalar does not provide a mutable_view() function. return unpack_bloom_filter( - cudf::device_span{const_cast(bloom_filter.data()), - static_cast(bloom_filter.size())}, + cudf::device_span{bloom_filter.data(), + static_cast(bloom_filter.size())}, stream); } +/* + Device functor used by bloom_filter_merge to verify every filter in the list has the same + header. raw_header holds the reference header in big-endian form (as in the serialized buffer). +*/ struct bloom_filter_same { - bloom_filter_header header; + /// Reference header: big-endian int32s. + /// V1 uses [0..2]: version, num_hashes, num_longs. + /// V2 uses [0..3]: version, num_hashes, seed, num_longs. + int32_t raw_header[4]; + int header_field_count; cudf::detail::lists_column_device_view ldv; cudf::size_type stride; bool __device__ operator()(cudf::size_type i) { - bloom_filter_header const* a = - reinterpret_cast(ldv.child().data() + stride * i); - return (a->version == header.version) && (a->num_hashes == header.num_hashes) && - (a->num_longs == header.num_longs); + auto const* a = reinterpret_cast(ldv.child().data() + stride * i); + for (int j = 0; j < header_field_count; j++) { + if (a[j] != raw_header[j]) return false; + } + return true; } }; /* - Returns a pair indicating: - - size of the bloom filter bits - - total size of the bloom filter buffer (header + bits) + Returns a std::tuple indicating: + - first: size in bytes of the bloom filter bit array (num_longs * 8). + - second: total size in bytes of the serialized bloom filter buffer (header + bit array). + Uses the version-specific header size. */ -std::pair get_bloom_filter_stride(int bloom_filter_longs) +std::tuple get_bloom_filter_stride(int version, int bloom_filter_longs) { - auto const bloom_filter_size = (bloom_filter_longs * sizeof(int64_t)); - auto const buf_size = bloom_filter_header_size + bloom_filter_size; - return {bloom_filter_size, buf_size}; + auto const bloom_filter_size = static_cast(bloom_filter_longs) * sizeof(int64_t); + auto const hdr_size = bloom_filter_header_size_for_version(version); + auto const buf_size = hdr_size + bloom_filter_size; + CUDF_EXPECTS(buf_size <= std::numeric_limits::max(), + "Bloom filter buffer size exceeds int32 range"); + return {static_cast(bloom_filter_size), static_cast(buf_size)}; } } // anonymous namespace /* - Creates a new bloom filter. The bloom filter is stored using a cudf list_scalar with a specific - structure. - - The data type is int8, representing a generic buffer - - The first 12 bytes of the buffer are a bloom_filter_header - - The remaining bytes are the bloom filter buffer itself. The length of the remaining bytes must - be bloom_filter_header.num_longs * 8 - - All of the data in the buffer is stored in big-endian format. unpack_bloom_filter() unpacks - this into a usable form, and pack_bloom_filter_header packs new data into the output (big-endian) - form. + Creates a new bloom filter. The result is stored in a cudf list_scalar with a single + UINT8 buffer of the following form (all header and bit data in big-endian for Spark): + + - V1: first 12 bytes = bloom_filter_header_v1 (version, num_hashes, num_longs). + - V2: first 16 bytes = bloom_filter_header_v2 (version, num_hashes, seed, num_longs). + - Remaining bytes: bloom_filter_longs * 8, the bit array. Initialized to zero. + + unpack_bloom_filter() reads this layout; pack_bloom_filter_header() writes the header. */ -std::unique_ptr bloom_filter_create(int num_hashes, +std::unique_ptr bloom_filter_create(int version, + int num_hashes, int bloom_filter_longs, + int seed, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(bloom_filter_longs); + SRJ_FUNC_RANGE(); + CUDF_EXPECTS(version == bloom_filter_version_1 || version == bloom_filter_version_2, + "Bloom filter version must be 1 or 2"); + CUDF_EXPECTS(num_hashes > 0, "Bloom filter hash count must be positive"); + CUDF_EXPECTS(bloom_filter_longs > 0, "Bloom filter bit count must be positive"); + + auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(version, bloom_filter_longs); + auto const hdr_size = bloom_filter_header_size_for_version(version); - // build the packed bloom filter buffer ------------------ rmm::device_buffer buf{static_cast(buf_size), stream, mr}; - // pack the header - bloom_filter_header header{spark_bloom_filter_version, num_hashes, bloom_filter_longs}; - pack_bloom_filter_header( - {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream); - // memset the bloom filter bits to 0. + bloom_filter_header header{version, num_hashes, bloom_filter_longs}; + pack_bloom_filter_header({reinterpret_cast(buf.data()), static_cast(buf_size)}, + header, + stream, + (version == bloom_filter_version_1 ? 0 : seed)); - CUDF_CUDA_TRY(cudaMemsetAsync(reinterpret_cast(buf.data()) + bloom_filter_header_size, - 0, - bloom_filter_size, - stream)); + CUDF_CUDA_TRY(cudaMemsetAsync( + reinterpret_cast(buf.data()) + hdr_size, 0, bloom_filter_size, stream)); - // create the 1-row list column and move it into a scalar. return std::make_unique( cudf::column( cudf::data_type{cudf::type_id::UINT8}, buf_size, std::move(buf), rmm::device_buffer{}, 0), @@ -256,21 +334,40 @@ void bloom_filter_put(cudf::list_scalar& bloom_filter, cudf::column_view const& input, rmm::cuda_stream_view stream) { - // unpack the bloom filter - auto [header, buffer, bloom_filter_bits] = unpack_bloom_filter(bloom_filter.view(), stream); - CUDF_EXPECTS(bloom_filter.view().size() == (buffer.size() * 4) + bloom_filter_header_size, + SRJ_FUNC_RANGE(); + auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter.view(), stream); + auto const hdr_size = bloom_filter_header_size_for_version(header.version); + CUDF_EXPECTS(static_cast(bloom_filter.view().size()) == (buffer.size() * 4) + hdr_size, "Encountered invalid/mismatched bloom filter buffer data"); + // bloom_filter is non-const, so mutable access to the underlying data is valid. + // list_scalar::view() returns a const column_view, requiring const_cast here. + auto* mutable_buffer = const_cast(buffer.data()); + constexpr int block_size = 256; auto grid = cudf::detail::grid_1d{input.size(), block_size, 1}; auto d_input = cudf::column_device_view::create(input); - if (input.has_nulls()) { - gpu_bloom_filter_put<<>>( - buffer.data(), bloom_filter_bits, *d_input, header.num_hashes); + auto launch = [&](auto version_tag, auto nullable_tag) { + gpu_bloom_filter_put + <<>>( + mutable_buffer, bloom_filter_bits, *d_input, header.num_hashes, seed); + }; + + if (header.version == bloom_filter_version_1) { + CUDF_EXPECTS(bloom_filter_bits <= std::numeric_limits::max(), + "V1 bloom filter bit count exceeds int32 range"); + if (input.has_nulls()) { + launch(std::integral_constant{}, std::true_type{}); + } else { + launch(std::integral_constant{}, std::false_type{}); + } } else { - gpu_bloom_filter_put<<>>( - buffer.data(), bloom_filter_bits, *d_input, header.num_hashes); + if (input.has_nulls()) { + launch(std::integral_constant{}, std::true_type{}); + } else { + launch(std::integral_constant{}, std::false_type{}); + } } } @@ -278,42 +375,57 @@ std::unique_ptr bloom_filter_merge(cudf::column_view const& b rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - // unpack the bloom filter + SRJ_FUNC_RANGE(); cudf::lists_column_view lcv(bloom_filters); - // since the list child column is just a bunch of packed bloom filter buffers one after another, - // we can just pass the base data pointer to unpack the first one. - auto [header, buffer, bloom_filter_bits] = unpack_bloom_filter(lcv.child(), stream); - // NOTE: since this is a column containing multiple bloom filters, the expected total size is the - // size for one bloom filter times the number of rows (bloom_filters.size()) + // The list child column is a concatenation of packed bloom filter buffers (header + bits) + // one after another. Unpack the first buffer to get the header, bit span, bit count, and + // seed; we use these to validate total size and to build the merged output. + auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(lcv.child(), stream); + auto const hdr_size = bloom_filter_header_size_for_version(header.version); CUDF_EXPECTS( - lcv.child().size() == ((buffer.size() * 4) + bloom_filter_header_size) * bloom_filters.size(), + static_cast(lcv.child().size()) == (buffer.size() * 4 + static_cast(hdr_size)) * + static_cast(bloom_filters.size()), "Encountered invalid/mismatched bloom filter buffer data"); - auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(header.num_longs); + auto [bloom_filter_size, buf_size] = get_bloom_filter_stride(header.version, header.num_longs); - // validate all the bloom filters are the same - auto dv = cudf::column_device_view::create(bloom_filters); - bloom_filter_header header_swizzled = byte_swap_header(header); - CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(cudf::get_default_stream()), - thrust::make_counting_iterator(1), - thrust::make_counting_iterator(bloom_filters.size()), - bloom_filter_same{header_swizzled, *dv, buf_size}), - "Mismatch of bloom filter parameters"); + int32_t raw_hdr[4] = {}; + int header_field_count = 0; + if (header.version == bloom_filter_version_1) { + raw_hdr[0] = byte_swap_int32(header.version); + raw_hdr[1] = byte_swap_int32(header.num_hashes); + raw_hdr[2] = byte_swap_int32(header.num_longs); + header_field_count = 3; + } else { + raw_hdr[0] = byte_swap_int32(header.version); + raw_hdr[1] = byte_swap_int32(header.num_hashes); + raw_hdr[2] = byte_swap_int32(seed); + raw_hdr[3] = byte_swap_int32(header.num_longs); + header_field_count = 4; + } + + auto dv = cudf::column_device_view::create(bloom_filters); + CUDF_EXPECTS( + thrust::all_of( + rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(1), + thrust::make_counting_iterator(bloom_filters.size()), + bloom_filter_same{ + {raw_hdr[0], raw_hdr[1], raw_hdr[2], raw_hdr[3]}, header_field_count, *dv, buf_size}), + "Mismatch of bloom filter parameters"); - // build the packed bloom filter buffer ------------------ rmm::device_buffer buf{static_cast(buf_size), stream, mr}; pack_bloom_filter_header( - {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream); + {reinterpret_cast(buf.data()), static_cast(buf_size)}, header, stream, seed); - auto src = lcv.child().data() + bloom_filter_header_size; - auto dst = reinterpret_cast(reinterpret_cast(buf.data()) + - bloom_filter_header_size); + auto src = lcv.child().data() + hdr_size; + auto dst = + reinterpret_cast(reinterpret_cast(buf.data()) + hdr_size); - // bitwise-or all the bloom filters together cudf::size_type num_words = header.num_longs * 2; thrust::transform( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), thrust::make_counting_iterator(0), thrust::make_counting_iterator(0) + num_words, dst, @@ -341,26 +453,36 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - // unpack the bloom filter - auto [header, buffer, bloom_filter_bits] = unpack_bloom_filter(bloom_filter, stream); - CUDF_EXPECTS(bloom_filter.size() == (buffer.size() * 4) + bloom_filter_header_size, + SRJ_FUNC_RANGE(); + auto [header, buffer, bloom_filter_bits, seed] = unpack_bloom_filter(bloom_filter, stream); + auto const hdr_size = bloom_filter_header_size_for_version(header.version); + CUDF_EXPECTS(bloom_filter.size() == static_cast((buffer.size() * 4) + hdr_size), "Encountered invalid/mismatched bloom filter buffer data"); - // duplicate input mask auto out = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::BOOL8}, input.size(), - cudf::copy_bitmask(input), + cudf::copy_bitmask(input, stream, mr), input.null_count(), stream, mr); - thrust::transform( - rmm::exec_policy(stream), - input.begin(), - input.end(), - out->mutable_view().begin(), - bloom_probe_functor{ - buffer.data(), static_cast(bloom_filter_bits), header.num_hashes}); + if (header.version == bloom_filter_version_1) { + CUDF_EXPECTS(bloom_filter_bits <= std::numeric_limits::max(), + "V1 bloom filter bit count exceeds int32 range"); + thrust::transform( + rmm::exec_policy_nosync(stream), + input.begin(), + input.end(), + out->mutable_view().begin(), + bloom_probe_functor<1>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); + } else { + thrust::transform( + rmm::exec_policy_nosync(stream), + input.begin(), + input.end(), + out->mutable_view().begin(), + bloom_probe_functor<2>{buffer.data(), bloom_filter_bits, header.num_hashes, seed}); + } return out; } @@ -370,6 +492,7 @@ std::unique_ptr bloom_filter_probe(cudf::column_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { + SRJ_FUNC_RANGE(); return bloom_filter_probe(input, bloom_filter.view(), stream, mr); } diff --git a/src/main/cpp/src/bloom_filter.hpp b/src/main/cpp/src/bloom_filter.hpp index fd305c191c..d6f413ca89 100644 --- a/src/main/cpp/src/bloom_filter.hpp +++ b/src/main/cpp/src/bloom_filter.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,34 +19,77 @@ #include #include #include +#include #include #include namespace spark_rapids_jni { -// included only for testing purposes +constexpr int bloom_filter_version_1 = 1; +constexpr int bloom_filter_version_2 = 2; + +// V1 header: [version, num_hashes, num_longs] — 12 bytes +struct bloom_filter_header_v1 { + int version; + int num_hashes; + int num_longs; +}; +constexpr int bloom_filter_header_v1_size_bytes = sizeof(bloom_filter_header_v1); +static_assert(bloom_filter_header_v1_size_bytes == 12, "V1 header size must be 12 bytes"); + +// V2 header: [version, num_hashes, seed, num_longs] — 16 bytes +struct bloom_filter_header_v2 { + int version; + int num_hashes; + int seed; + int num_longs; +}; +constexpr int bloom_filter_header_v2_size_bytes = sizeof(bloom_filter_header_v2); +static_assert(bloom_filter_header_v2_size_bytes == 16, "V2 header size must be 16 bytes"); + +// Unified header used internally after unpacking from either format. +// Seed is not stored here; for V2 it is returned separately from unpack_bloom_filter. struct bloom_filter_header { int version; int num_hashes; int num_longs; }; -constexpr int bloom_filter_header_size = sizeof(bloom_filter_header); + +inline int bloom_filter_header_size_for_version(int version) +{ + return version == bloom_filter_version_2 ? bloom_filter_header_v2_size_bytes + : bloom_filter_header_v1_size_bytes; +} /** - * @brief Create an empty bloom filter of the specified size in (64 bit) longs with using - * the specified number of hashes to be used when operating on the filter. + * @brief Create an empty bloom filter of the specified size and parameters. * - * @param num_hashes The number of hashes to use. - * @param bloom_filter_longs Size of the bloom filter in bits. - * @param stream CUDA stream used for device memory operations and kernel launches. - * @param mr Device memory resource used to allocate the returned bloom filter's memory. - * @returns An list_scalar wrapping a packed Spark bloom_filter. + * The bloom filter is stored in a cudf list_scalar as a single byte buffer. The buffer + * layout is Spark-compatible: a version-specific header (big-endian) followed by the + * bit array. V1 header is 12 bytes (version, num_hashes, num_longs). V2 header is 16 bytes + * (version, num_hashes, seed, num_longs). The remainder of the buffer is + * bloom_filter_longs * 8 bytes of bit data, also written in big-endian order for Spark + * interchange. * + * @param version Bloom filter format version: 1 or 2 (e.g. bloom_filter_version_1, + * bloom_filter_version_2). V2 uses 64-bit hash indexing and supports a configurable + * seed for better distribution on large filters. + * @param num_hashes Number of bit positions set (and checked) per key. Derived from two + * underlying hashes; higher values reduce false positives but increase work per + * put/probe. + * @param bloom_filter_longs Size of the bit array in 64-bit longs; total bits = + * bloom_filter_longs * 64. + * @param seed Hash seed. Used only for V2; ignored for V1 (V1 always uses seed 0). + * @param stream CUDA stream for device memory operations and kernel launches. + * @param mr Device memory resource for allocating the bloom filter buffer. + * @returns A list_scalar wrapping the packed Spark-format bloom filter (header + bits). */ std::unique_ptr bloom_filter_create( + int version, int num_hashes, int bloom_filter_longs, + int seed = 0, rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource_ref()); diff --git a/src/main/cpp/tests/bloom_filter.cu b/src/main/cpp/tests/bloom_filter.cu index 1ba3520f0f..8f33082d83 100644 --- a/src/main/cpp/tests/bloom_filter.cu +++ b/src/main/cpp/tests/bloom_filter.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,35 +31,44 @@ struct is_zero { __device__ bool operator()(cudf::bitmask_type w) { return w == 0; } }; -TEST_F(BloomFilterTest, Initialization) +// ======================== V1 Tests ======================== + +TEST_F(BloomFilterTest, InitializationV1) { constexpr int num_hashes = 3; std::vector expected{1, 2, 3}; for (size_t idx = 0; idx < expected.size(); idx++) { auto bloom_filter = - spark_rapids_jni::bloom_filter_create(num_hashes, expected[idx], cudf::get_default_stream()); + spark_rapids_jni::bloom_filter_create(spark_rapids_jni::bloom_filter_version_1, + num_hashes, + expected[idx], + 0, + cudf::get_default_stream()); auto const bloom_filter_size = expected[idx] * sizeof(int64_t); - CUDF_EXPECTS( - bloom_filter->view().size() == spark_rapids_jni::bloom_filter_header_size + bloom_filter_size, - "Bloom filter not of expected size"); - - auto bytes = (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_size; - CUDF_EXPECTS( - thrust::all_of( - rmm::exec_policy(cudf::get_default_stream()), bytes, bytes + bloom_filter_size, is_zero{}), - "Bloom filter not initialized to 0"); + CUDF_EXPECTS(bloom_filter->view().size() == + spark_rapids_jni::bloom_filter_header_v1_size_bytes + bloom_filter_size, + "Bloom filter not of expected size"); + + auto bytes = + (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_v1_size_bytes; + CUDF_EXPECTS(thrust::all_of(rmm::exec_policy_nosync(cudf::get_default_stream()), + bytes, + bytes + bloom_filter_size, + is_zero{}), + "Bloom filter not initialized to 0"); } } -TEST_F(BloomFilterTest, BuildAndProbe) +TEST_F(BloomFilterTest, BuildAndProbeV1) { auto stream = cudf::get_default_stream(); constexpr int bloom_filter_longs = (1024 * 1024); constexpr int num_hashes = 3; - auto bloom_filter = spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); @@ -73,13 +82,14 @@ TEST_F(BloomFilterTest, BuildAndProbe) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } -TEST_F(BloomFilterTest, BuildWithNullsAndProbe) +TEST_F(BloomFilterTest, BuildWithNullsAndProbeV1) { auto stream = cudf::get_default_stream(); constexpr int bloom_filter_longs = (1024 * 1024); constexpr int num_hashes = 3; - auto bloom_filter = spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); cudf::test::fixed_width_column_wrapper input{{20, 80, 100, 99, 47, -9, 234000000}, {0, 1, 1, 1, 0, 1, 1}}; @@ -94,18 +104,18 @@ TEST_F(BloomFilterTest, BuildWithNullsAndProbe) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } -TEST_F(BloomFilterTest, BuildAndProbeWithNulls) +TEST_F(BloomFilterTest, BuildAndProbeWithNullsV1) { auto stream = cudf::get_default_stream(); constexpr int bloom_filter_longs = (1024 * 1024); constexpr int num_hashes = 3; cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; - auto bloom_filter = spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); - // probe cudf::test::fixed_width_column_wrapper probe{ {20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3}, {0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1}}; cudf::test::fixed_width_column_wrapper expected{{1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0}, @@ -117,11 +127,10 @@ TEST_F(BloomFilterTest, BuildAndProbeWithNulls) struct bloom_filter_stride_transform { int const stride; - cudf::size_type __device__ operator()(cudf::size_type i) { return i * stride; } }; -TEST_F(BloomFilterTest, ProbeMerged) +TEST_F(BloomFilterTest, ProbeMergedV1) { auto stream = cudf::get_default_stream(); constexpr int bloom_filter_longs = (1024 * 1024); @@ -129,20 +138,20 @@ TEST_F(BloomFilterTest, ProbeMerged) // column a cudf::test::fixed_width_column_wrapper col_a{20, 80, 100, 99, 47, -9, 234000000}; - auto bloom_filter_a = - spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter_a = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); spark_rapids_jni::bloom_filter_put(*bloom_filter_a, col_a, stream); // column b cudf::test::fixed_width_column_wrapper col_b{100, 200, 300, 400}; - auto bloom_filter_b = - spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter_b = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); spark_rapids_jni::bloom_filter_put(*bloom_filter_b, col_b, stream); // column c cudf::test::fixed_width_column_wrapper col_c{-100, -200, -300, -400}; - auto bloom_filter_c = - spark_rapids_jni::bloom_filter_create(num_hashes, bloom_filter_longs, stream); + auto bloom_filter_c = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); spark_rapids_jni::bloom_filter_put(*bloom_filter_c, col_c, stream); // pre-merge the individual bloom filters. the merge function expects the inputs to be a single @@ -151,7 +160,7 @@ TEST_F(BloomFilterTest, ProbeMerged) {bloom_filter_a->view(), bloom_filter_b->view(), bloom_filter_c->view()}); auto premerge_children = cudf::concatenate(cols); auto premerge_offsets = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::INT32}, 4); - thrust::transform(rmm::exec_policy(cudf::get_default_stream()), + thrust::transform(rmm::exec_policy_nosync(cudf::get_default_stream()), thrust::make_counting_iterator(0), thrust::make_counting_iterator(0) + 4, premerge_offsets->mutable_view().begin(), @@ -170,3 +179,197 @@ TEST_F(BloomFilterTest, ProbeMerged) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); } + +TEST_F(BloomFilterTest, ProbeAllAbsentV1) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_1, num_hashes, bloom_filter_longs, 0, stream); + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe{-10, 1, 2, 3, 5000, 999999, -77777}; + cudf::test::fixed_width_column_wrapper expected{0, 0, 0, 0, 0, 0, 0}; + auto result = spark_rapids_jni::bloom_filter_probe(probe, *bloom_filter, stream); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + +// ======================== V2 Tests ======================== + +TEST_F(BloomFilterTest, InitializationV2) +{ + constexpr int num_hashes = 3; + constexpr int seed = 42; + std::vector expected{1, 2, 3}; + + for (size_t idx = 0; idx < expected.size(); idx++) { + auto bloom_filter = + spark_rapids_jni::bloom_filter_create(spark_rapids_jni::bloom_filter_version_2, + num_hashes, + expected[idx], + seed, + cudf::get_default_stream()); + + auto const bloom_filter_size = expected[idx] * sizeof(int64_t); + CUDF_EXPECTS(bloom_filter->view().size() == + spark_rapids_jni::bloom_filter_header_v2_size_bytes + bloom_filter_size, + "Bloom filter not of expected size"); + + auto bytes = + (bloom_filter->view().data()) + spark_rapids_jni::bloom_filter_header_v2_size_bytes; + CUDF_EXPECTS( + thrust::all_of( + rmm::exec_policy(cudf::get_default_stream()), bytes, bytes + bloom_filter_size, is_zero{}), + "Bloom filter not initialized to 0"); + } +} + +TEST_F(BloomFilterTest, BuildAndProbeV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe_in{20, 80, 100, 99, 47, -9, 234000000}; + cudf::test::fixed_width_column_wrapper expected_in{1, 1, 1, 1, 1, 1, 1}; + auto result = spark_rapids_jni::bloom_filter_probe(probe_in, *bloom_filter, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_in, *result); +} + +TEST_F(BloomFilterTest, BuildWithNullsAndProbeV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + cudf::test::fixed_width_column_wrapper input{{20, 80, 100, 99, 47, -9, 234000000}, + {0, 1, 1, 1, 0, 1, 1}}; + + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe{ + 20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3}; + cudf::test::fixed_width_column_wrapper expected{0, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0}; + auto result = spark_rapids_jni::bloom_filter_probe(probe, *bloom_filter, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + +TEST_F(BloomFilterTest, BuildAndProbeWithNullsV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe{ + {20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3}, {0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1}}; + cudf::test::fixed_width_column_wrapper expected{{1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0}, + {0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1}}; + auto result = spark_rapids_jni::bloom_filter_probe(probe, *bloom_filter, stream); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected, *result); +} + +TEST_F(BloomFilterTest, ProbeMergedV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + cudf::test::fixed_width_column_wrapper col_a{20, 80, 100, 99, 47, -9, 234000000}; + auto bloom_filter_a = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + spark_rapids_jni::bloom_filter_put(*bloom_filter_a, col_a, stream); + + cudf::test::fixed_width_column_wrapper col_b{100, 200, 300, 400}; + auto bloom_filter_b = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + spark_rapids_jni::bloom_filter_put(*bloom_filter_b, col_b, stream); + + cudf::test::fixed_width_column_wrapper col_c{-100, -200, -300, -400}; + auto bloom_filter_c = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + spark_rapids_jni::bloom_filter_put(*bloom_filter_c, col_c, stream); + + std::vector cols( + {bloom_filter_a->view(), bloom_filter_b->view(), bloom_filter_c->view()}); + auto premerge_children = cudf::concatenate(cols); + auto premerge_offsets = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::INT32}, 4); + thrust::transform(rmm::exec_policy(cudf::get_default_stream()), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + 4, + premerge_offsets->mutable_view().begin(), + bloom_filter_stride_transform{bloom_filter_a->view().size()}); + auto premerged = cudf::make_lists_column( + 3, std::move(premerge_offsets), std::move(premerge_children), 0, rmm::device_buffer{}); + + auto bloom_filter_merged = spark_rapids_jni::bloom_filter_merge(*premerged); + + cudf::test::fixed_width_column_wrapper probe_all{ + 20, 80, 100, 99, 47, -9, 234000000, 200, 300, 400, -100, -200, -300, -400}; + cudf::test::fixed_width_column_wrapper expected_all{ + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; + auto result = spark_rapids_jni::bloom_filter_probe(probe_all, *bloom_filter_merged, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_all, *result); +} + +TEST_F(BloomFilterTest, ProbeAllAbsentV2) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bloom_filter = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + spark_rapids_jni::bloom_filter_put(*bloom_filter, input, stream); + + cudf::test::fixed_width_column_wrapper probe{-10, 1, 2, 3, 5000, 999999, -77777}; + cudf::test::fixed_width_column_wrapper expected{0, 0, 0, 0, 0, 0, 0}; + auto result = spark_rapids_jni::bloom_filter_probe(probe, *bloom_filter, stream); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *result); +} + +TEST_F(BloomFilterTest, V2WithSeed) +{ + auto stream = cudf::get_default_stream(); + constexpr int bloom_filter_longs = (1024 * 1024); + constexpr int num_hashes = 3; + + auto bf_seed0 = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 0, stream); + auto bf_seed42 = spark_rapids_jni::bloom_filter_create( + spark_rapids_jni::bloom_filter_version_2, num_hashes, bloom_filter_longs, 42, stream); + + cudf::test::fixed_width_column_wrapper input{20, 80, 100, 99, 47, -9, 234000000}; + spark_rapids_jni::bloom_filter_put(*bf_seed0, input, stream); + spark_rapids_jni::bloom_filter_put(*bf_seed42, input, stream); + + cudf::test::fixed_width_column_wrapper probe_in{20, 80, 100, 99, 47, -9, 234000000}; + cudf::test::fixed_width_column_wrapper expected_in{1, 1, 1, 1, 1, 1, 1}; + + auto r0 = spark_rapids_jni::bloom_filter_probe(probe_in, *bf_seed0, stream); + auto r42 = spark_rapids_jni::bloom_filter_probe(probe_in, *bf_seed42, stream); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_in, *r0); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_in, *r42); +} diff --git a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java index 6a676a54bb..926a80705b 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,77 +24,96 @@ import ai.rapids.cudf.NativeDepsLoader; public class BloomFilter { + public static final int VERSION_1 = 1; + public static final int VERSION_2 = 2; + public static final int DEFAULT_SEED = 0; + static { NativeDepsLoader.loadNativeDeps(); } /** - * Create a bloom filter with the specified number of hashes and bloom filter bits. - * @param numHashes The number of hashes to use when inserting values into the bloom filter or - * when probing. - * @param bloomFilterBits Size of the bloom filter in bits. - * @return a Scalar object which encapsulates the bloom filter. + * Create a V1 bloom filter with the specified number of hashes and bloom filter bits, + * using the default seed. + * + * @deprecated Use {@link #create(int, int, long, int)} instead. + * @param numHashes Number of bit positions set (and checked) per key. + * @param bloomFilterBits Total size of the bloom filter in bits (will be rounded up to a + * multiple of 64). + * @return A Scalar wrapping the GPU bloom filter (Spark format). + */ + @Deprecated + public static Scalar create(int numHashes, long bloomFilterBits) { + return create(VERSION_1, numHashes, bloomFilterBits, DEFAULT_SEED); + } + + /** + * Create a bloom filter with the specified version, number of hashes, bloom filter bits, + * and hash seed. + * + * @param version Bloom filter format: {@link #VERSION_1} or {@link #VERSION_2}. V2 uses + * 64-bit hash indexing and supports a configurable seed. + * @param numHashes Number of bit positions set (and checked) per key. Higher values reduce + * false positives but increase work per put/probe. + * @param bloomFilterBits Total size of the bloom filter in bits (will be rounded up to a + * multiple of 64). + * @param seed Hash seed. Used only for V2; ignored for V1. + * @return A Scalar wrapping the GPU bloom filter (Spark format). */ - public static Scalar create(int numHashes, long bloomFilterBits){ - if(numHashes <= 0){ + public static Scalar create(int version, int numHashes, long bloomFilterBits, int seed) { + if (version != VERSION_1 && version != VERSION_2) { + throw new IllegalArgumentException("Bloom filter version must be 1 or 2"); + } + if (numHashes <= 0) { throw new IllegalArgumentException("Bloom filters must have a positive hash count"); } - if(bloomFilterBits <= 0){ + if (bloomFilterBits <= 0) { throw new IllegalArgumentException("Bloom filters must have a positive number of bits"); } - return new Scalar(DType.LIST, creategpu(numHashes, bloomFilterBits)); + return new Scalar(DType.LIST, creategpu(version, numHashes, bloomFilterBits, seed)); } - /** - * Insert a column of longs into a bloom filter. - * @param bloomFilter The bloom filter to which values will be inserted. - * @param cv The column containing the values to add. - */ - public static void put(Scalar bloomFilter, ColumnVector cv){ + public static void put(Scalar bloomFilter, ColumnVector cv) { put(bloomFilter.getScalarHandle(), cv.getNativeView()); } - /** - * Merge one or more bloom filters into a new bloom filter. - * @param bloomFilters A ColumnVector containing a bloom filter per row. - * @return A new bloom filter containing the merged inputs. - */ - public static Scalar merge(ColumnVector bloomFilters){ + public static Scalar merge(ColumnVector bloomFilters) { return new Scalar(DType.LIST, merge(bloomFilters.getNativeView())); } /** - * Probe a bloom filter with a column of longs. Returns a column of booleans. For - * each row in the output; a value of true indicates that the corresponding input value - * -may- be in the set of values used to build the bloom filter; a value of false indicates - * that the corresponding input value is conclusively not in the set of values used to build - * the bloom filter. - * @param bloomFilter The bloom filter to be probed. - * @param cv The column containing the values to check. - * @return A boolean column indicating the results of the probe. + * Probe a bloom filter with a column of longs. For each row, true means the value may be + * in the set used to build the filter; false means it is definitely not in the set. + * + * @param bloomFilter The bloom filter to probe (a Scalar wrapping the GPU filter). + * @param cv Column of int64 values to check for membership. + * @return A boolean column with the same row count as cv; true for possible membership, + * false for definite non-membership. Nulls in cv are preserved in the output. */ - public static ColumnVector probe(Scalar bloomFilter, ColumnVector cv){ + public static ColumnVector probe(Scalar bloomFilter, ColumnVector cv) { return new ColumnVector(probe(bloomFilter.getScalarHandle(), cv.getNativeView())); } /** - * Probe a bloom filter with a column of longs. Returns a column of booleans. For - * each row in the output; a value of true indicates that the corresponding input value - * -may- be in the set of values used to build the bloom filter; a value of false indicates - * that the corresponding input value is conclusively not in the set of values used to build - * the bloom filter. - * @param bloomFilter The bloom filter to be probed. This buffer is expected to be the - * fully packed Spark bloom filter, including header. - * @param cv The column containing the values to check. - * @return A boolean column indicating the results of the probe. + * Probe a bloom filter with a column of longs. For each row, true means the value may be + * in the set used to build the filter; false means it is definitely not in the set. + * Use this overload when the filter is in a device buffer (e.g. Spark serialized form). + * + * @param bloomFilter Device buffer containing the packed bloom filter including header. + * @param cv Column of int64 values to check for membership. + * @return A boolean column with the same row count as cv; true for possible membership, + * false for definite non-membership. Nulls in cv are preserved in the output. */ - public static ColumnVector probe(BaseDeviceMemoryBuffer bloomFilter, ColumnVector cv){ - return new ColumnVector(probebuffer(bloomFilter.getAddress(), bloomFilter.getLength(), cv.getNativeView())); + public static ColumnVector probe(BaseDeviceMemoryBuffer bloomFilter, ColumnVector cv) { + return new ColumnVector( + probebuffer(bloomFilter.getAddress(), bloomFilter.getLength(), cv.getNativeView())); } - private static native long creategpu(int numHashes, long bloomFilterBits) throws CudfException; + private static native long creategpu(int version, int numHashes, long bloomFilterBits, int seed) + throws CudfException; private static native int put(long bloomFilter, long cv) throws CudfException; private static native long merge(long bloomFilters) throws CudfException; private static native long probe(long bloomFilter, long cv) throws CudfException; - private static native long probebuffer(long bloomFilter, long bloomFilterSize, long cv) throws CudfException; + private static native long probebuffer(long bloomFilter, long bloomFilterSize, long cv) + throws CudfException; } diff --git a/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java b/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java index 05ac497e72..e844883952 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/BloomFilterTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2025, NVIDIA CORPORATION. + * Copyright (c) 2023-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,99 +16,97 @@ package com.nvidia.spark.rapids.jni; -import com.nvidia.spark.rapids.jni.BloomFilter; - import ai.rapids.cudf.AssertUtils; import ai.rapids.cudf.ColumnVector; -import ai.rapids.cudf.Cuda; import ai.rapids.cudf.CudfException; import ai.rapids.cudf.Scalar; -import ai.rapids.cudf.DeviceMemoryBuffer; import static org.junit.jupiter.api.Assertions.assertThrows; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class BloomFilterTest { - @Test - void testBuildAndProbe(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildAndProbe(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ - + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, input); - try(ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); - ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); - ColumnVector result = BloomFilter.probe(bloomFilter, probe)){ + try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); + ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter, probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } } - @Test - void testBuildAndProbeBuffer(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildAndProbeBuffer(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ - + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, input); - - try(ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); - ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); - ColumnVector result = BloomFilter.probe(bloomFilter.getListAsColumnView().getData(), probe)){ + try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); + ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter.getListAsColumnView().getData(), probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } } - @Test - void testBuildWithNullsAndProbe(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildWithNullsAndProbe(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector input = ColumnVector.fromBoxedLongs(null, 80L, 100L, null, 47L, -9L, 234000000L); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ - + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, input); - try(ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); - ColumnVector expected = ColumnVector.fromBooleans(false, true, true, false, true, true, true, false, false, false, false); - ColumnVector result = BloomFilter.probe(bloomFilter, probe)){ + try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); + ColumnVector expected = ColumnVector.fromBooleans(false, true, true, false, true, true, true, false, false, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter, probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } } - @Test - void testBuildAndProbeWithNulls(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildAndProbeWithNulls(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ - + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, input); - try(ColumnVector probe = ColumnVector.fromBoxedLongs(null, null, null, 99L, 47L, -9L, 234000000L, null, null, 2L, 3L); - ColumnVector expected = ColumnVector.fromBoxedBooleans(null, null, null, true, true, true, true, null, null, false, false); - ColumnVector result = BloomFilter.probe(bloomFilter, probe)){ + try (ColumnVector probe = ColumnVector.fromBoxedLongs(null, null, null, 99L, 47L, -9L, 234000000L, null, null, 2L, 3L); + ColumnVector expected = ColumnVector.fromBoxedBooleans(null, null, null, true, true, true, true, null, null, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter, probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } } - - @Test - void testBuildMergeProbe(){ + + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildMergeProbe(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector colA = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); ColumnVector colB = ColumnVector.fromLongs(100, 200, 300, 400); ColumnVector colC = ColumnVector.fromLongs(-100, -200, -300, -400); - Scalar bloomFilterA = BloomFilter.create(numHashes, bloomFilterBits); - Scalar bloomFilterB = BloomFilter.create(numHashes, bloomFilterBits); - Scalar bloomFilterC = BloomFilter.create(numHashes, bloomFilterBits)){ + Scalar bloomFilterA = BloomFilter.create(version, numHashes, bloomFilterBits, 0); + Scalar bloomFilterB = BloomFilter.create(version, numHashes, bloomFilterBits, 0); + Scalar bloomFilterC = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilterA, colA); BloomFilter.put(bloomFilterB, colB); @@ -123,8 +121,8 @@ void testBuildMergeProbe(){ 65535, 0, -100, -200, -300, -400); ColumnVector expected = ColumnVector.fromBooleans(true, true, true, false, false, true, false, false, true, true, true, true); - Scalar merged = BloomFilter.merge(premerge); - ColumnVector result = BloomFilter.probe(merged, probe)) { + Scalar merged = BloomFilter.merge(premerge); + ColumnVector result = BloomFilter.probe(merged, probe)) { AssertUtils.assertColumnsAreEqual(expected, result); } } @@ -132,60 +130,97 @@ void testBuildMergeProbe(){ } } - @Test - void testBuildTrivialMergeProbe(){ + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildTrivialMergeProbe(int version) { int numHashes = 3; long bloomFilterBits = 4 * 1024 * 1024; try (ColumnVector colA = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); - Scalar bloomFilter = BloomFilter.create(numHashes, bloomFilterBits)){ + Scalar bloomFilter = BloomFilter.create(version, numHashes, bloomFilterBits, 0)) { BloomFilter.put(bloomFilter, colA); - try(ColumnVector premerge = ColumnVector.fromScalar(bloomFilter, 1); - ColumnVector probe = ColumnVector.fromLongs(-9, 200, 300, 6000, -2546, 99, 65535, 0, -100, -200, -300, -400); - ColumnVector expected = ColumnVector.fromBooleans(true, false, false, false, false, true, false, false, false, false, false, false); - Scalar merged = BloomFilter.merge(premerge); - ColumnVector result = BloomFilter.probe(merged, probe)){ - AssertUtils.assertColumnsAreEqual(expected, result); + try (ColumnVector premerge = ColumnVector.fromScalar(bloomFilter, 1); + ColumnVector probe = ColumnVector.fromLongs(-9, 200, 300, 6000, -2546, 99, 65535, 0, -100, -200, -300, -400); + ColumnVector expected = ColumnVector.fromBooleans(true, false, false, false, false, true, false, false, false, false, false, false); + Scalar merged = BloomFilter.merge(premerge); + ColumnVector result = BloomFilter.probe(merged, probe)) { + AssertUtils.assertColumnsAreEqual(expected, result); } } } @Test - void testBuildExpectedFailures(){ + void testBuildAndProbeV2WithSeed() { + int numHashes = 3; + long bloomFilterBits = 4 * 1024 * 1024; + int seed = 42; + + try (ColumnVector input = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000); + Scalar bloomFilter = BloomFilter.create(BloomFilter.VERSION_2, numHashes, bloomFilterBits, seed)) { + BloomFilter.put(bloomFilter, input); + try (ColumnVector probe = ColumnVector.fromLongs(20, 80, 100, 99, 47, -9, 234000000, -10, 1, 2, 3); + ColumnVector expected = ColumnVector.fromBooleans(true, true, true, true, true, true, true, false, false, false, false); + ColumnVector result = BloomFilter.probe(bloomFilter, probe)) { + AssertUtils.assertColumnsAreEqual(expected, result); + } + } + } + + @ParameterizedTest + @ValueSource(ints = {BloomFilter.VERSION_1, BloomFilter.VERSION_2}) + void testBuildExpectedFailures(int version) { // bloom filter with no hashes assertThrows(IllegalArgumentException.class, () -> { - try (Scalar bloomFilter = BloomFilter.create(0, 64)){} + try (Scalar bloomFilter = BloomFilter.create(version, 0, 64, 0)) {} }); // bloom filter with no size assertThrows(IllegalArgumentException.class, () -> { - try (Scalar bloomFilter = BloomFilter.create(3, 0)){} + try (Scalar bloomFilter = BloomFilter.create(version, 3, 0, 0)) {} }); - + // merge with mixed hash counts assertThrows(CudfException.class, () -> { - try (Scalar bloomFilterA = BloomFilter.create(3, 1024); - Scalar bloomFilterB = BloomFilter.create(4, 1024); - Scalar bloomFilterC = BloomFilter.create(4, 1024); + try (Scalar bloomFilterA = BloomFilter.create(version, 3, 1024, 0); + Scalar bloomFilterB = BloomFilter.create(version, 4, 1024, 0); + Scalar bloomFilterC = BloomFilter.create(version, 4, 1024, 0); ColumnVector bloomA = ColumnVector.fromScalar(bloomFilterA, 1); ColumnVector bloomB = ColumnVector.fromScalar(bloomFilterB, 1); ColumnVector bloomC = ColumnVector.fromScalar(bloomFilterC, 1); ColumnVector premerge = ColumnVector.concatenate(bloomA, bloomB, bloomC); - Scalar merged = BloomFilter.merge(premerge)){} + Scalar merged = BloomFilter.merge(premerge)) {} }); - // merge with mixed hash bit sizes + // merge with mixed bit sizes assertThrows(CudfException.class, () -> { - try (Scalar bloomFilterA = BloomFilter.create(3, 1024); - Scalar bloomFilterB = BloomFilter.create(3, 1024); - Scalar bloomFilterC = BloomFilter.create(3, 2048); + try (Scalar bloomFilterA = BloomFilter.create(version, 3, 1024, 0); + Scalar bloomFilterB = BloomFilter.create(version, 3, 1024, 0); + Scalar bloomFilterC = BloomFilter.create(version, 3, 2048, 0); ColumnVector bloomA = ColumnVector.fromScalar(bloomFilterA, 1); ColumnVector bloomB = ColumnVector.fromScalar(bloomFilterB, 1); ColumnVector bloomC = ColumnVector.fromScalar(bloomFilterC, 1); ColumnVector premerge = ColumnVector.concatenate(bloomA, bloomB, bloomC); - Scalar merged = BloomFilter.merge(premerge)){} + Scalar merged = BloomFilter.merge(premerge)) {} + }); + } + + @Test + void testBuildExpectedFailuresVersionIndependent() { + // invalid version + assertThrows(IllegalArgumentException.class, () -> { + try (Scalar bloomFilter = BloomFilter.create(3, 3, 64, 0)) {} + }); + + // merge with mixed versions (V1 + V2) + assertThrows(CudfException.class, () -> { + try (Scalar bloomFilterA = BloomFilter.create(BloomFilter.VERSION_1, 3, 1024, 0); + Scalar bloomFilterB = BloomFilter.create(BloomFilter.VERSION_2, 3, 1024, 0); + ColumnVector bloomA = ColumnVector.fromScalar(bloomFilterA, 1); + ColumnVector bloomB = ColumnVector.fromScalar(bloomFilterB, 1); + ColumnVector premerge = ColumnVector.concatenate(bloomA, bloomB); + Scalar merged = BloomFilter.merge(premerge)) {} }); } }