Skip to content

Commit 46de7f0

Browse files
fix: Fix ORC bloom filter hashing to match official implementation (#76)
* fix: replace Murmur3 dependency with custom hash functions in Bloom filter * fix: improve Bloom filter predicate tests for clarity and accuracy * fix: refactor Bloom filter hash handling to use add_hash method * Update src/bloom_filter.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 56f1682 commit 46de7f0

4 files changed

Lines changed: 242 additions & 147 deletions

File tree

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ flate2 = "1"
4242
log = "0.4"
4343
lz4_flex = "0.11"
4444
lzokay-native = "0.1"
45-
murmur3 = "0.5"
4645
num = "0.4.1"
4746
prost = "0.13"
4847
snafu = "0.8"

src/bloom_filter.rs

Lines changed: 156 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,15 @@
1919
//!
2020
//! This follows the ORC v1 spec (https://orc.apache.org/specification/ORCv1/):
2121
//! - Stream kinds `BLOOM_FILTER` / `BLOOM_FILTER_UTF8` provide per-row-group filters.
22-
//! - Bits are set using Murmur3 x64_128 with seed 0, deriving h1/h2 and the
23-
//! double-hash sequence `h1 + i*h2 (mod m)` for `numHashFunctions`.
22+
//! - Values are hashed to a 64-bit base hash (ORC's Murmur3 hash64),
23+
//! split into two 32-bit hashes, and combined with `hash1 + i*hash2`
24+
//! for `numHashFunctions` (i starts at 1).
2425
//! - A cleared bit means the value is **definitely absent**; set bits mean
2526
//! **possible presence** (false positives allowed).
2627
//!
2728
//! Bloom filters are attached to row groups and can quickly rule out equality
2829
//! predicates (e.g. `col = 'abc'`) before any data decoding.
2930
30-
use murmur3::murmur3_x64_128;
31-
3231
use crate::proto;
3332

3433
/// A Bloom filter parsed from the ORC index stream.
@@ -94,78 +93,175 @@ impl BloomFilter {
9493
}
9594
}
9695

97-
/// Returns true if the value *might* be contained. False means *definitely not*.
98-
pub fn might_contain(&self, value: &[u8]) -> bool {
96+
/// Set bits for the provided 64-bit hash using ORC's double-hash scheme.
97+
pub fn add_hash(&mut self, hash64: u64) {
98+
let bit_count = self.bitset.len() * 64;
99+
if bit_count == 0 {
100+
return;
101+
}
102+
103+
let hash1 = hash64 as u32 as i32;
104+
let hash2 = (hash64 >> 32) as u32 as i32;
105+
106+
for i in 1..=self.num_hash_functions {
107+
let mut combined = hash1.wrapping_add((i as i32).wrapping_mul(hash2));
108+
if combined < 0 {
109+
combined = !combined;
110+
}
111+
let bit_idx = ((combined as u32 as u64) % (bit_count as u64)) as usize;
112+
self.bitset[bit_idx / 64] |= 1u64 << (bit_idx % 64);
113+
}
114+
}
115+
116+
/// Returns true if the hash *might* be contained. False means *definitely not*.
117+
pub fn test_hash(&self, hash64: u64) -> bool {
99118
let bit_count = self.bitset.len() * 64;
100119
if bit_count == 0 {
101120
// Defensive: no bits means we cannot use the filter
102121
return true;
103122
}
104123

105-
let hash = self.hash128(value);
106-
let h1 = hash as u64;
107-
let h2 = (hash >> 64) as u64;
124+
let hash1 = hash64 as u32 as i32;
125+
let hash2 = (hash64 >> 32) as u32 as i32;
108126

109-
for i in 0..self.num_hash_functions {
110-
// ORC uses the standard double-hash scheme: h1 + i*h2 (mod m)
111-
let combined = h1.wrapping_add((i as u64).wrapping_mul(h2));
112-
let bit_idx = (combined % (bit_count as u64)) as usize;
113-
if !self.test_bit(bit_idx) {
127+
// Mirror ORC Java BloomFilter.addHash/testHash:
128+
// split 64-bit hash into two signed 32-bit ints, combine with i=1..k,
129+
// flip negative results, then modulo by bit count.
130+
for i in 1..=self.num_hash_functions {
131+
let mut combined = hash1.wrapping_add((i as i32).wrapping_mul(hash2));
132+
if combined < 0 {
133+
combined = !combined;
134+
}
135+
let bit_idx = ((combined as u32 as u64) % (bit_count as u64)) as usize;
136+
let word = bit_idx / 64;
137+
let bit = bit_idx % 64;
138+
let mask = 1u64 << bit;
139+
if self
140+
.bitset
141+
.get(word)
142+
.map_or(true, |bits| (bits & mask) == 0)
143+
{
114144
return false;
115145
}
116146
}
117147

118148
true
119149
}
120150

121-
fn hash128(&self, value: &[u8]) -> u128 {
122-
// The ORC specification uses Murmur3 (64-bit) for bloom filters.
123-
// murmur3_x64_128 matches the Java reference implementation, where
124-
// the lower 64 bits are treated as h1 and the upper 64 bits as h2.
125-
let mut cursor = std::io::Cursor::new(value);
126-
murmur3_x64_128(&mut cursor, 0).unwrap_or(0)
151+
pub(crate) fn hash_bytes(value: &[u8]) -> u64 {
152+
// ORC Java uses Murmur3.hash64 with a fixed seed (104729).
153+
murmur3_64_orc(value)
127154
}
128155

129-
fn test_bit(&self, bit_idx: usize) -> bool {
130-
let word = bit_idx / 64;
131-
let bit = bit_idx % 64;
132-
if let Some(bits) = self.bitset.get(word) {
133-
(bits & (1u64 << bit)) != 0
134-
} else {
135-
false
136-
}
156+
pub(crate) fn hash_long(value: i64) -> u64 {
157+
// Thomas Wang's 64-bit mix function, matching Java's signed long operations.
158+
let mut key = value;
159+
key = (!key).wrapping_add(key.wrapping_shl(21));
160+
key ^= key >> 24;
161+
key = key
162+
.wrapping_add(key.wrapping_shl(3))
163+
.wrapping_add(key.wrapping_shl(8));
164+
key ^= key >> 14;
165+
key = key
166+
.wrapping_add(key.wrapping_shl(2))
167+
.wrapping_add(key.wrapping_shl(4));
168+
key ^= key >> 28;
169+
key = key.wrapping_add(key.wrapping_shl(31));
170+
key as u64
137171
}
138172
}
139173

174+
/// ORC's Murmur3 hash64 implementation (seed = 104729), used for Bloom filters.
175+
fn murmur3_64_orc(bytes: &[u8]) -> u64 {
176+
const C1: u64 = 0x87c3_7b91_1142_53d5;
177+
const C2: u64 = 0x4cf5_ad43_2745_937f;
178+
const R1: u32 = 31;
179+
const R2: u32 = 27;
180+
const M: u64 = 5;
181+
const N1: u64 = 1_390_208_809;
182+
const SEED: u64 = 104_729;
183+
184+
let mut h1 = SEED;
185+
let nblocks = bytes.len() / 8;
186+
187+
for i in 0..nblocks {
188+
let start = i * 8;
189+
let mut k1 =
190+
u64::from_le_bytes(bytes[start..start + 8].try_into().unwrap()).wrapping_mul(C1);
191+
k1 = k1.rotate_left(R1);
192+
k1 = k1.wrapping_mul(C2);
193+
194+
h1 ^= k1;
195+
h1 = h1.rotate_left(R2);
196+
h1 = h1.wrapping_mul(M).wrapping_add(N1);
197+
}
198+
199+
let mut k1 = 0u64;
200+
let tail = &bytes[nblocks * 8..];
201+
if tail.len() >= 7 {
202+
k1 ^= (tail[6] as u64) << 48;
203+
}
204+
if tail.len() >= 6 {
205+
k1 ^= (tail[5] as u64) << 40;
206+
}
207+
if tail.len() >= 5 {
208+
k1 ^= (tail[4] as u64) << 32;
209+
}
210+
if tail.len() >= 4 {
211+
k1 ^= (tail[3] as u64) << 24;
212+
}
213+
if tail.len() >= 3 {
214+
k1 ^= (tail[2] as u64) << 16;
215+
}
216+
if tail.len() >= 2 {
217+
k1 ^= (tail[1] as u64) << 8;
218+
}
219+
if !tail.is_empty() {
220+
k1 ^= tail[0] as u64;
221+
}
222+
223+
if !tail.is_empty() {
224+
k1 = k1.wrapping_mul(C1);
225+
k1 = k1.rotate_left(R1);
226+
k1 = k1.wrapping_mul(C2);
227+
h1 ^= k1;
228+
}
229+
230+
h1 ^= bytes.len() as u64;
231+
fmix64(h1)
232+
}
233+
234+
/// Finalization mix function for Murmur3 64-bit.
235+
fn fmix64(mut k: u64) -> u64 {
236+
k ^= k >> 33;
237+
k = k.wrapping_mul(0xff51_afd7_ed55_8ccd);
238+
k ^= k >> 33;
239+
k = k.wrapping_mul(0xc4ce_b9fe_1a85_ec53);
240+
k ^= k >> 33;
241+
k
242+
}
243+
140244
#[cfg(test)]
141245
mod tests {
142246
use super::*;
143247

144248
fn build_filter(values: &[&[u8]], bitset_words: usize, hash_funcs: u32) -> BloomFilter {
145-
let mut bitset = vec![0u64; bitset_words];
146-
let bit_count = bitset_words * 64;
147-
249+
let mut filter = BloomFilter::from_parts(hash_funcs, vec![0u64; bitset_words]);
148250
for value in values {
149-
let mut cursor = std::io::Cursor::new(*value);
150-
let hash = murmur3_x64_128(&mut cursor, 0).unwrap();
151-
let h1 = hash as u64;
152-
let h2 = (hash >> 64) as u64;
153-
for i in 0..hash_funcs {
154-
let combined = h1.wrapping_add((i as u64).wrapping_mul(h2));
155-
let bit_idx = (combined % (bit_count as u64)) as usize;
156-
bitset[bit_idx / 64] |= 1u64 << (bit_idx % 64);
157-
}
251+
let hash64 = BloomFilter::hash_bytes(value);
252+
filter.add_hash(hash64);
158253
}
159-
160-
BloomFilter::from_parts(hash_funcs, bitset)
254+
filter
161255
}
162256

163257
#[test]
164258
fn test_bloom_filter_hit_and_miss() {
165259
let filter = build_filter(&[b"abc", b"def"], 2, 3);
166260

167-
assert!(filter.might_contain(b"abc"));
168-
assert!(!filter.might_contain(b"xyz"));
261+
let abc = BloomFilter::hash_bytes(b"abc");
262+
let xyz = BloomFilter::hash_bytes(b"xyz");
263+
assert!(filter.test_hash(abc));
264+
assert!(!filter.test_hash(xyz));
169265
}
170266

171267
#[test]
@@ -179,7 +275,21 @@ mod tests {
179275
};
180276

181277
let decoded = BloomFilter::try_from_proto(&proto).unwrap();
182-
assert!(decoded.might_contain(b"foo"));
183-
assert!(!decoded.might_contain(b"bar"));
278+
let foo = BloomFilter::hash_bytes(b"foo");
279+
let bar = BloomFilter::hash_bytes(b"bar");
280+
assert!(decoded.test_hash(foo));
281+
assert!(!decoded.test_hash(bar));
282+
}
283+
284+
#[test]
285+
fn test_might_contain_hash64() {
286+
let value = 42i64;
287+
let hash64 = BloomFilter::hash_long(value);
288+
289+
let num_hash_functions = 3;
290+
let mut filter = BloomFilter::from_parts(num_hash_functions, vec![0u64; 2]);
291+
filter.add_hash(hash64);
292+
assert!(filter.test_hash(hash64));
293+
assert!(!filter.test_hash(BloomFilter::hash_long(value + 1)));
184294
}
185295
}

src/row_group_filter.rs

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
//! This module implements predicate evaluation against row group statistics
2121
//! to determine which row groups should be read or skipped.
2222
23-
use std::borrow::Cow;
24-
23+
use crate::bloom_filter::BloomFilter;
2524
use crate::error::{Result, UnexpectedSnafu};
2625
use crate::predicate::{ComparisonOp, Predicate, PredicateValue};
2726
use crate::row_index::RowGroupEntry;
@@ -346,24 +345,27 @@ fn row_group_might_match_bloom(
346345
None => return true,
347346
};
348347

349-
if let Some(bytes) = bloom_value_bytes(value) {
350-
bloom_filter.might_contain(&bytes)
348+
if let Some(hash) = bloom_value_hash64(value) {
349+
bloom_filter.test_hash(hash)
351350
} else {
352351
debug!("Skipping bloom filter: unsupported predicate value type");
353352
true
354353
}
355354
}
356355

357-
fn bloom_value_bytes(value: &PredicateValue) -> Option<Cow<'_, [u8]>> {
356+
fn bloom_value_hash64(value: &PredicateValue) -> Option<u64> {
358357
match value {
359-
PredicateValue::Utf8(Some(v)) => Some(Cow::Borrowed(v.as_bytes())),
360-
PredicateValue::Int8(Some(v)) => Some(Cow::Owned((*v as i64).to_le_bytes().to_vec())),
361-
PredicateValue::Int16(Some(v)) => Some(Cow::Owned((*v as i64).to_le_bytes().to_vec())),
362-
PredicateValue::Int32(Some(v)) => Some(Cow::Owned((*v as i64).to_le_bytes().to_vec())),
363-
PredicateValue::Int64(Some(v)) => Some(Cow::Owned((*v).to_le_bytes().to_vec())),
364-
PredicateValue::Float32(Some(v)) => Some(Cow::Owned(v.to_bits().to_le_bytes().to_vec())),
365-
PredicateValue::Float64(Some(v)) => Some(Cow::Owned(v.to_bits().to_le_bytes().to_vec())),
366-
PredicateValue::Boolean(Some(v)) => Some(Cow::Borrowed(if *v { &[1u8] } else { &[0u8] })),
358+
PredicateValue::Utf8(Some(v)) => Some(BloomFilter::hash_bytes(v.as_bytes())),
359+
PredicateValue::Int8(Some(v)) => Some(BloomFilter::hash_long(*v as i64)),
360+
PredicateValue::Int16(Some(v)) => Some(BloomFilter::hash_long(*v as i64)),
361+
PredicateValue::Int32(Some(v)) => Some(BloomFilter::hash_long(*v as i64)),
362+
PredicateValue::Int64(Some(v)) => Some(BloomFilter::hash_long(*v)),
363+
PredicateValue::Float32(Some(v)) => {
364+
let as_double = *v as f64;
365+
Some(BloomFilter::hash_long(as_double.to_bits() as i64))
366+
}
367+
PredicateValue::Float64(Some(v)) => Some(BloomFilter::hash_long(v.to_bits() as i64)),
368+
PredicateValue::Boolean(Some(v)) => Some(BloomFilter::hash_long(if *v { 1 } else { 0 })),
367369
_ => None,
368370
}
369371
}
@@ -773,22 +775,9 @@ mod tests {
773775
use crate::predicate::{Predicate, PredicateValue};
774776

775777
// Build a bloom filter that only contains the value 10
776-
let value = 10i64.to_le_bytes().to_vec();
777-
let mut bitset = vec![0u64; 2];
778-
let bit_count = bitset.len() * 64;
779-
let hash = {
780-
let mut cursor = std::io::Cursor::new(&value);
781-
murmur3::murmur3_x64_128(&mut cursor, 0).unwrap()
782-
};
783-
let h1 = hash as u64;
784-
let h2 = (hash >> 64) as u64;
785778
let num_hash_functions = 3;
786-
for i in 0..num_hash_functions {
787-
let combined = h1.wrapping_add((i as u64).wrapping_mul(h2));
788-
let bit_idx = (combined % (bit_count as u64)) as usize;
789-
bitset[bit_idx / 64] |= 1u64 << (bit_idx % 64);
790-
}
791-
let bloom_filter = BloomFilter::from_parts(num_hash_functions, bitset);
779+
let mut bloom_filter = BloomFilter::from_parts(num_hash_functions, vec![0u64; 2]);
780+
bloom_filter.add_hash(BloomFilter::hash_long(10));
792781

793782
// Attach bloom filter to a single row group
794783
let entry = RowGroupEntry::new(None, vec![]).with_bloom_filter(Some(bloom_filter));
@@ -810,22 +799,9 @@ mod tests {
810799
use crate::predicate::{Predicate, PredicateValue};
811800

812801
// Build a bloom filter that claims value 50 may exist
813-
let value = 50i64.to_le_bytes().to_vec();
814-
let mut bitset = vec![0u64; 2];
815-
let bit_count = bitset.len() * 64;
816-
let hash = {
817-
let mut cursor = std::io::Cursor::new(&value);
818-
murmur3::murmur3_x64_128(&mut cursor, 0).unwrap()
819-
};
820-
let h1 = hash as u64;
821-
let h2 = (hash >> 64) as u64;
822802
let num_hash_functions = 3;
823-
for i in 0..num_hash_functions {
824-
let combined = h1.wrapping_add((i as u64).wrapping_mul(h2));
825-
let bit_idx = (combined % (bit_count as u64)) as usize;
826-
bitset[bit_idx / 64] |= 1u64 << (bit_idx % 64);
827-
}
828-
let bloom_filter = BloomFilter::from_parts(num_hash_functions, bitset);
803+
let mut bloom_filter = BloomFilter::from_parts(num_hash_functions, vec![0u64; 2]);
804+
bloom_filter.add_hash(BloomFilter::hash_long(50));
829805

830806
// Row group stats that make the predicate impossible (min/max do not cover 50)
831807
let stats = {

0 commit comments

Comments
 (0)