Skip to content

Commit d1d0fab

Browse files
[FEAT] Support parquet RLE decoding for booleans (Eventual-Inc#3477)
Eventual-Inc#3329 shows that we do not currently support reading boolean values from parquet files when they are RLE-encoded. This PR adds support for this.
1 parent de4fe50 commit d1d0fab

File tree

3 files changed

+159
-1
lines changed

3 files changed

+159
-1
lines changed

src/arrow2/src/io/parquet/read/deserialize/boolean/basic.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::collections::VecDeque;
22

33
use parquet2::{
44
deserialize::SliceFilteredIter,
5-
encoding::Encoding,
5+
encoding::{hybrid_rle, Encoding},
66
page::{split_buffer, DataPage, DictPage},
77
schema::Repetition,
88
};
@@ -51,6 +51,25 @@ impl<'a> Required<'a> {
5151
}
5252
}
5353

54+
#[derive(Debug)]
55+
struct ValuesRle<'a>(hybrid_rle::HybridRleDecoder<'a>);
56+
57+
impl<'a> ValuesRle<'a> {
58+
pub fn try_new(page: &'a DataPage) -> Result<Self> {
59+
let (_, _, indices_buffer) = split_buffer(page)?;
60+
// Skip the u32 length prefix.
61+
let indices_buffer = &indices_buffer[std::mem::size_of::<u32>()..];
62+
let decoder =
63+
hybrid_rle::HybridRleDecoder::try_new(
64+
indices_buffer,
65+
1_u32, // The bit width for a boolean is 1.
66+
page.num_values()
67+
)
68+
.map_err(crate::error::Error::from)?;
69+
Ok(Self(decoder))
70+
}
71+
}
72+
5473
#[derive(Debug)]
5574
struct FilteredRequired<'a> {
5675
values: SliceFilteredIter<BitmapIter<'a>>,
@@ -79,6 +98,7 @@ impl<'a> FilteredRequired<'a> {
7998
enum State<'a> {
8099
Optional(OptionalPageValidity<'a>, Values<'a>),
81100
Required(Required<'a>),
101+
OptionalRle(OptionalPageValidity<'a>, ValuesRle<'a>),
82102
FilteredRequired(FilteredRequired<'a>),
83103
FilteredOptional(FilteredOptionalPageValidity<'a>, Values<'a>),
84104
}
@@ -88,6 +108,7 @@ impl<'a> State<'a> {
88108
match self {
89109
State::Optional(validity, _) => validity.len(),
90110
State::Required(page) => page.length - page.offset,
111+
Self::OptionalRle(validity, _) => validity.len(),
91112
State::FilteredRequired(page) => page.len(),
92113
State::FilteredOptional(optional, _) => optional.len(),
93114
}
@@ -125,6 +146,12 @@ impl<'a> Decoder<'a> for BooleanDecoder {
125146
Values::try_new(page)?,
126147
)),
127148
(Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))),
149+
(Encoding::Rle, true, false) => {
150+
Ok(State::OptionalRle(
151+
OptionalPageValidity::try_new(page)?,
152+
ValuesRle::try_new(page)?,
153+
))
154+
},
128155
(Encoding::Plain, true, true) => Ok(State::FilteredOptional(
129156
FilteredOptionalPageValidity::try_new(page)?,
130157
Values::try_new(page)?,
@@ -163,6 +190,15 @@ impl<'a> Decoder<'a> for BooleanDecoder {
163190
values.extend_from_slice(page.values, page.offset, remaining);
164191
page.offset += remaining;
165192
}
193+
State::OptionalRle(page_validity, page_values) => {
194+
utils::extend_from_decoder(
195+
validity,
196+
page_validity,
197+
Some(remaining),
198+
values,
199+
&mut page_values.0.by_ref().map(|x| x.unwrap()).map(|v| v != 0),
200+
)
201+
},
166202
State::FilteredRequired(page) => {
167203
values.reserve(remaining);
168204
for item in page.values.by_ref().take(remaining) {

src/parquet2/src/encoding/hybrid_rle/decoder.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,108 @@ mod tests {
141141
panic!()
142142
};
143143
}
144+
145+
/// A single bit-packed run of booleans is surfaced as one `Bitpacked` chunk
/// that unpacks to the expected 0/1 values.
#[test]
fn test_bool_bitpacked() {
    let bit_width = 1usize;
    let length = 4;
    let encoded = [
        2, 0, 0, 0, // u32 length prefix: 2 payload bytes follow.
        0b00000011, // Bit-packed run header: 1 group of 8 values (1 << 1 | 1).
        0b00001101, // Packed bits, least-significant first: true, false, true, true.
    ];
    let expected = &[1, 0, 1, 1];

    // The length prefix is skipped before handing the bytes to the decoder.
    let mut decoder = Decoder::new(&encoded[4..], bit_width);

    match decoder.next().unwrap() {
        Ok(HybridEncoded::Bitpacked(packed)) => {
            assert_eq!(packed, &[0b00001101]);

            let unpacked: Vec<_> = bitpacked::Decoder::<u8>::try_new(packed, bit_width, length)
                .unwrap()
                .collect();
            assert_eq!(unpacked, expected);
        }
        _ => panic!("Expected bitpacked encoding"),
    }
}
169+
170+
/// A single RLE run of `true` is surfaced as one `Rle` chunk carrying the
/// repeated value and the run length.
#[test]
fn test_bool_rle() {
    let bit_width = 1usize;
    let length = 4;
    let encoded = [
        2, 0, 0, 0, // u32 length prefix: 2 payload bytes follow.
        0b00001000, // RLE run header: run length 4 (4 << 1 | 0).
        true as u8, // The value repeated by the run.
    ];

    // The length prefix is skipped before handing the bytes to the decoder.
    let mut decoder = Decoder::new(&encoded[4..], bit_width);

    match decoder.next().unwrap() {
        Ok(HybridEncoded::Rle(repeated, run_length)) => {
            assert_eq!(repeated, &[1u8]); // `true` is stored as 1.
            assert_eq!(run_length, length); // The run covers all 4 values.
        }
        _ => panic!("Expected RLE encoding"),
    }
}
189+
190+
/// A bit-packed run followed by an RLE run is decoded as two consecutive
/// chunks, in stream order.
#[test]
fn test_bool_mixed_rle() {
    let bit_width = 1usize;
    let encoded = [
        4, 0, 0, 0, // u32 length prefix: 4 payload bytes follow.
        0b00000011, // Bit-packed run header: 1 group of 8 values (1 << 1 | 1).
        0b00001101, // Packed bits, least-significant first: true, false, true, true.
        0b00001000, // RLE run header: run length 4 (4 << 1 | 0).
        false as u8, // The value repeated by the RLE run.
    ];

    // The length prefix is skipped before handing the bytes to the decoder.
    let mut decoder = Decoder::new(&encoded[4..], bit_width);

    // First chunk: the bit-packed run.
    match decoder.next().unwrap() {
        Ok(HybridEncoded::Bitpacked(packed)) => assert_eq!(packed, &[0b00001101]),
        _ => panic!("Expected bitpacked encoding"),
    }

    // Second chunk: the RLE run.
    match decoder.next().unwrap() {
        Ok(HybridEncoded::Rle(repeated, run_length)) => {
            assert_eq!(repeated, &[0u8]); // `false` is stored as 0.
            assert_eq!(run_length, 4);
        }
        _ => panic!("Expected RLE encoding"),
    }
}
218+
219+
/// A stream that holds only the length prefix yields no runs at all.
#[test]
fn test_bool_nothing_encoded() {
    let bit_width = 1usize;
    let encoded = [0, 0, 0, 0]; // u32 length prefix only; no payload bytes.

    // Skipping the prefix leaves an empty slice for the decoder.
    let payload = &encoded[4..];
    let mut decoder = Decoder::new(payload, bit_width);
    assert!(decoder.next().is_none());
}
227+
228+
/// A bit-packed header that announces more data than the buffer holds still
/// yields the bytes that are present, after which the decoder is exhausted.
#[test]
fn test_bool_invalid_encoding() {
    let bit_width = 1usize;
    let encoded = [
        2, 0, 0, 0, // u32 length prefix: 2 payload bytes follow.
        0b00000101, // Bit-packed run header announcing 2 groups (2 << 1 | 1).
        true as u8, // Only one data byte is present; the run is truncated.
    ];

    // The length prefix is skipped before handing the bytes to the decoder.
    let mut decoder = Decoder::new(&encoded[4..], bit_width);

    match decoder.next().unwrap() {
        Ok(HybridEncoded::Bitpacked(packed)) => assert_eq!(packed, &[1u8]),
        _ => panic!("Expected bitpacked encoding"),
    }

    // The buffer is now exhausted, so no further run is produced.
    assert!(decoder.next().is_none());
}
144248
}

tests/io/test_parquet_roundtrip.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
import datetime
44
import decimal
5+
import random
56

67
import numpy as np
78
import pyarrow as pa
9+
import pyarrow.parquet as papq
810
import pytest
911

1012
import daft
@@ -150,6 +152,22 @@ def test_roundtrip_sparse_tensor_types(tmp_path, fixed_shape):
150152
assert before.to_arrow() == after.to_arrow()
151153

152154

155+
@pytest.mark.parametrize("has_none", [True, False])
def test_roundtrip_boolean_rle(tmp_path, has_none):
    """Write a boolean column with pyarrow and check that daft reads it back unchanged.

    The file is written with data page version 2.0 so that the boolean values
    are RLE-encoded. ``has_none`` controls whether roughly 10% of the values
    are ``None``.
    """
    file_path = f"{tmp_path}/test.parquet"
    num_values = 1_000_000
    # Use a private, seeded generator so the data is identical on every run
    # and a failure can be reproduced.
    rng = random.Random(0)
    if has_none:
        # Random True/False values that are None 10% of the time.
        random_bools = rng.choices([True, False, None], weights=[45, 45, 10], k=num_values)
    else:
        # Random True/False values only.
        random_bools = rng.choices([True, False], k=num_values)
    pa_original = pa.table({"bools": pa.array(random_bools, type=pa.bool_())})
    # Use data page version 2.0 which uses RLE encoding for booleans.
    papq.write_table(pa_original, file_path, data_page_version="2.0")
    df_roundtrip = daft.read_parquet(file_path)
    assert pa_original == df_roundtrip.to_arrow()
169+
170+
153171
# TODO: reading/writing:
154172
# 1. Embedding type
155173
# 2. Image type

0 commit comments

Comments
 (0)