Skip to content

Commit 47d26d6

Browse files
feat: Implement RowSelection API for efficient row filtering (Phase 1) (#59)
* init mvp * add regression-test * clean * fmt * fix clippy * fix
1 parent 93484df commit 47d26d6

5 files changed

Lines changed: 1085 additions & 7 deletions

File tree

src/array_decoder/mod.rs

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::error::{
4040
use crate::proto::stream::Kind;
4141
use crate::schema::DataType;
4242
use crate::stripe::Stripe;
43+
use crate::RowSelection;
4344

4445
use self::decimal::new_decimal_decoder;
4546
use self::list::ListArrayDecoder;
@@ -238,18 +239,68 @@ pub struct NaiveStripeDecoder {
238239
index: usize,
239240
batch_size: usize,
240241
number_of_rows: usize,
242+
row_selection: Option<RowSelection>,
243+
selection_index: usize,
241244
}
242245

243246
impl Iterator for NaiveStripeDecoder {
244247
type Item = Result<RecordBatch>;
245248

246249
fn next(&mut self) -> Option<Self::Item> {
247250
if self.index < self.number_of_rows {
248-
let record = self
249-
.decode_next_batch(self.number_of_rows - self.index)
250-
.transpose()?;
251-
self.index += self.batch_size;
252-
Some(record)
251+
// Handle row selection if present
252+
if self.row_selection.is_some() {
253+
// Process selectors until we find rows to select or exhaust the selection
254+
loop {
255+
let (is_skip, row_count) = {
256+
// Safety: this has been checked above
257+
let selectors = self.row_selection.as_ref().unwrap().selectors();
258+
if self.selection_index >= selectors.len() {
259+
return None;
260+
}
261+
let selector = selectors[self.selection_index];
262+
(selector.skip, selector.row_count)
263+
};
264+
265+
if is_skip {
266+
// Skip these rows by advancing the index
267+
self.index += row_count;
268+
self.selection_index += 1;
269+
270+
// Decode and discard the skipped rows to advance the internal decoders
271+
if let Err(e) = self.skip_rows(row_count) {
272+
return Some(Err(e));
273+
}
274+
} else {
275+
// Select these rows
276+
let rows_to_read = row_count.min(self.batch_size);
277+
let remaining = self.number_of_rows - self.index;
278+
let actual_rows = rows_to_read.min(remaining);
279+
280+
if actual_rows == 0 {
281+
self.selection_index += 1;
282+
continue;
283+
}
284+
285+
let record = self.decode_next_batch(actual_rows).transpose()?;
286+
self.index += actual_rows;
287+
288+
// Update selector to track progress
289+
if actual_rows >= row_count {
290+
self.selection_index += 1;
291+
}
292+
293+
return Some(record);
294+
}
295+
}
296+
} else {
297+
// No row selection - decode normally
298+
let record = self
299+
.decode_next_batch(self.number_of_rows - self.index)
300+
.transpose()?;
301+
self.index += self.batch_size;
302+
Some(record)
303+
}
253304
} else {
254305
None
255306
}
@@ -433,6 +484,15 @@ impl NaiveStripeDecoder {
433484
}
434485

435486
pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result<Self> {
487+
Self::new_with_selection(stripe, schema_ref, batch_size, None)
488+
}
489+
490+
pub fn new_with_selection(
491+
stripe: Stripe,
492+
schema_ref: SchemaRef,
493+
batch_size: usize,
494+
row_selection: Option<RowSelection>,
495+
) -> Result<Self> {
436496
let number_of_rows = stripe.number_of_rows();
437497
let decoders = stripe
438498
.columns()
@@ -448,6 +508,20 @@ impl NaiveStripeDecoder {
448508
index: 0,
449509
batch_size,
450510
number_of_rows,
511+
row_selection,
512+
selection_index: 0,
451513
})
452514
}
515+
516+
/// Skip the specified number of rows by decoding and discarding them
517+
fn skip_rows(&mut self, count: usize) -> Result<()> {
518+
// Decode in batches to avoid large memory allocations
519+
let mut remaining = count;
520+
while remaining > 0 {
521+
let chunk = self.batch_size.min(remaining);
522+
let _ = self.inner_decode_next_batch(chunk)?;
523+
remaining -= chunk;
524+
}
525+
Ok(())
526+
}
453527
}

src/arrow_reader.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::error::Result;
2828
use crate::projection::ProjectionMask;
2929
use crate::reader::metadata::{read_metadata, FileMetadata};
3030
use crate::reader::ChunkReader;
31+
use crate::row_selection::RowSelection;
3132
use crate::schema::RootDataType;
3233
use crate::stripe::{Stripe, StripeMetadata};
3334

@@ -40,6 +41,7 @@ pub struct ArrowReaderBuilder<R> {
4041
pub(crate) projection: ProjectionMask,
4142
pub(crate) schema_ref: Option<SchemaRef>,
4243
pub(crate) file_byte_range: Option<Range<usize>>,
44+
pub(crate) row_selection: Option<RowSelection>,
4345
}
4446

4547
impl<R> ArrowReaderBuilder<R> {
@@ -51,6 +53,7 @@ impl<R> ArrowReaderBuilder<R> {
5153
projection: ProjectionMask::all(),
5254
schema_ref: None,
5355
file_byte_range: None,
56+
row_selection: None,
5457
}
5558
}
5659

@@ -79,6 +82,33 @@ impl<R> ArrowReaderBuilder<R> {
7982
self
8083
}
8184

85+
/// Set a [`RowSelection`] to filter rows
86+
///
87+
/// The [`RowSelection`] specifies which rows should be decoded from the ORC file.
88+
/// This can be used to skip rows that don't match predicates, reducing I/O and
89+
/// improving query performance.
90+
///
91+
/// # Example
92+
///
93+
/// ```no_run
94+
/// # use std::fs::File;
95+
/// # use orc_rust::arrow_reader::ArrowReaderBuilder;
96+
/// # use orc_rust::row_selection::{RowSelection, RowSelector};
97+
/// let file = File::open("data.orc").unwrap();
98+
/// let selection = vec![
99+
/// RowSelector::skip(100),
100+
/// RowSelector::select(50),
101+
/// ].into();
102+
/// let reader = ArrowReaderBuilder::try_new(file)
103+
/// .unwrap()
104+
/// .with_row_selection(selection)
105+
/// .build();
106+
/// ```
107+
pub fn with_row_selection(mut self, row_selection: RowSelection) -> Self {
108+
self.row_selection = Some(row_selection);
109+
self
110+
}
111+
82112
/// Returns the currently computed schema
83113
///
84114
/// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
@@ -124,6 +154,7 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
124154
schema_ref,
125155
current_stripe: None,
126156
batch_size: self.batch_size,
157+
row_selection: self.row_selection,
127158
}
128159
}
129160
}
@@ -133,6 +164,7 @@ pub struct ArrowReader<R> {
133164
schema_ref: SchemaRef,
134165
current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
135166
batch_size: usize,
167+
row_selection: Option<RowSelection>,
136168
}
137169

138170
impl<R> ArrowReader<R> {
@@ -146,8 +178,22 @@ impl<R: ChunkReader> ArrowReader<R> {
146178
let stripe = self.cursor.next().transpose()?;
147179
match stripe {
148180
Some(stripe) => {
149-
let decoder =
150-
NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size)?;
181+
// Split off the row selection for this stripe
182+
let stripe_rows = stripe.number_of_rows();
183+
let selection = self.row_selection.as_mut().and_then(|s| {
184+
if s.row_count() > 0 {
185+
Some(s.split_off(stripe_rows))
186+
} else {
187+
None
188+
}
189+
});
190+
191+
let decoder = NaiveStripeDecoder::new_with_selection(
192+
stripe,
193+
self.schema_ref.clone(),
194+
self.batch_size,
195+
selection,
196+
)?;
151197
self.current_stripe = Some(Box::new(decoder));
152198
self.next().transpose()
153199
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub mod projection;
6161
#[allow(dead_code)]
6262
mod proto;
6363
pub mod reader;
64+
pub mod row_selection;
6465
pub mod schema;
6566
pub mod statistics;
6667
pub mod stripe;
@@ -70,3 +71,4 @@ pub use arrow_reader::{ArrowReader, ArrowReaderBuilder};
7071
pub use arrow_writer::{ArrowWriter, ArrowWriterBuilder};
7172
#[cfg(feature = "async")]
7273
pub use async_arrow_reader::ArrowStreamReader;
74+
pub use row_selection::{RowSelection, RowSelector};

0 commit comments

Comments
 (0)