Aisle is a high-performance Parquet file reader that implements page-level predicate pushdown for efficient data filtering. By pushing predicates down to individual pages using statistics, Aisle significantly reduces I/O operations and memory usage compared to traditional Parquet readers.
- Predicate Pushdown: Filters data at row group, page, and row levels to minimize I/O.
- High Performance: Efficient for selective queries on large datasets.
- Easy Integration: Drop-in replacement for Parquet's `ParquetRecordBatchStreamBuilder`; minimal code changes are required to migrate.
While developing Tonbo, we needed an efficient Parquet reader that could take advantage of page indexes and filters. But the Parquet reader doesn't utilize the page index and only filters data at the row level. Aisle addresses these needs by providing:
- Row-group-level filtering using row group statistics
- Page-level filtering using page statistics (sketched below)
- Row-level filtering provided by Parquet's `ParquetRecordBatchStreamBuilder`
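To give an intuition for the page-level step, here is a minimal, hypothetical sketch of how min/max page statistics let a page be skipped before its data is ever read. The types and function are illustrative only, not Aisle's actual internals:

```rust
// Hypothetical min/max statistics for one page; in practice these
// come from the Parquet page index.
struct PageStats {
    min: u64,
    max: u64,
}

// A predicate like `x > bound` cannot match any row in a page whose
// max value is <= bound, so the whole page can be skipped unread.
fn page_may_match_gt(stats: &PageStats, bound: u64) -> bool {
    stats.max > bound
}

fn main() {
    let page = PageStats { min: 0, max: 100 };
    // `x > 100` cannot match this page, so its bytes are never fetched.
    assert!(!page_may_match_gt(&page, 100));
}
```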
Aisle provides the most benefit in the following scenarios:
- Selective Queries: The more selective your predicates, the greater the benefit from page-level filtering.
- Sorted or Partially Sorted Data: When data is generally ordered across row groups or pages, Aisle becomes more effective.
If your columns contain totally random values (e.g., identifiers or UUIDs), we recommend disabling the page index in `ArrowReaderOptions`.
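For example, using the `ArrowReaderOptions` that Aisle re-exports:

```rust
use aisle::ArrowReaderOptions;

// For columns with essentially random values, page statistics cannot
// prune anything, so skip loading the page index entirely.
let options = ArrowReaderOptions::new().with_page_index(false);
```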
See the benchmark results for more details.
- Rust
- parquet 55.1.0 or later
Add Aisle to your `Cargo.toml`:
```toml
[dependencies]
aisle = { git = "https://github.com/tonbo-io/aisle.git", branch = "main" }
parquet = { version = "55.1.0" }
```
Basic usage is similar to Parquet's `ParquetRecordBatchStreamBuilder`:
```rust
use aisle::{
    ArrowReaderOptions, ParquetRecordBatchStreamBuilder, ProjectionMask,
    filter::RowFilter,
    predicate::AislePredicate,
};
use fusio::{DynFs, disk::TokioFs, fs::OpenOptions, path::Path};
use fusio_parquet::reader::AsyncReader;
use futures_util::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create AsyncFileReader
    let fs = TokioFs {};
    let file = fs
        .open_options(
            &Path::new("./data/data.parquet").unwrap(),
            OpenOptions::default().read(true),
        )
        .await?;
    let size = file.size().await.unwrap();
    let reader = AsyncReader::new(file, size).await.unwrap();

    // Initialize builder with the page index enabled; page-level
    // pruning requires the page index to be loaded
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(
        reader,
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await?;

    // Define your predicates here (see the predicate example below);
    // an empty list applies no filtering
    let filters: Vec<Box<dyn AislePredicate>> = vec![];

    // Build and process the filtered stream
    let mut stream = builder
        .with_row_filter(RowFilter::new(filters))
        .with_limit(10)
        .with_projection(ProjectionMask::all())
        .build()?;

    while let Some(batch_result) = stream.next().await {
        let batch = batch_result?;
        // Process your batch
    }
    Ok(())
}
```
To filter rows, build Aisle predicates and pass them to the builder as a `RowFilter`. For example, to create a range predicate (100 < x < 1000):

```rust
use aisle::{
    ProjectionMask,
    filter::RowFilter,
    ord::{gt, lt},
    predicate::{AislePredicate, AislePredicateFn},
};
use arrow::array::{Datum, UInt64Array};
use std::sync::Arc;

// Create a range predicate (100 < x < 1000) on the first root column
let schema = builder.parquet_schema();
let filters: Vec<Box<dyn AislePredicate>> = vec![
    Box::new(AislePredicateFn::new(
        ProjectionMask::roots(schema, vec![0]),
        |batch| {
            let key = Arc::new(UInt64Array::new_scalar(100)) as Arc<dyn Datum>;
            gt(batch.column(0), key.as_ref())
        },
    )),
    Box::new(AislePredicateFn::new(
        ProjectionMask::roots(schema, vec![0]),
        |batch| {
            let key = Arc::new(UInt64Array::new_scalar(1000)) as Arc<dyn Datum>;
            lt(batch.column(0), key.as_ref())
        },
    )),
];

// Build with the row filter
let stream = builder.with_row_filter(RowFilter::new(filters)).build()?;
```
Migrating from Parquet's `ParquetRecordBatchStreamBuilder` to Aisle is straightforward; just follow these steps (see the import sketch after this list):
- replace Parquet's `ParquetRecordBatchStreamBuilder` with Aisle's `ParquetRecordBatchStreamBuilder`
- replace Parquet's `RowFilter` with Aisle's `RowFilter`
- replace Parquet's `ArrowPredicateFn` with `AislePredicateFn`
- replace Parquet's `gt`, `gt_eq`, `lt`, `lt_eq` with Aisle's `gt`, `gt_eq`, `lt`, `lt_eq`
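In practice the migration is mostly an import swap. A rough sketch of the before/after imports, assuming the commonly used parquet and arrow paths (these may vary with your crate versions):

```rust
// Before, with the parquet crate (paths may differ by version):
// use parquet::arrow::ParquetRecordBatchStreamBuilder;
// use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
// use arrow::compute::kernels::cmp::{gt, gt_eq, lt, lt_eq};

// After, with Aisle:
use aisle::{
    ParquetRecordBatchStreamBuilder,
    filter::RowFilter,
    ord::{gt, gt_eq, lt, lt_eq},
    predicate::AislePredicateFn,
};
```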
Only `>`, `>=`, `<`, and `<=` are supported for now. If you want to use `=` or other operations, please use the supported comparisons in combination. For example:
```rust
// x = 100 becomes x >= 100 AND x <= 100
let filters: Vec<Box<dyn AislePredicate>> = vec![
    Box::new(AislePredicateFn::new(
        ProjectionMask::roots(schema, vec![0]),
        |batch| {
            let key = Arc::new(UInt64Array::new_scalar(100)) as Arc<dyn Datum>;
            gt_eq(batch.column(0), key.as_ref())
        },
    )),
    Box::new(AislePredicateFn::new(
        ProjectionMask::roots(schema, vec![0]),
        |batch| {
            let key = Arc::new(UInt64Array::new_scalar(100)) as Arc<dyn Datum>;
            lt_eq(batch.column(0), key.as_ref())
        },
    )),
];
```
Build the project:

```bash
cargo build
```

Run tests:

```bash
cargo test
```

Run the benchmark:

```bash
cargo bench --bench read_bench --features tokio
```