Skip to content

Commit b1b70f7

Browse files
feat: Implement Efficient Skipping for RowSelection (Phase 2) (#60)
* init api for PrimitiveValueDecoder * impl skip_values for rle_v1 * impl skip_values for RleDecoders * impl skip PrimitiveValueDecoder * add tests * impl skip_values * fix test * support async arrow reader * fix clippy * refactor * add test for async * fix clippy
1 parent 47d26d6 commit b1b70f7

20 files changed

Lines changed: 1590 additions & 111 deletions

File tree

src/array_decoder/decimal.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ impl ArrayBatchDecoder for DecimalArrayDecoder {
101101
let array = Arc::new(array) as ArrayRef;
102102
Ok(array)
103103
}
104+
105+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
106+
self.inner.skip_values(n, parent_present)
107+
}
104108
}
105109

106110
/// This iter fixes the scales of the varints decoded as scale is specified on a per
@@ -112,6 +116,12 @@ struct DecimalScaleRepairDecoder {
112116
}
113117

114118
impl PrimitiveValueDecoder<i128> for DecimalScaleRepairDecoder {
119+
fn skip(&mut self, n: usize) -> Result<()> {
120+
self.varint_iter.skip(n)?;
121+
self.scale_iter.skip(n)?;
122+
Ok(())
123+
}
124+
115125
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
116126
// TODO: can probably optimize, reuse buffers?
117127
let mut varint = vec![0; out.len()];

src/array_decoder/list.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,21 @@ impl ArrayBatchDecoder for ListArrayDecoder {
8585
let array = Arc::new(array);
8686
Ok(array)
8787
}
88+
89+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
90+
use super::skip_present_and_get_non_null_count;
91+
92+
let non_null_count =
93+
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
94+
95+
// Decode lengths to determine how many child values to skip
96+
let mut lengths = vec![0; non_null_count];
97+
self.lengths.decode(&mut lengths)?;
98+
let total_length: i64 = lengths.iter().sum();
99+
100+
// Skip the child values (children don't have parent_present from list)
101+
self.inner.skip_values(total_length as usize, None)?;
102+
103+
Ok(())
104+
}
88105
}

src/array_decoder/map.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,22 @@ impl ArrayBatchDecoder for MapArrayDecoder {
102102
let array = Arc::new(array);
103103
Ok(array)
104104
}
105+
106+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
107+
use super::skip_present_and_get_non_null_count;
108+
109+
let non_null_count =
110+
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
111+
112+
// Decode lengths to determine how many entries to skip
113+
let mut lengths = vec![0; non_null_count];
114+
self.lengths.decode(&mut lengths)?;
115+
let total_length: i64 = lengths.iter().sum();
116+
117+
// Skip both keys and values (they don't have parent_present from map)
118+
self.keys.skip_values(total_length as usize, None)?;
119+
self.values.skip_values(total_length as usize, None)?;
120+
121+
Ok(())
122+
}
105123
}

src/array_decoder/mod.rs

Lines changed: 128 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ pub trait ArrayBatchDecoder: Send {
7575
batch_size: usize,
7676
parent_present: Option<&NullBuffer>,
7777
) -> Result<ArrayRef>;
78+
79+
/// Skip the next `n` values without decoding them, failing if it cannot skip enough values.
80+
/// If parent nested type (e.g. Struct) indicates a null in its PRESENT stream,
81+
/// then the child doesn't have a value (similar to other nullability). So we need
82+
/// to take care to skip only the values the child actually stores, i.e. those
83+
/// whose parent slot is non-null (and whose own PRESENT bit, if any, is set).
84+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()>;
7885
}
7986

8087
struct PrimitiveArrayDecoder<T: ArrowPrimitiveType> {
@@ -123,6 +130,12 @@ impl<T: ArrowPrimitiveType> ArrayBatchDecoder for PrimitiveArrayDecoder<T> {
123130
let array = Arc::new(array) as ArrayRef;
124131
Ok(array)
125132
}
133+
134+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
135+
let non_null_count =
136+
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
137+
self.iter.skip(non_null_count)
138+
}
126139
}
127140

128141
type Int64ArrayDecoder = PrimitiveArrayDecoder<Int64Type>;
@@ -168,6 +181,12 @@ impl ArrayBatchDecoder for BooleanArrayDecoder {
168181
};
169182
Ok(Arc::new(array))
170183
}
184+
185+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
186+
let non_null_count =
187+
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
188+
self.iter.skip(non_null_count)
189+
}
171190
}
172191

173192
struct PresentDecoder {
@@ -232,6 +251,42 @@ fn derive_present_vec(
232251
}
233252
}
234253

254+
/// Skip n values and return the non-null count for the data stream
255+
fn skip_present_and_get_non_null_count(
256+
present: &mut Option<PresentDecoder>,
257+
parent_present: Option<&NullBuffer>,
258+
n: usize,
259+
) -> Result<usize> {
260+
match (present, parent_present) {
261+
(Some(present), Some(parent_present)) => {
262+
// Parent has nulls, so only the non-null parent slots have an entry in our
263+
// present stream; that count is how many of our present values to consume
264+
let non_null_in_parent = parent_present.len() - parent_present.null_count();
265+
266+
// Skip our present values for non-null parents and count non-nulls
267+
let mut our_present = vec![false; non_null_in_parent];
268+
present.inner.decode(&mut our_present)?;
269+
let our_non_null_count = our_present.iter().filter(|&&v| v).count();
270+
271+
Ok(our_non_null_count)
272+
}
273+
(Some(present), None) => {
274+
// No parent present, skip n values and count non-nulls
275+
let mut present_values = vec![false; n];
276+
present.inner.decode(&mut present_values)?;
277+
Ok(present_values.iter().filter(|&&v| v).count())
278+
}
279+
(None, Some(parent_present)) => {
280+
// We have no present stream of our own, so every non-null parent slot has data
281+
Ok(parent_present.len() - parent_present.null_count())
282+
}
283+
(None, None) => {
284+
// No nulls at all, all n values have data
285+
Ok(n)
286+
}
287+
}
288+
}
289+
235290
pub struct NaiveStripeDecoder {
236291
stripe: Stripe,
237292
schema_ref: SchemaRef,
@@ -243,56 +298,81 @@ pub struct NaiveStripeDecoder {
243298
selection_index: usize,
244299
}
245300

301+
impl NaiveStripeDecoder {
302+
/// Advance according to the configured row selection and return the next batch, if any.
303+
///
304+
/// Behavior:
305+
/// - Iterates `RowSelection` segments (skip/select) starting at `selection_index`.
306+
/// - For skip segments: clamp to remaining rows in this stripe, advance decoders via
307+
/// `skip_rows(actual_skip)`, and advance `index`. If the segment is fully consumed,
308+
/// increment `selection_index`.
309+
/// - For select segments: decode up to `min(row_count, batch_size, remaining_in_stripe)`,
310+
/// advance `index`, update `selection_index` if fully consumed, and return the batch.
311+
/// - If a segment requests rows beyond the end of the stripe, it is skipped (advancing
312+
/// `selection_index`) without touching decoders.
313+
fn next_with_row_selection(&mut self) -> Option<Result<RecordBatch>> {
314+
// Process selectors until we produce a batch or exhaust selection
315+
loop {
316+
let (is_skip, row_count) = {
317+
let selectors = self.row_selection.as_ref().unwrap().selectors();
318+
if self.selection_index >= selectors.len() {
319+
return None;
320+
}
321+
let selector = selectors[self.selection_index];
322+
(selector.skip, selector.row_count)
323+
};
324+
325+
if is_skip {
326+
let remaining = self.number_of_rows - self.index;
327+
let actual_skip = row_count.min(remaining);
328+
329+
if actual_skip == 0 {
330+
// Nothing to skip in this stripe; try next selector
331+
self.selection_index += 1;
332+
continue;
333+
}
334+
335+
// Keep decoders in sync by skipping values per column
336+
if let Err(e) = self.skip_rows(actual_skip) {
337+
return Some(Err(e));
338+
}
339+
self.index += actual_skip;
340+
341+
if actual_skip >= row_count {
342+
self.selection_index += 1;
343+
}
344+
} else {
345+
let rows_to_read = row_count.min(self.batch_size);
346+
let remaining = self.number_of_rows - self.index;
347+
let actual_rows = rows_to_read.min(remaining);
348+
349+
if actual_rows == 0 {
350+
// Nothing to read from this selector in this stripe; advance selector
351+
self.selection_index += 1;
352+
continue;
353+
}
354+
355+
let record = self.decode_next_batch(actual_rows).transpose()?;
356+
self.index += actual_rows;
357+
358+
if actual_rows >= row_count {
359+
self.selection_index += 1;
360+
}
361+
return Some(record);
362+
}
363+
}
364+
}
365+
}
366+
246367
impl Iterator for NaiveStripeDecoder {
247368
type Item = Result<RecordBatch>;
248369

370+
// TODO: check if we can make this more efficient
249371
fn next(&mut self) -> Option<Self::Item> {
250372
if self.index < self.number_of_rows {
251373
// Handle row selection if present
252374
if self.row_selection.is_some() {
253-
// Process selectors until we find rows to select or exhaust the selection
254-
loop {
255-
let (is_skip, row_count) = {
256-
// Safety: this has been checked above
257-
let selectors = self.row_selection.as_ref().unwrap().selectors();
258-
if self.selection_index >= selectors.len() {
259-
return None;
260-
}
261-
let selector = selectors[self.selection_index];
262-
(selector.skip, selector.row_count)
263-
};
264-
265-
if is_skip {
266-
// Skip these rows by advancing the index
267-
self.index += row_count;
268-
self.selection_index += 1;
269-
270-
// Decode and discard the skipped rows to advance the internal decoders
271-
if let Err(e) = self.skip_rows(row_count) {
272-
return Some(Err(e));
273-
}
274-
} else {
275-
// Select these rows
276-
let rows_to_read = row_count.min(self.batch_size);
277-
let remaining = self.number_of_rows - self.index;
278-
let actual_rows = rows_to_read.min(remaining);
279-
280-
if actual_rows == 0 {
281-
self.selection_index += 1;
282-
continue;
283-
}
284-
285-
let record = self.decode_next_batch(actual_rows).transpose()?;
286-
self.index += actual_rows;
287-
288-
// Update selector to track progress
289-
if actual_rows >= row_count {
290-
self.selection_index += 1;
291-
}
292-
293-
return Some(record);
294-
}
295-
}
375+
self.next_with_row_selection()
296376
} else {
297377
// No row selection - decode normally
298378
let record = self
@@ -513,14 +593,12 @@ impl NaiveStripeDecoder {
513593
})
514594
}
515595

516-
/// Skip the specified number of rows by decoding and discarding them
596+
/// Skip the specified number of rows by calling skip_values on each decoder
517597
fn skip_rows(&mut self, count: usize) -> Result<()> {
518-
// Decode in batches to avoid large memory allocations
519-
let mut remaining = count;
520-
while remaining > 0 {
521-
let chunk = self.batch_size.min(remaining);
522-
let _ = self.inner_decode_next_batch(chunk)?;
523-
remaining -= chunk;
598+
// Call skip_values on each decoder to efficiently skip rows
599+
// Top-level decoders don't have parent_present
600+
for decoder in &mut self.decoders {
601+
decoder.skip_values(count, None)?;
524602
}
525603
Ok(())
526604
}

src/array_decoder/string.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,28 @@ impl<T: ByteArrayType> ArrayBatchDecoder for GenericByteArrayDecoder<T> {
155155
let array = Arc::new(array) as ArrayRef;
156156
Ok(array)
157157
}
158+
159+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
160+
use crate::array_decoder::skip_present_and_get_non_null_count;
161+
162+
let non_null_count =
163+
skip_present_and_get_non_null_count(&mut self.present, parent_present, n)?;
164+
165+
// Decode lengths to determine how many bytes to skip
166+
let mut lengths = vec![0; non_null_count];
167+
self.lengths.decode(&mut lengths)?;
168+
let total_bytes: i64 = lengths.iter().sum();
169+
170+
// Skip the data bytes
171+
// TODO: can we use the decompressor to skip the bytes?
172+
std::io::copy(
173+
&mut self.bytes.by_ref().take(total_bytes as u64),
174+
&mut std::io::sink(),
175+
)
176+
.context(IoSnafu)?;
177+
178+
Ok(())
179+
}
158180
}
159181

160182
pub struct DictionaryStringArrayDecoder {
@@ -192,4 +214,8 @@ impl ArrayBatchDecoder for DictionaryStringArrayDecoder {
192214
let array = Arc::new(array);
193215
Ok(array)
194216
}
217+
218+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
219+
self.indexes.skip_values(n, parent_present)
220+
}
195221
}

src/array_decoder/struct_decoder.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,19 @@ impl ArrayBatchDecoder for StructArrayDecoder {
7676
let array = Arc::new(array);
7777
Ok(array)
7878
}
79+
80+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
81+
use super::derive_present_vec;
82+
83+
// Derive the combined present buffer like in next_batch
84+
let present = derive_present_vec(&mut self.present, parent_present, n).transpose()?;
85+
86+
// Skip values in all child decoders
87+
// Pass the present buffer to children so they know which values to skip
88+
for decoder in &mut self.decoders {
89+
decoder.skip_values(n, present.as_ref())?;
90+
}
91+
92+
Ok(())
93+
}
7994
}

src/array_decoder/timestamp.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampOffsetArrayDecoder<T>
267267
let array = Arc::new(array) as ArrayRef;
268268
Ok(array)
269269
}
270+
271+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
272+
self.inner.skip_values(n, parent_present)
273+
}
270274
}
271275

272276
/// Wrapper around PrimitiveArrayDecoder to allow specifying the timezone of the output
@@ -286,6 +290,10 @@ impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampInstantArrayDecoder<T
286290
let array = Arc::new(array) as ArrayRef;
287291
Ok(array)
288292
}
293+
294+
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
295+
self.0.skip_values(n, parent_present)
296+
}
289297
}
290298

291299
struct TimestampNanosecondAsDecimalWithTzDecoder(TimestampNanosecondAsDecimalDecoder, Tz);
@@ -308,6 +316,11 @@ impl TimestampNanosecondAsDecimalWithTzDecoder {
308316
}
309317

310318
impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalWithTzDecoder {
319+
fn skip(&mut self, n: usize) -> Result<()> {
320+
self.0.skip(n)?;
321+
Ok(())
322+
}
323+
311324
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
312325
self.0.decode(out)?;
313326
for x in out.iter_mut() {

0 commit comments

Comments
 (0)