Skip to content

Commit 0d03440

Browse files
committed
adapt arrow_reader and async_arrow_reader
1 parent 97a6940 commit 0d03440

2 files changed

Lines changed: 177 additions & 18 deletions

File tree

src/arrow_reader.rs

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
2525

2626
use crate::array_decoder::NaiveStripeDecoder;
2727
use crate::error::Result;
28+
use crate::predicate::Predicate;
2829
use crate::projection::ProjectionMask;
2930
use crate::reader::metadata::{read_metadata, FileMetadata};
3031
use crate::reader::ChunkReader;
32+
use crate::row_group_filter::evaluate_predicate;
3133
use crate::row_selection::RowSelection;
3234
use crate::schema::RootDataType;
3335
use crate::stripe::{Stripe, StripeMetadata};
@@ -42,6 +44,7 @@ pub struct ArrowReaderBuilder<R> {
4244
pub(crate) schema_ref: Option<SchemaRef>,
4345
pub(crate) file_byte_range: Option<Range<usize>>,
4446
pub(crate) row_selection: Option<RowSelection>,
47+
pub(crate) predicate: Option<Predicate>,
4548
}
4649

4750
impl<R> ArrowReaderBuilder<R> {
@@ -54,6 +57,7 @@ impl<R> ArrowReaderBuilder<R> {
5457
schema_ref: None,
5558
file_byte_range: None,
5659
row_selection: None,
60+
predicate: None,
5761
}
5862
}
5963

@@ -109,6 +113,44 @@ impl<R> ArrowReaderBuilder<R> {
109113
self
110114
}
111115

116+
/// Sets a predicate used to prune row groups while reading
117+
///
118+
/// The predicate will be evaluated against row group statistics to automatically
119+
/// generate a [`RowSelection`] that skips filtered row groups. This provides
120+
/// efficient predicate pushdown based on ORC row indexes.
121+
///
122+
/// The predicate is evaluated lazily when each stripe is read, using the row group
123+
/// statistics from the stripe's index section.
124+
///
125+
/// If both `with_predicate()` and `with_row_selection()` are called, the results
126+
/// are combined using logical AND (both conditions must be satisfied).
127+
///
128+
/// # Example
129+
///
130+
/// ```no_run
131+
/// # use std::fs::File;
132+
/// # use orc_rust::{ArrowReaderBuilder, Predicate, PredicateValue};
133+
/// let file = File::open("data.orc").unwrap();
134+
///
135+
/// // Filter: age >= 18
136+
/// let predicate = Predicate::gte("age", PredicateValue::Int32(Some(18)));
137+
///
138+
/// let reader = ArrowReaderBuilder::try_new(file)
139+
/// .unwrap()
140+
/// .with_predicate(predicate)
141+
/// .build();
142+
/// ```
143+
///
144+
/// # Notes
145+
///
146+
/// - Predicate evaluation requires row indexes to be present in the ORC file; when the file metadata reports no row index stride, 10,000 rows per row group is assumed
147+
/// - If a stripe's row indexes cannot be read, or evaluating the predicate fails (e.g. the column is not found), the error is not reported: the predicate is ignored for that stripe and all of its rows are kept
148+
/// - Only primitive columns have row indexes; predicates on compound types may be limited
149+
pub fn with_predicate(mut self, predicate: Predicate) -> Self {
150+
self.predicate = Some(predicate);
151+
self
152+
}
153+
112154
/// Returns the currently computed schema
113155
///
114156
/// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
@@ -142,6 +184,7 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
142184
.file_metadata
143185
.root_data_type()
144186
.project(&self.projection);
187+
let projected_data_type_clone = projected_data_type.clone();
145188
let cursor = Cursor {
146189
reader: self.reader,
147190
file_metadata: self.file_metadata,
@@ -155,6 +198,8 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
155198
current_stripe: None,
156199
batch_size: self.batch_size,
157200
row_selection: self.row_selection,
201+
predicate: self.predicate,
202+
projected_data_type: projected_data_type_clone,
158203
}
159204
}
160205
}
@@ -165,6 +210,8 @@ pub struct ArrowReader<R> {
165210
current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
166211
batch_size: usize,
167212
row_selection: Option<RowSelection>,
213+
predicate: Option<Predicate>,
214+
projected_data_type: RootDataType,
168215
}
169216

170217
impl<R> ArrowReader<R> {
@@ -178,21 +225,63 @@ impl<R: ChunkReader> ArrowReader<R> {
178225
let stripe = self.cursor.next().transpose()?;
179226
match stripe {
180227
Some(stripe) => {
181-
// Split off the row selection for this stripe
182228
let stripe_rows = stripe.number_of_rows();
183-
let selection = self.row_selection.as_mut().and_then(|s| {
184-
if s.row_count() > 0 {
185-
Some(s.split_off(stripe_rows))
186-
} else {
187-
None
229+
230+
// Evaluate predicate if present
231+
let mut stripe_selection: Option<RowSelection> = None;
232+
if let Some(ref predicate) = self.predicate {
233+
// Try to read row indexes for this stripe
234+
match stripe.read_row_indexes(&self.cursor.file_metadata) {
235+
Ok(row_index) => {
236+
// Evaluate predicate against row group statistics
237+
match evaluate_predicate(predicate, &row_index, &self.projected_data_type) {
238+
Ok(row_group_filter) => {
239+
// Generate RowSelection from filter results
240+
let rows_per_group = self
241+
.cursor
242+
.file_metadata
243+
.row_index_stride()
244+
.unwrap_or(10_000);
245+
stripe_selection = Some(RowSelection::from_row_group_filter(
246+
&row_group_filter,
247+
rows_per_group,
248+
stripe_rows,
249+
));
250+
}
251+
Err(_) => {
252+
// Predicate evaluation failed (e.g., column not found)
253+
// Treat every row as a possible match and keep the whole stripe
254+
stripe_selection = Some(RowSelection::select_all(stripe_rows));
255+
}
256+
}
257+
}
258+
Err(_) => {
259+
// Row indexes not available: treat every row as a possible match and keep them all
260+
stripe_selection = Some(RowSelection::select_all(stripe_rows));
261+
}
262+
}
263+
}
264+
265+
// Combine with existing row_selection if present
266+
let mut final_selection = stripe_selection;
267+
if let Some(ref mut existing_selection) = self.row_selection {
268+
if existing_selection.row_count() > 0 {
269+
let existing_for_stripe = existing_selection.split_off(stripe_rows);
270+
final_selection = match final_selection {
271+
Some(predicate_selection) => {
272+
// Both predicate and manual selection: combine with AND
273+
Some(existing_for_stripe.and_then(&predicate_selection))
274+
}
275+
None => Some(existing_for_stripe),
276+
};
188277
}
189-
});
278+
}
190279

191280
let decoder = NaiveStripeDecoder::new_with_selection(
192281
stripe,
193282
self.schema_ref.clone(),
194283
self.batch_size,
195-
selection,
284+
final_selection,
196285
)?;
197286
self.current_stripe = Some(Box::new(decoder));
198287
self.next().transpose()

src/async_arrow_reader.rs

Lines changed: 80 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ use futures_util::FutureExt;
3030
use crate::array_decoder::NaiveStripeDecoder;
3131
use crate::arrow_reader::Cursor;
3232
use crate::error::Result;
33+
use crate::predicate::Predicate;
3334
use crate::reader::metadata::read_metadata_async;
3435
use crate::reader::AsyncChunkReader;
36+
use crate::row_group_filter::evaluate_predicate;
3537
use crate::row_selection::RowSelection;
38+
use crate::schema::RootDataType;
3639
use crate::stripe::{Stripe, StripeMetadata};
3740
use crate::ArrowReaderBuilder;
3841

@@ -79,6 +82,9 @@ pub struct ArrowStreamReader<R: AsyncChunkReader> {
7982
batch_size: usize,
8083
schema_ref: SchemaRef,
8184
row_selection: Option<RowSelection>,
85+
predicate: Option<Predicate>,
86+
projected_data_type: RootDataType,
87+
file_metadata: Arc<crate::reader::metadata::FileMetadata>,
8288
state: StreamState<R>,
8389
}
8490

@@ -131,12 +137,18 @@ impl<R: AsyncChunkReader + 'static> ArrowStreamReader<R> {
131137
batch_size: usize,
132138
schema_ref: SchemaRef,
133139
row_selection: Option<RowSelection>,
140+
predicate: Option<Predicate>,
141+
projected_data_type: RootDataType,
142+
file_metadata: Arc<crate::reader::metadata::FileMetadata>,
134143
) -> Self {
135144
Self {
136145
factory: Some(Box::new(cursor.into())),
137146
batch_size,
138147
schema_ref,
139148
row_selection,
149+
predicate,
150+
projected_data_type,
151+
file_metadata,
140152
state: StreamState::Init,
141153
}
142154
}
@@ -180,21 +192,70 @@ impl<R: AsyncChunkReader + 'static> ArrowStreamReader<R> {
180192
Ok((factory, Some(stripe))) => {
181193
self.factory = Some(Box::new(factory));
182194

183-
// Split off the row selection for this stripe
184195
let stripe_rows = stripe.number_of_rows();
185-
let selection = self.row_selection.as_mut().and_then(|s| {
186-
if s.row_count() > 0 {
187-
Some(s.split_off(stripe_rows))
188-
} else {
189-
None
196+
197+
// Evaluate predicate if present
198+
let mut stripe_selection: Option<RowSelection> = None;
199+
if let Some(ref predicate) = self.predicate {
200+
// Try to read row indexes for this stripe
201+
match stripe.read_row_indexes(&self.file_metadata) {
202+
Ok(row_index) => {
203+
// Evaluate predicate against row group statistics
204+
match evaluate_predicate(
205+
predicate,
206+
&row_index,
207+
&self.projected_data_type,
208+
) {
209+
Ok(row_group_filter) => {
210+
// Generate RowSelection from filter results
211+
let rows_per_group = self
212+
.file_metadata
213+
.row_index_stride()
214+
.unwrap_or(10_000);
215+
stripe_selection =
216+
Some(RowSelection::from_row_group_filter(
217+
&row_group_filter,
218+
rows_per_group,
219+
stripe_rows,
220+
));
221+
}
222+
Err(_) => {
223+
// Predicate evaluation failed (e.g., column not found)
224+
// Treat every row as a possible match and keep the whole stripe
225+
stripe_selection =
226+
Some(RowSelection::select_all(stripe_rows));
227+
}
228+
}
229+
}
230+
Err(_) => {
231+
// Row indexes not available: treat every row as a possible match and keep them all
232+
stripe_selection = Some(RowSelection::select_all(stripe_rows));
233+
}
234+
}
235+
}
236+
237+
// Combine with existing row_selection if present
238+
let mut final_selection = stripe_selection;
239+
if let Some(ref mut existing_selection) = self.row_selection {
240+
if existing_selection.row_count() > 0 {
241+
let existing_for_stripe = existing_selection.split_off(stripe_rows);
242+
final_selection = match final_selection {
243+
Some(predicate_selection) => {
244+
// Both predicate and manual selection: combine with AND
245+
Some(
246+
existing_for_stripe.and_then(&predicate_selection),
247+
)
248+
}
249+
None => Some(existing_for_stripe),
250+
};
190251
}
191-
});
252+
}
192253

193254
match NaiveStripeDecoder::new_with_selection(
194255
stripe,
195256
self.schema_ref.clone(),
196257
self.batch_size,
197-
selection,
258+
final_selection,
198259
) {
199260
Ok(decoder) => {
200261
self.state = StreamState::Decoding(Box::new(decoder));
@@ -241,14 +302,23 @@ impl<R: AsyncChunkReader + 'static> ArrowReaderBuilder<R> {
241302
.file_metadata()
242303
.root_data_type()
243304
.project(&self.projection);
305+
let projected_data_type_clone = projected_data_type.clone();
244306
let schema_ref = self.schema();
245307
let cursor = Cursor {
246308
reader: self.reader,
247-
file_metadata: self.file_metadata,
309+
file_metadata: self.file_metadata.clone(),
248310
projected_data_type,
249311
stripe_index: 0,
250312
file_byte_range: self.file_byte_range,
251313
};
252-
ArrowStreamReader::new(cursor, self.batch_size, schema_ref, self.row_selection)
314+
ArrowStreamReader::new(
315+
cursor,
316+
self.batch_size,
317+
schema_ref,
318+
self.row_selection,
319+
self.predicate,
320+
projected_data_type_clone,
321+
self.file_metadata,
322+
)
253323
}
254324
}

0 commit comments

Comments
 (0)