Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 133 additions & 89 deletions src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ pub struct Decoder {

header_skipped: bool,

// cached SIMD results — positions from a single scan, consumed across multiple decode() calls
cached_buf: Vec<u8>,
cached_comma_pos: Vec<u32>,
cached_newline_pos: Vec<u32>,
cached_ci: usize,
cached_ni: usize,
cached_comma_bitsets: Vec<u64>,
cached_newline_bitsets: Vec<u64>,
comma_current: u64,
comma_idx: usize,
newline_current: u64,
newline_idx: usize,
cached_start: usize,
cached_bytes_consumed: usize,
}
Expand All @@ -44,10 +45,12 @@ impl Decoder {
num_rows: 0,
header_skipped: !has_header,
cached_buf: Vec::new(),
cached_comma_pos: Vec::new(),
cached_newline_pos: Vec::new(),
cached_ci: 0,
cached_ni: 0,
cached_comma_bitsets: Vec::new(),
cached_newline_bitsets: Vec::new(),
comma_current: 0,
comma_idx: 0,
newline_current: 0,
newline_idx: 0,
cached_start: 0,
cached_bytes_consumed: 0,
}
Expand All @@ -58,7 +61,7 @@ impl Decoder {
return Ok(0);
}

// if we have cached positions from a previous scan, continue extracting from them
// if we have cached bitsets from a previous scan, continue extracting from them
if !self.cached_buf.is_empty() {
return self.extract_from_cache();
}
Expand All @@ -69,21 +72,19 @@ impl Decoder {
let original_len = self.cached_buf.len();
self.cached_buf.resize(original_len.next_multiple_of(64), 0);

let padded = &self.cached_buf;

// phase 1+2: classify and build bitsets in one pass
let low_nibbles = u8x16::from_slice_unchecked(&LOW_NIBBLES);
let high_nibbles = u8x16::from_slice_unchecked(&HIGH_NIBBLES);
let comma_bc = u8x16::broadcast(COMMA);
let newline_bc = u8x16::broadcast(NEWLINE);
let quote_bc = u8x16::broadcast(QUOTES);

let cap = padded.len() / 64;
let cap = self.cached_buf.len() / 64;
let mut comma_bitsets = Vec::with_capacity(cap);
let mut newline_bitsets = Vec::with_capacity(cap);
let mut quote_bitsets = Vec::with_capacity(cap);

for chunk in padded.chunks_exact(64) {
for chunk in self.cached_buf.chunks_exact(64) {
let v0 = classify_one(&chunk[0..16], high_nibbles, low_nibbles);
let v1 = classify_one(&chunk[16..32], high_nibbles, low_nibbles);
let v2 = classify_one(&chunk[32..48], high_nibbles, low_nibbles);
Expand All @@ -106,53 +107,40 @@ impl Decoder {
newline_bitsets[i] &= outside;
}

// phase 4: extract positions
let comma_count = comma_bitsets.iter().map(|b| b.count_ones() as usize).sum();
let newline_count: usize = newline_bitsets
.iter()
.map(|b| b.count_ones() as usize)
.sum();

let mut comma_pos = Vec::with_capacity(comma_count);
let mut newline_pos = Vec::with_capacity(newline_count);

for (i, (&c, &n)) in comma_bitsets.iter().zip(&newline_bitsets).enumerate() {
let base = i * 64;
extract_positions(c, base, &mut comma_pos);
extract_positions(n, base, &mut newline_pos);
}

// cache bitsets and initialize iterator state
self.cached_buf.truncate(original_len);
self.cached_comma_pos = comma_pos;
self.cached_newline_pos = newline_pos;
self.cached_ci = 0;
self.cached_ni = 0;
self.cached_comma_bitsets = comma_bitsets;
self.cached_newline_bitsets = newline_bitsets;
self.comma_current = self.cached_comma_bitsets.first().copied().unwrap_or(0);
self.comma_idx = 0;
self.newline_current = self.cached_newline_bitsets.first().copied().unwrap_or(0);
self.newline_idx = 0;
self.cached_start = 0;
self.cached_bytes_consumed = 0;

// skip header from cache
// skip header
if !self.header_skipped {
if let Some(&nl) = self.cached_newline_pos.first() {
let pos = nl as usize;
if self.cached_buf[pos] == b'\r' && self.cached_buf.get(pos + 1) == Some(&b'\n') {
self.cached_start = pos + 2;
if self
.cached_newline_pos
.get(1)
.copied()
.is_some_and(|p| p == nl + 1)
{
self.cached_ni += 1;
if let Some(nl_pos) = self.next_newline() {
if self.cached_buf[nl_pos] == b'\r'
&& self.cached_buf.get(nl_pos + 1) == Some(&b'\n')
{
self.cached_start = nl_pos + 2;
// skip the \n that follows \r
if self.peek_newline() == Some(nl_pos + 1) {
self.next_newline();
}
} else {
self.cached_start = pos + 1;
self.cached_start = nl_pos + 1;
}
while self.cached_ci < self.cached_comma_pos.len()
&& (self.cached_comma_pos[self.cached_ci] as usize) < self.cached_start
{
self.cached_ci += 1;

loop {
match self.peek_comma() {
Some(pos) if pos < self.cached_start => {
self.next_comma();
}
_ => break,
}
}
self.cached_ni += 1;
self.cached_bytes_consumed = self.cached_start;
self.header_skipped = true;
} else {
Expand All @@ -169,60 +157,92 @@ impl Decoder {
let mut rows_read = 0;
let cols = self.num_columns;

while self.cached_ni < self.cached_newline_pos.len() && rows_read < rows_to_read {
let pos = self.cached_newline_pos[self.cached_ni] as usize;
while rows_read < rows_to_read {
let nl_pos = match self.next_newline() {
Some(pos) => pos,
None => break,
};

for _ in 0..cols.saturating_sub(1) {
if self.cached_ci < self.cached_comma_pos.len() {
let end = self.cached_comma_pos[self.cached_ci] as usize;
if let Some(end) = self.next_comma() {
let start = self.cached_start;
self.push_field_from_cache(start, end);
self.cached_start = end + 1;
self.cached_ci += 1;
}
}

let start = self.cached_start;
self.push_field_from_cache(start, pos);

if self.cached_buf[pos] == b'\r' && self.cached_buf.get(pos + 1) == Some(&b'\n') {
self.cached_start = pos + 2;
if self
.cached_newline_pos
.get(self.cached_ni + 1)
.copied()
.is_some_and(|p| p == self.cached_newline_pos[self.cached_ni] + 1)
{
self.cached_ni += 1;
self.push_field_from_cache(start, nl_pos);

if self.cached_buf[nl_pos] == b'\r' && self.cached_buf.get(nl_pos + 1) == Some(&b'\n') {
self.cached_start = nl_pos + 2;
if self.peek_newline() == Some(nl_pos + 1) {
self.next_newline();
}
} else {
self.cached_start = pos + 1;
self.cached_start = nl_pos + 1;
}

self.num_rows += 1;
rows_read += 1;
self.cached_ni += 1;
self.cached_bytes_consumed = self.cached_start;
}

// if we've consumed all cached positions, return bytes consumed and clear cache
if self.cached_ni >= self.cached_newline_pos.len() {
// if we've consumed all cached newlines, return bytes consumed and clear cache
if self.peek_newline().is_none() {
let consumed = self.cached_bytes_consumed;
self.clear_cache();
return Ok(consumed);
}

// still have cached data — return 0 to signal "flush then call decode again"
// (the next decode call will continue from cache without re-scanning)
Ok(0)
}

#[inline(always)]
fn next_comma(&mut self) -> Option<usize> {
next_bit(
&mut self.comma_current,
&mut self.comma_idx,
&self.cached_comma_bitsets,
)
}

#[inline(always)]
fn peek_comma(&self) -> Option<usize> {
peek_bit(
self.comma_current,
self.comma_idx,
&self.cached_comma_bitsets,
)
}

#[inline(always)]
fn next_newline(&mut self) -> Option<usize> {
next_bit(
&mut self.newline_current,
&mut self.newline_idx,
&self.cached_newline_bitsets,
)
}

#[inline(always)]
fn peek_newline(&self) -> Option<usize> {
peek_bit(
self.newline_current,
self.newline_idx,
&self.cached_newline_bitsets,
)
}

fn clear_cache(&mut self) {
self.cached_buf.clear();
self.cached_comma_pos.clear();
self.cached_newline_pos.clear();
self.cached_ci = 0;
self.cached_ni = 0;
self.cached_comma_bitsets.clear();
self.cached_newline_bitsets.clear();
self.comma_current = 0;
self.comma_idx = 0;
self.newline_current = 0;
self.newline_idx = 0;
self.cached_start = 0;
self.cached_bytes_consumed = 0;
}
Expand Down Expand Up @@ -256,10 +276,8 @@ impl Decoder {
}

fn get_field_str(&self, row: usize, col: usize) -> Result<&str, ArrowError> {
let field = self.get_field(row, col);

std::str::from_utf8(field).map_err(|e| {
ArrowError::ParseError(format!("invalid utf at row {row}, col {col}: {e}"))
std::str::from_utf8(self.get_field(row, col)).map_err(|e| {
ArrowError::ParseError(format!("invalid utf-8 at row {row}, col {col}: {e}"))
})
}

Expand Down Expand Up @@ -385,13 +403,41 @@ fn build_u64_from_classified(v0: u8x16, v1: u8x16, v2: u8x16, v3: u8x16, broadca
a | (b << 16) | (c << 32) | (d << 48)
}

#[inline]
fn extract_positions(mut bitmask: u64, base: usize, out: &mut Vec<u32>) {
while bitmask != 0 {
let pos = bitmask.trailing_zeros();
out.push((base + pos as usize) as u32);
bitmask &= bitmask - 1;
#[inline(always)]
fn next_bit(current: &mut u64, idx: &mut usize, bitsets: &[u64]) -> Option<usize> {
while *current == 0 {
*idx += 1;

if *idx >= bitsets.len() {
return None;
}

*current = unsafe { *bitsets.get_unchecked(*idx) };
}

let pos = current.trailing_zeros() as usize;

*current &= *current - 1;

Some(*idx * 64 + pos)
}

#[inline(always)]
fn peek_bit(current: u64, idx: usize, bitsets: &[u64]) -> Option<usize> {
let mut current = current;
let mut idx = idx;

while current == 0 {
idx += 1;

if idx >= bitsets.len() {
return None;
}

current = unsafe { *bitsets.get_unchecked(idx) };
}

Some(idx * 64 + current.trailing_zeros() as usize)
}

#[cfg(test)]
Expand Down Expand Up @@ -502,8 +548,6 @@ mod tests {
let input = b"a\nb\nc\nd\n";

let consumed1 = decoder.decode(input).unwrap();
// consumed1 is 0 because cache still has data, need flush first
// OR it consumed all and filled batch
let batch1 = decoder.flush().unwrap().unwrap();
assert_eq!(batch1.num_rows(), 2);

Expand Down
Loading