Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl RleV1Encoder for integer #37

Merged
merged 15 commits into from
Jan 19, 2025
1 change: 0 additions & 1 deletion src/encoding/byte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ impl ByteRleEncoder {
self.tail_run_length = 1;
} else if let Some(run_value) = self.run_value {
// Run mode

if value == run_value {
// Continue buffering for Run sequence, flushing if reaching max length
self.num_literals += 1;
Expand Down
267 changes: 251 additions & 16 deletions src/encoding/integer/rle_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@

use std::{io::Read, marker::PhantomData};

use bytes::{BufMut, BytesMut};
use snafu::OptionExt;

use crate::{
encoding::{
rle::GenericRle,
util::{read_u8, try_read_u8},
PrimitiveValueEncoder,
},
error::{OutOfSpecSnafu, Result},
memory::EstimateMemory,
};

use super::{util::read_varint_zigzagged, EncodingSign, NInt};
use super::{
util::{read_varint_zigzagged, write_varint_zigzagged},
EncodingSign, NInt,
};

const MAX_RUN_LENGTH: usize = 130;
const MIN_RUN_LENGTH: usize = 3;
const MAX_RUN_LENGTH: usize = 127 + MIN_RUN_LENGTH;
const MAX_LITERAL_LENGTH: usize = 128;
const MAX_DELTA: i64 = 127;
const MIN_DELTA: i64 = -128;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum EncodingType {
Expand Down Expand Up @@ -147,6 +157,213 @@ impl<N: NInt, R: Read, S: EncodingSign> GenericRle<N> for RleV1Decoder<N, R, S>
}
}

/// Represents the state of the RLE V1 encoder.
///
/// The encoder can be in one of three states:
///
/// 1. `Empty`: The buffer is empty and there are no values to encode.
/// 2. `Run`: The encoder is in run mode, with a run value, delta, and length.
/// 3. `Literal`: The encoder is in literal mode, with values saved in a buffer.
#[derive(Debug, Clone, Eq, PartialEq)]
enum RleV1EncodingState<N: NInt> {
Empty,
Run {
value: N,
delta: i8,
length: usize,
},
Literal {
buffer: [N; MAX_LITERAL_LENGTH],
length: usize,
},
}

impl<N: NInt> Default for RleV1EncodingState<N> {
fn default() -> Self {
Self::Empty
}
}

/// `RleV1Encoder` is responsible for encoding a stream of integers using the Run Length Encoding (RLE) version 1 format.
///
/// # Type Parameters
/// - `N`: The integer type to be encoded, which must implement the `NInt` trait.
/// - `S`: The encoding sign type, which must implement the `EncodingSign` trait.
///
/// # Fields
/// - `writer`: A `BytesMut` buffer that holds the encoded bytes.
/// - `state`: The current state of the encoder, which can be `Empty`, `Run`, or `Literal`.
/// - `sign`: A `PhantomData` marker for the encoding sign type.
pub struct RleV1Encoder<N: NInt, S: EncodingSign> {
writer: BytesMut,
state: RleV1EncodingState<N>,
sign: PhantomData<S>,
}

impl<N: NInt, S: EncodingSign> RleV1Encoder<N, S> {
/// Processes a given value and updates the encoder state accordingly.
///
/// The function handles three possible states of the encoder:
///
/// 1. `RleV1EncoderState::Empty`:
/// - Transitions to the `Literal` state with the given value as the first element in the buffer.
///
/// 2. `RleV1EncoderState::Run`:
/// - If the value continues the current run (i.e., it matches the expected value based on the run's delta and length),
/// the run length is incremented. If the run length reaches `MAX_RUN_LENGTH`, the run is written out and the state
/// transitions to `Empty`.
/// - If the value does not continue the current run, the existing run is written out and the state transitions to
/// `Literal` with the new value as the first element in the buffer.
///
/// 3. `RleV1EncoderState::Literal`:
/// - The value is added to the buffer. If the buffer length reaches `MAX_LITERAL_LENGTH`, the buffer is written out
/// and the state transitions to `Empty`.
/// - If the buffer length is at least `MIN_RUN_LENGTH` and the values in the buffer form a valid run (i.e., the deltas
/// between consecutive values are consistent and within the allowed range), the state transitions to `Run`.
/// - Otherwise, the state remains `Literal`.
fn process_value(&mut self, value: N) {
match &mut self.state {
RleV1EncodingState::Empty => {
// change to literal model
self.state = RleV1EncodingState::Literal {
buffer: {
let mut buf = [N::zero(); MAX_LITERAL_LENGTH];
buf[0] = value;
buf
},
length: 1,
}
}
RleV1EncodingState::Run {
value: run_value,
delta,
length,
} => {
if run_value.as_i64() + *delta as i64 * *length as i64 == value.as_i64() {
// keep run model
*length += 1;
if *length == MAX_RUN_LENGTH {
// reach run limit
write_run::<_, S>(&mut self.writer, *run_value, *delta, *length);
self.state = RleV1EncodingState::Empty;
}
} else {
// write run values and change to literal model
write_run::<_, S>(&mut self.writer, *run_value, *delta, *length);
self.state = RleV1EncodingState::Literal {
buffer: {
let mut buf = [N::zero(); MAX_LITERAL_LENGTH];
buf[0] = value;
buf
},
length: 1,
};
}
}
RleV1EncodingState::Literal { buffer, length } => {
buffer[*length] = value;
*length += 1;
let delta = (value - buffer[*length - 2]).as_i64();
// check if can change to run model
if *length >= MIN_RUN_LENGTH
&& (MIN_DELTA..=MAX_DELTA).contains(&delta)
&& delta == (buffer[*length - 2] - buffer[*length - 3]).as_i64()
{
// change to run model
if *length > MIN_RUN_LENGTH {
// write the left literals
write_literals::<_, S>(&mut self.writer, buffer, *length - MIN_RUN_LENGTH);
}
self.state = RleV1EncodingState::Run {
value: buffer[*length - MIN_RUN_LENGTH],
delta: delta as i8,
length: MIN_RUN_LENGTH,
}
} else if *length == MAX_LITERAL_LENGTH {
// reach buffer limit, write literals and change to empty state
write_literals::<_, S>(&mut self.writer, buffer, MAX_LITERAL_LENGTH);
self.state = RleV1EncodingState::Empty;
} else {
// keep literal mode
}
}
}
}

/// Flushes the current state of the encoder, writing out any buffered values.
///
/// This function handles the three possible states of the encoder:
///
/// 1. `RleV1EncoderState::Empty`:
/// - No action is needed as there are no buffered values to write.
///
/// 2. `RleV1EncoderState::Run`:
/// - Writes out the current run of values.
///
/// 3. `RleV1EncoderState::Literal`:
/// - Writes out the buffered literal values.
///
/// After calling this function, the encoder state will be reset to `Empty`.
fn flush(&mut self) {
let state = std::mem::take(&mut self.state);
match state {
RleV1EncodingState::Empty => {}
RleV1EncodingState::Run {
value,
delta,
length,
} => {
write_run::<_, S>(&mut self.writer, value, delta, length);
}
RleV1EncodingState::Literal { buffer, length } => {
write_literals::<_, S>(&mut self.writer, &buffer, length);
}
}
}
}

fn write_run<N: NInt, S: EncodingSign>(writer: &mut BytesMut, value: N, delta: i8, length: usize) {
// write header
writer.put_u8(length as u8 - 3);
writer.put_u8(delta as u8);
// write run value
write_varint_zigzagged::<_, S>(writer, value);
}

fn write_literals<N: NInt, S: EncodingSign>(writer: &mut BytesMut, buffer: &[N], length: usize) {
// write header
writer.put_u8(-(length as i8) as u8);
// write literals
for literal in buffer.iter().take(length) {
write_varint_zigzagged::<_, S>(writer, *literal);
}
}

impl<N: NInt, S: EncodingSign> EstimateMemory for RleV1Encoder<N, S> {
fn estimate_memory_size(&self) -> usize {
self.writer.len()
}
}

impl<N: NInt, S: EncodingSign> PrimitiveValueEncoder<N> for RleV1Encoder<N, S> {
fn new() -> Self {
Self {
writer: BytesMut::new(),
state: Default::default(),
sign: Default::default(),
}
}

fn write_one(&mut self, value: N) {
self.process_value(value);
}

fn take_inner(&mut self) -> bytes::Bytes {
self.flush();
std::mem::take(&mut self.writer).into()
}
}

#[cfg(test)]
mod tests {
use std::io::Cursor;
Expand All @@ -155,32 +372,50 @@ mod tests {

use super::*;

fn test_helper(data: &[u8], expected: &[i64]) {
let mut reader = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(data));
let mut actual = vec![0; expected.len()];
reader.decode(&mut actual).unwrap();
assert_eq!(actual, expected);
fn test_helper(original: &[i64], encoded: &[u8]) {
let mut encoder = RleV1Encoder::<i64, UnsignedEncoding>::new();
encoder.write_slice(original);
encoder.flush();
let actual_encoded = encoder.take_inner();
assert_eq!(actual_encoded, encoded);

let mut decoder = RleV1Decoder::<i64, _, UnsignedEncoding>::new(Cursor::new(encoded));
let mut actual_decoded = vec![0; original.len()];
decoder.decode(&mut actual_decoded).unwrap();
assert_eq!(actual_decoded, original);
}

#[test]
fn test_run() -> Result<()> {
let data = [0x61, 0x00, 0x07];
let expected = [7; 100];
test_helper(&data, &expected);
let original = [7; 100];
let encoded = [0x61, 0x00, 0x07];
test_helper(&original, &encoded);

let original = (1..=100).rev().collect::<Vec<_>>();
let encoded = [0x61, 0xff, 0x64];
test_helper(&original, &encoded);

let data = [0x61, 0xff, 0x64];
let expected = (1..=100).rev().collect::<Vec<_>>();
test_helper(&data, &expected);
let original = (1..=150).rev().collect::<Vec<_>>();
let encoded = [0x7f, 0xff, 0x96, 0x01, 0x11, 0xff, 0x14];
test_helper(&original, &encoded);

let original = [2, 4, 6, 8, 1, 3, 5, 7, 255];
let encoded = [0x01, 0x02, 0x02, 0x01, 0x02, 0x01, 0xff, 0xff, 0x01];
test_helper(&original, &encoded);
Ok(())
}

#[test]
fn test_literal() -> Result<()> {
let data = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
let expected = vec![2, 3, 6, 7, 11];
test_helper(&data, &expected);
let original = vec![2, 3, 6, 7, 11];
let encoded = [0xfb, 0x02, 0x03, 0x06, 0x07, 0xb];
test_helper(&original, &encoded);

let original = vec![2, 3, 6, 7, 11, 1, 2, 3, 0, 256];
let encoded = [
0xfb, 0x02, 0x03, 0x06, 0x07, 0x0b, 0x00, 0x01, 0x01, 0xfe, 0x00, 0x80, 0x02,
];
test_helper(&original, &encoded);
Ok(())
}
}
Loading