Skip to content

Commit 947f4a4

Browse files
Impl RleV1Encoder for integer (#37)
* impl RleV1Encoder * add test * use zero either default * refactor RleV1Encoder in stateful code * clean clippy * fix and add more test * add test * avoid new vec * add comments * fix * fix * fix * use Rc<RefCell> to alloc buffer on heap and reuse it * simply code * fix
1 parent 03429d6 commit 947f4a4

File tree

2 files changed

+228
-18
lines changed

2 files changed

+228
-18
lines changed

src/encoding/byte.rs

-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ impl ByteRleEncoder {
6868
self.tail_run_length = 1;
6969
} else if let Some(run_value) = self.run_value {
7070
// Run mode
71-
7271
if value == run_value {
7372
// Continue buffering for Run sequence, flushing if reaching max length
7473
self.num_literals += 1;

src/encoding/integer/rle_v1.rs

+228-17
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,30 @@
1717

1818
//! Handling decoding of Integer Run Length Encoded V1 data in ORC files
1919
20-
use std::{io::Read, marker::PhantomData};
20+
use std::{io::Read, marker::PhantomData, ops::RangeInclusive};
2121

22+
use bytes::{BufMut, BytesMut};
2223
use snafu::OptionExt;
2324

2425
use crate::{
2526
encoding::{
2627
rle::GenericRle,
2728
util::{read_u8, try_read_u8},
29+
PrimitiveValueEncoder,
2830
},
2931
error::{OutOfSpecSnafu, Result},
32+
memory::EstimateMemory,
3033
};
3134

32-
use super::{util::read_varint_zigzagged, EncodingSign, NInt};
35+
use super::{
36+
util::{read_varint_zigzagged, write_varint_zigzagged},
37+
EncodingSign, NInt,
38+
};
3339

34-
const MAX_RUN_LENGTH: usize = 130;
40+
// Shortest sequence that is encoded as a run: literal mode switches to run mode once its
// last MIN_RUN_LENGTH values share one delta, and the run header byte stores `length - 3`.
const MIN_RUN_LENGTH: usize = 3;
41+
// A run is written out as soon as it reaches this many values (127 + 3 = 130).
const MAX_RUN_LENGTH: usize = 127 + MIN_RUN_LENGTH;
42+
// The literal buffer is written out as soon as it holds this many values.
const MAX_LITERAL_LENGTH: usize = 128;
43+
// Deltas that may start a run; an accepted delta is stored as an `i8`.
// NOTE(review): "DELAT" looks like a misspelling of "DELTA" -- rename together with its use.
const DELAT_RANGE: RangeInclusive<i64> = -128..=127;
3544

3645
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
3746
enum EncodingType {
@@ -147,6 +156,190 @@ impl<N: NInt, R: Read, S: EncodingSign> GenericRle<N> for RleV1Decoder<N, R, S>
147156
}
148157
}
149158

159+
/// Represents the state of the RLE V1 encoder.
160+
///
161+
/// The encoder can be in one of three states:
162+
///
163+
/// 1. `Empty`: There are no pending values to encode.
164+
/// 2. `Literal`: The encoder is in literal mode, with the pending values saved in the encoder's buffer.
165+
/// 3. `Run`: The encoder is in run mode, described by a run value, delta, and length.
166+
#[derive(Debug, Clone, Eq, PartialEq)]
167+
enum RleV1EncodingState<N: NInt> {
168+
// Nothing pending; also the `Default` state.
Empty,
169+
// Pending values live in `RleV1Encoder::buffer`, not in the variant itself.
Literal,
170+
// `value` is the first value of the run, `delta` the constant difference between
// consecutive values, and `length` the number of values accepted into the run so far.
Run { value: N, delta: i8, length: usize },
171+
}
172+
173+
impl<N: NInt> Default for RleV1EncodingState<N> {
174+
fn default() -> Self {
175+
Self::Empty
176+
}
177+
}
178+
179+
/// `RleV1Encoder` is responsible for encoding a stream of integers using the Run Length Encoding (RLE) version 1 format.
///
/// `N` is the integer type being encoded; `S` is a marker type that is forwarded to the
/// varint writer.
180+
pub struct RleV1Encoder<N: NInt, S: EncodingSign> {
181+
// Encoded output bytes produced so far.
writer: BytesMut,
182+
// Current mode of the encoder (empty, literal or run).
state: RleV1EncodingState<N>,
183+
// Values collected while in literal mode; cleared whenever a new literal sequence starts.
buffer: Vec<N>,
184+
// Zero-sized marker that only carries the `S` type parameter.
sign: PhantomData<S>,
185+
}
186+
187+
impl<N: NInt, S: EncodingSign> RleV1Encoder<N, S> {
188+
/// Processes a given value and updates the encoder state accordingly.
189+
///
190+
/// The function handles three possible states of the encoder:
191+
///
192+
/// 1. `RleV1EncoderState::Empty`:
193+
/// - Transitions to the `Literal` state with the given value as the first element in the buffer.
194+
///
195+
/// 2. `RleV1EncoderState::Run`:
196+
/// - If the value continues the current run (i.e., it matches the expected value based on the run's delta and length),
197+
/// the run length is incremented. If the run length reaches `MAX_RUN_LENGTH`, the run is written out and the state
198+
/// transitions to `Empty`.
199+
/// - If the value does not continue the current run, the existing run is written out and the state transitions to
200+
/// `Literal` with the new value as the first element in the buffer.
201+
///
202+
/// 3. `RleV1EncoderState::Literal`:
203+
/// - The value is added to the buffer. If the buffer length reaches `MAX_LITERAL_LENGTH`, the buffer is written out
204+
/// and the state transitions to `Empty`.
205+
/// - If the buffer length is at least `MIN_RUN_LENGTH` and the values in the buffer form a valid run (i.e., the deltas
206+
/// between consecutive values are consistent and within the allowed range), the state transitions to `Run`.
207+
/// - Otherwise, the state remains `Literal`.
208+
///
209+
fn process_value(&mut self, value: N) {
210+
match &mut self.state {
211+
RleV1EncodingState::Empty => {
212+
// change to literal model
213+
self.buffer.clear();
214+
self.buffer.push(value);
215+
self.state = RleV1EncodingState::Literal;
216+
}
217+
RleV1EncodingState::Literal => {
218+
let buf = &mut self.buffer;
219+
buf.push(value);
220+
let length = buf.len();
221+
let delta = (value - buf[length - 2]).as_i64();
222+
// check if can change to run model
223+
if length >= MIN_RUN_LENGTH
224+
&& DELAT_RANGE.contains(&delta)
225+
&& delta == (buf[length - 2] - buf[length - 3]).as_i64()
226+
{
227+
// change to run model
228+
if length > MIN_RUN_LENGTH {
229+
// write the left literals
230+
write_literals::<_, S>(&mut self.writer, &buf[..(length - MIN_RUN_LENGTH)]);
231+
}
232+
self.state = RleV1EncodingState::Run {
233+
value: buf[length - MIN_RUN_LENGTH],
234+
delta: delta as i8,
235+
length: MIN_RUN_LENGTH,
236+
}
237+
} else if length == MAX_LITERAL_LENGTH {
238+
// reach buffer limit, write literals and change to empty state
239+
write_literals::<_, S>(&mut self.writer, buf);
240+
self.state = RleV1EncodingState::Empty;
241+
}
242+
// else keep literal mode
243+
}
244+
RleV1EncodingState::Run {
245+
value: run_value,
246+
delta,
247+
length,
248+
} => {
249+
if run_value.as_i64() + (*delta as i64) * (*length as i64) == value.as_i64() {
250+
// keep run model
251+
*length += 1;
252+
if *length == MAX_RUN_LENGTH {
253+
// reach run limit
254+
write_run::<_, S>(&mut self.writer, *run_value, *delta, *length);
255+
self.state = RleV1EncodingState::Empty;
256+
}
257+
} else {
258+
// write run values and change to literal model
259+
write_run::<_, S>(&mut self.writer, *run_value, *delta, *length);
260+
self.buffer.clear();
261+
self.buffer.push(value);
262+
self.state = RleV1EncodingState::Literal;
263+
}
264+
}
265+
}
266+
}
267+
268+
/// Flushes the current state of the encoder, writing out any buffered values.
269+
///
270+
/// This function handles the three possible states of the encoder:
271+
///
272+
/// 1. `RleV1EncoderState::Empty`:
273+
/// - No action is needed as there are no buffered values to write.
274+
///
275+
/// 3. `RleV1EncoderState::Literal`:
276+
/// - Writes out the buffered literal values.
277+
///
278+
/// 2. `RleV1EncoderState::Run`:
279+
/// - Writes out the current run of values.
280+
///
281+
/// After calling this function, the encoder state will be reset to `Empty`.
282+
fn flush(&mut self) {
283+
let state = std::mem::take(&mut self.state);
284+
match state {
285+
RleV1EncodingState::Empty => {}
286+
RleV1EncodingState::Literal => {
287+
write_literals::<_, S>(&mut self.writer, &self.buffer);
288+
}
289+
RleV1EncodingState::Run {
290+
value,
291+
delta,
292+
length,
293+
} => {
294+
write_run::<_, S>(&mut self.writer, value, delta, length);
295+
}
296+
}
297+
}
298+
}
299+
300+
/// Writes one run to `writer`: a header byte holding `length - 3`, the delta as a single
/// byte, and then the run's first value via `write_varint_zigzagged`.
fn write_run<N: NInt, S: EncodingSign>(writer: &mut BytesMut, value: N, delta: i8, length: usize) {
    // header: run length, offset by 3
    let header = length as u8 - 3;
    // the signed delta is stored as its raw byte
    let delta_byte = delta as u8;
    writer.put_u8(header);
    writer.put_u8(delta_byte);
    // base value of the run
    write_varint_zigzagged::<_, S>(writer, value);
}
307+
308+
fn write_literals<N: NInt, S: EncodingSign>(writer: &mut BytesMut, buffer: &[N]) {
309+
// write header
310+
writer.put_u8(-(buffer.len() as i8) as u8);
311+
// write literals
312+
for literal in buffer {
313+
write_varint_zigzagged::<_, S>(writer, *literal);
314+
}
315+
}
316+
317+
// Memory estimate for the encoder.
impl<N: NInt, S: EncodingSign> EstimateMemory for RleV1Encoder<N, S> {
318+
// Returns the number of encoded bytes currently held in `writer`. Values still pending
// in `buffer` or in a run state are not included.
fn estimate_memory_size(&self) -> usize {
319+
self.writer.len()
320+
}
321+
}
322+
323+
impl<N: NInt, S: EncodingSign> PrimitiveValueEncoder<N> for RleV1Encoder<N, S> {
324+
fn new() -> Self {
325+
Self {
326+
writer: BytesMut::new(),
327+
state: Default::default(),
328+
buffer: Vec::with_capacity(MAX_LITERAL_LENGTH),
329+
sign: Default::default(),
330+
}
331+
}
332+
333+
fn write_one(&mut self, value: N) {
334+
self.process_value(value);
335+
}
336+
337+
fn take_inner(&mut self) -> bytes::Bytes {
338+
self.flush();
339+
std::mem::take(&mut self.writer).into()
340+
}
341+
}
342+
150343
#[cfg(test)]
151344
mod tests {
152345
use std::io::Cursor;
@@ -155,32 +348,50 @@ mod tests {
155348

156349
use super::*;
157350

158-
fn test_helper(data: &[u8], expected: &[i64]) {
159-
let mut reader = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(data));
160-
let mut actual = vec![0; expected.len()];
161-
reader.decode(&mut actual).unwrap();
162-
assert_eq!(actual, expected);
351+
fn test_helper(original: &[i64], encoded: &[u8]) {
352+
let mut encoder = RleV1Encoder::<i64, UnsignedEncoding>::new();
353+
encoder.write_slice(original);
354+
encoder.flush();
355+
let actual_encoded = encoder.take_inner();
356+
assert_eq!(actual_encoded, encoded);
357+
358+
let mut decoder = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(encoded));
359+
let mut actual_decoded = vec![0; original.len()];
360+
decoder.decode(&mut actual_decoded).unwrap();
361+
assert_eq!(actual_decoded, original);
163362
}
164363

165364
#[test]
166365
fn test_run() -> Result<()> {
167-
let data = [0x61, 0x00, 0x07];
168-
let expected = [7; 100];
169-
test_helper(&data, &expected);
366+
let original = [7; 100];
367+
let encoded = [0x61, 0x00, 0x07];
368+
test_helper(&original, &encoded);
369+
370+
let original = (1..=100).rev().collect::<Vec<_>>();
371+
let encoded = [0x61, 0xff, 0x64];
372+
test_helper(&original, &encoded);
170373

171-
let data = [0x61, 0xff, 0x64];
172-
let expected = (1..=100).rev().collect::<Vec<_>>();
173-
test_helper(&data, &expected);
374+
let original = (1..=150).rev().collect::<Vec<_>>();
375+
let encoded = [0x7f, 0xff, 0x96, 0x01, 0x11, 0xff, 0x14];
376+
test_helper(&original, &encoded);
174377

378+
let original = [2, 4, 6, 8, 1, 3, 5, 7, 255];
379+
let encoded = [0x01, 0x02, 0x02, 0x01, 0x02, 0x01, 0xff, 0xff, 0x01];
380+
test_helper(&original, &encoded);
175381
Ok(())
176382
}
177383

178384
#[test]
179385
fn test_literal() -> Result<()> {
180-
let data = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
181-
let expected = vec![2, 3, 6, 7, 11];
182-
test_helper(&data, &expected);
386+
let original = vec![2, 3, 6, 7, 11];
387+
let encoded = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
388+
test_helper(&original, &encoded);
183389

390+
let original = vec![2, 3, 6, 7, 11, 1, 2, 3, 0, 256];
391+
let encoded = [
392+
0xfb, 0x02, 0x03, 0x06, 0x07, 0x0b, 0x00, 0x01, 0x01, 0xfe, 0x00, 0x80, 0x02,
393+
];
394+
test_helper(&original, &encoded);
184395
Ok(())
185396
}
186397
}

0 commit comments

Comments
 (0)