Skip to content

Commit 7ca6cc2

Browse files
committed
init mvp
1 parent 93484df commit 7ca6cc2

5 files changed

Lines changed: 836 additions & 7 deletions

File tree

examples/row_selection_example.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Example demonstrating how to use RowSelection to skip rows when reading ORC files
19+
20+
use std::fs::File;
21+
use std::sync::Arc;
22+
23+
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
24+
use arrow::datatypes::{DataType, Field, Schema};
25+
use orc_rust::arrow_reader::ArrowReaderBuilder;
26+
use orc_rust::arrow_writer::ArrowWriterBuilder;
27+
use orc_rust::row_selection::{RowSelection, RowSelector};
28+
29+
fn main() -> Result<(), Box<dyn std::error::Error>> {
30+
// Step 1: Create a sample ORC file with 100 rows
31+
println!("Creating sample ORC file...");
32+
let file_path = "/tmp/row_selection_example.orc";
33+
create_sample_orc_file(file_path)?;
34+
35+
// Step 2: Read the file without row selection (baseline)
36+
println!("\n=== Reading all rows (no selection) ===");
37+
let file = File::open(file_path)?;
38+
let reader = ArrowReaderBuilder::try_new(file)?.build();
39+
let mut total_rows = 0;
40+
for batch in reader {
41+
let batch = batch?;
42+
total_rows += batch.num_rows();
43+
}
44+
println!("Total rows read: {}", total_rows);
45+
46+
// Step 3: Read with row selection - skip first 30 rows, select next 40, skip rest
47+
println!("\n=== Reading with row selection ===");
48+
let file = File::open(file_path)?;
49+
50+
// Create a selection: skip 30, select 40, skip 30
51+
let selection = vec![
52+
RowSelector::skip(30),
53+
RowSelector::select(40),
54+
RowSelector::skip(30),
55+
]
56+
.into();
57+
58+
let reader = ArrowReaderBuilder::try_new(file)?
59+
.with_row_selection(selection)
60+
.build();
61+
62+
let mut selected_rows = 0;
63+
let mut batches = Vec::new();
64+
for batch in reader {
65+
let batch = batch?;
66+
selected_rows += batch.num_rows();
67+
batches.push(batch);
68+
}
69+
70+
println!("Total rows selected: {}", selected_rows);
71+
println!("Expected: 40, Actual: {}", selected_rows);
72+
73+
// Display some of the selected data
74+
if let Some(first_batch) = batches.first() {
75+
let id_col = first_batch
76+
.column(0)
77+
.as_any()
78+
.downcast_ref::<Int32Array>()
79+
.unwrap();
80+
let name_col = first_batch
81+
.column(1)
82+
.as_any()
83+
.downcast_ref::<StringArray>()
84+
.unwrap();
85+
86+
println!("\nFirst 5 selected rows:");
87+
for i in 0..5.min(first_batch.num_rows()) {
88+
println!(
89+
" id: {}, name: {}",
90+
id_col.value(i),
91+
name_col.value(i)
92+
);
93+
}
94+
}
95+
96+
// Step 4: Read with multiple non-consecutive selections
97+
println!("\n=== Reading with multiple selections ===");
98+
let file = File::open(file_path)?;
99+
100+
// Select rows 10-20 and 60-70
101+
let selection = RowSelection::from_consecutive_ranges(
102+
vec![10..20, 60..70].into_iter(),
103+
100,
104+
);
105+
106+
let reader = ArrowReaderBuilder::try_new(file)?
107+
.with_row_selection(selection)
108+
.build();
109+
110+
let mut selected_rows = 0;
111+
for batch in reader {
112+
let batch = batch?;
113+
selected_rows += batch.num_rows();
114+
}
115+
116+
println!("Total rows selected: {}", selected_rows);
117+
println!("Expected: 20 (10 from each range)");
118+
119+
println!("\n✓ Row selection example completed successfully!");
120+
121+
Ok(())
122+
}
123+
124+
fn create_sample_orc_file(path: &str) -> Result<(), Box<dyn std::error::Error>> {
125+
let schema = Arc::new(Schema::new(vec![
126+
Field::new("id", DataType::Int32, false),
127+
Field::new("name", DataType::Utf8, false),
128+
]));
129+
130+
// Create 100 rows
131+
let ids: ArrayRef = Arc::new(Int32Array::from((0..100).collect::<Vec<i32>>()));
132+
let names: ArrayRef = Arc::new(StringArray::from(
133+
(0..100)
134+
.map(|i| format!("name_{}", i))
135+
.collect::<Vec<String>>(),
136+
));
137+
138+
let batch = RecordBatch::try_new(schema.clone(), vec![ids, names])?;
139+
140+
let file = File::create(path)?;
141+
let mut writer = ArrowWriterBuilder::new(file, schema).try_build()?;
142+
writer.write(&batch)?;
143+
writer.close()?;
144+
145+
Ok(())
146+
}
147+

src/array_decoder/mod.rs

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,18 +238,70 @@ pub struct NaiveStripeDecoder {
238238
index: usize,
239239
batch_size: usize,
240240
number_of_rows: usize,
241+
row_selection: Option<crate::row_selection::RowSelection>,
242+
selection_index: usize,
241243
}
242244

243245
impl Iterator for NaiveStripeDecoder {
244246
type Item = Result<RecordBatch>;
245247

246248
fn next(&mut self) -> Option<Self::Item> {
247249
if self.index < self.number_of_rows {
248-
let record = self
249-
.decode_next_batch(self.number_of_rows - self.index)
250-
.transpose()?;
251-
self.index += self.batch_size;
252-
Some(record)
250+
// Handle row selection if present
251+
if self.row_selection.is_some() {
252+
// Process selectors until we find rows to select or exhaust the selection
253+
loop {
254+
let selector_info = {
255+
let selection = self.row_selection.as_ref().unwrap();
256+
let selectors = selection.selectors();
257+
if self.selection_index >= selectors.len() {
258+
return None;
259+
}
260+
let selector = selectors[self.selection_index];
261+
(selector.skip, selector.row_count)
262+
};
263+
264+
let (is_skip, row_count) = selector_info;
265+
266+
if is_skip {
267+
// Skip these rows by advancing the index
268+
self.index += row_count;
269+
self.selection_index += 1;
270+
271+
// Decode and discard the skipped rows to advance the internal decoders
272+
if let Err(e) = self.skip_rows(row_count) {
273+
return Some(Err(e));
274+
}
275+
} else {
276+
// Select these rows
277+
let rows_to_read = row_count.min(self.batch_size);
278+
let remaining = self.number_of_rows - self.index;
279+
let actual_rows = rows_to_read.min(remaining);
280+
281+
if actual_rows == 0 {
282+
self.selection_index += 1;
283+
continue;
284+
}
285+
286+
let record = self.decode_next_batch(actual_rows).transpose()?;
287+
self.index += actual_rows;
288+
289+
// Update selector to track progress
290+
if actual_rows >= row_count {
291+
self.selection_index += 1;
292+
}
293+
294+
return Some(record);
295+
}
296+
}
297+
} else {
298+
// No row selection - decode normally
299+
let record = self
300+
.decode_next_batch(self.number_of_rows - self.index)
301+
.transpose()?;
302+
self.index += self.batch_size;
303+
Some(record)
304+
}
253305
} else {
254306
None
255307
}
@@ -433,6 +485,15 @@ impl NaiveStripeDecoder {
433485
}
434486

435487
pub fn new(stripe: Stripe, schema_ref: SchemaRef, batch_size: usize) -> Result<Self> {
488+
Self::new_with_selection(stripe, schema_ref, batch_size, None)
489+
}
490+
491+
pub fn new_with_selection(
492+
stripe: Stripe,
493+
schema_ref: SchemaRef,
494+
batch_size: usize,
495+
row_selection: Option<crate::row_selection::RowSelection>,
496+
) -> Result<Self> {
436497
let number_of_rows = stripe.number_of_rows();
437498
let decoders = stripe
438499
.columns()
@@ -448,6 +509,20 @@ impl NaiveStripeDecoder {
448509
index: 0,
449510
batch_size,
450511
number_of_rows,
512+
row_selection,
513+
selection_index: 0,
451514
})
452515
}
516+
517+
/// Skip the specified number of rows by decoding and discarding them
518+
fn skip_rows(&mut self, count: usize) -> Result<()> {
519+
// Decode in batches to avoid large memory allocations
520+
let mut remaining = count;
521+
while remaining > 0 {
522+
let chunk = self.batch_size.min(remaining);
523+
let _ = self.inner_decode_next_batch(chunk)?;
524+
remaining -= chunk;
525+
}
526+
Ok(())
527+
}
453528
}

src/arrow_reader.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::error::Result;
2828
use crate::projection::ProjectionMask;
2929
use crate::reader::metadata::{read_metadata, FileMetadata};
3030
use crate::reader::ChunkReader;
31+
use crate::row_selection::RowSelection;
3132
use crate::schema::RootDataType;
3233
use crate::stripe::{Stripe, StripeMetadata};
3334

@@ -40,6 +41,7 @@ pub struct ArrowReaderBuilder<R> {
4041
pub(crate) projection: ProjectionMask,
4142
pub(crate) schema_ref: Option<SchemaRef>,
4243
pub(crate) file_byte_range: Option<Range<usize>>,
44+
pub(crate) row_selection: Option<RowSelection>,
4345
}
4446

4547
impl<R> ArrowReaderBuilder<R> {
@@ -51,6 +53,7 @@ impl<R> ArrowReaderBuilder<R> {
5153
projection: ProjectionMask::all(),
5254
schema_ref: None,
5355
file_byte_range: None,
56+
row_selection: None,
5457
}
5558
}
5659

@@ -79,6 +82,33 @@ impl<R> ArrowReaderBuilder<R> {
7982
self
8083
}
8184

85+
/// Set a [`RowSelection`] to filter rows
86+
///
87+
/// The [`RowSelection`] specifies which rows should be decoded from the ORC file.
88+
/// This can be used to skip rows that don't match predicates, reducing I/O and
89+
/// improving query performance.
90+
///
91+
/// # Example
92+
///
93+
/// ```no_run
94+
/// # use std::fs::File;
95+
/// # use orc_rust::arrow_reader::ArrowReaderBuilder;
96+
/// # use orc_rust::row_selection::{RowSelection, RowSelector};
97+
/// let file = File::open("data.orc").unwrap();
98+
/// let selection = vec![
99+
/// RowSelector::skip(100),
100+
/// RowSelector::select(50),
101+
/// ].into();
102+
/// let reader = ArrowReaderBuilder::try_new(file)
103+
/// .unwrap()
104+
/// .with_row_selection(selection)
105+
/// .build();
106+
/// ```
107+
pub fn with_row_selection(mut self, row_selection: RowSelection) -> Self {
108+
self.row_selection = Some(row_selection);
109+
self
110+
}
111+
82112
/// Returns the currently computed schema
83113
///
84114
/// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
@@ -124,6 +154,7 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
124154
schema_ref,
125155
current_stripe: None,
126156
batch_size: self.batch_size,
157+
row_selection: self.row_selection,
127158
}
128159
}
129160
}
@@ -133,6 +164,7 @@ pub struct ArrowReader<R> {
133164
schema_ref: SchemaRef,
134165
current_stripe: Option<Box<dyn Iterator<Item = Result<RecordBatch>> + Send>>,
135166
batch_size: usize,
167+
row_selection: Option<RowSelection>,
136168
}
137169

138170
impl<R> ArrowReader<R> {
@@ -146,8 +178,22 @@ impl<R: ChunkReader> ArrowReader<R> {
146178
let stripe = self.cursor.next().transpose()?;
147179
match stripe {
148180
Some(stripe) => {
149-
let decoder =
150-
NaiveStripeDecoder::new(stripe, self.schema_ref.clone(), self.batch_size)?;
181+
// Split off the row selection for this stripe
182+
let stripe_rows = stripe.number_of_rows();
183+
let selection = self.row_selection.as_mut().and_then(|s| {
184+
if s.row_count() > 0 {
185+
Some(s.split_off(stripe_rows))
186+
} else {
187+
None
188+
}
189+
});
190+
191+
let decoder = NaiveStripeDecoder::new_with_selection(
192+
stripe,
193+
self.schema_ref.clone(),
194+
self.batch_size,
195+
selection,
196+
)?;
151197
self.current_stripe = Some(Box::new(decoder));
152198
self.next().transpose()
153199
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub mod projection;
6161
#[allow(dead_code)]
6262
mod proto;
6363
pub mod reader;
64+
pub mod row_selection;
6465
pub mod schema;
6566
pub mod statistics;
6667
pub mod stripe;
@@ -70,3 +71,4 @@ pub use arrow_reader::{ArrowReader, ArrowReaderBuilder};
7071
pub use arrow_writer::{ArrowWriter, ArrowWriterBuilder};
7172
#[cfg(feature = "async")]
7273
pub use async_arrow_reader::ArrowStreamReader;
74+
pub use row_selection::{RowSelection, RowSelector};

0 commit comments

Comments
 (0)