Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement base scan_avro #21700

Closed
wants to merge 3 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(scan_avro): add rust scan avro
Summary:

Test Plan:
erikbrinkman committed Mar 12, 2025
commit e6de8966ef27f6146881714743e3de17b0bbe1df
8 changes: 8 additions & 0 deletions crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -76,6 +76,12 @@ json = [
"polars-mem-engine/json",
"polars-stream?/json",
]
avro = [
"polars-io/avro",
"polars-plan/avro",
"polars-mem-engine/avro",
"polars-stream?/avro",
]
csv = ["polars-io/csv", "polars-plan/csv", "polars-pipe?/csv", "polars-mem-engine/csv", "polars-stream?/csv"]
temporal = [
"dtype-datetime",
@@ -304,6 +310,7 @@ test = [
"diff",
"abs",
"parquet",
"avro",
"ipc",
"dtype-date",
]
@@ -340,6 +347,7 @@ features = [
"arg_where",
"asof_join",
"async",
"avro",
"bigidx",
"binary_encoding",
"cloud",
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
@@ -19,6 +19,8 @@ use std::path::Path;
use std::sync::{Arc, Mutex};

pub use anonymous_scan::*;
#[cfg(feature = "avro")]
pub use avro::*;
#[cfg(feature = "csv")]
pub use csv::*;
#[cfg(not(target_arch = "wasm32"))]
149 changes: 149 additions & 0 deletions crates/polars-lazy/src/scan/avro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::path::Path;

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::{HiveOptions, RowIndex};
use polars_plan::dsl::{DslPlan, FileScan, ScanSources};
use polars_plan::prelude::FileScanOptions;

use crate::prelude::LazyFrame;
use crate::scan::file_list_reader::LazyFileListReader;

#[derive(Clone)]
pub struct LazyAvroReader {
pub(crate) sources: ScanSources,
pub(crate) rechunk: bool,
pub(crate) row_index: Option<RowIndex>,
pub(crate) n_rows: Option<usize>,
pub(crate) include_file_paths: Option<PlSmallStr>,
pub(crate) cloud_options: Option<CloudOptions>,
}

impl LazyAvroReader {
#[must_use]
pub fn new_with_sources(sources: ScanSources) -> Self {
Self {
sources,
rechunk: false,
row_index: None,
n_rows: None,
include_file_paths: None,
cloud_options: None,
}
}

#[must_use]
pub fn new(path: impl AsRef<Path>) -> Self {
Self::new_with_sources(ScanSources::Paths([path.as_ref().to_path_buf()].into()))
}

/// Add a row index column.
#[must_use]
pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
self.row_index = row_index;
self
}

/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
/// be guaranteed.
#[must_use]
pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
self.n_rows = num_rows;
self
}

#[must_use]
pub fn with_cloud_options(mut self, cloud_options: Option<CloudOptions>) -> Self {
self.cloud_options = cloud_options;
self
}

#[must_use]
pub fn with_include_file_paths(mut self, include_file_paths: Option<PlSmallStr>) -> Self {
self.include_file_paths = include_file_paths;
self
}
}

impl LazyFileListReader for LazyAvroReader {
fn finish(self) -> PolarsResult<LazyFrame> {
let file_options = Box::new(FileScanOptions {
pre_slice: self.n_rows.map(|x| (0, x)),
with_columns: None,
cache: false,
row_index: self.row_index,
rechunk: self.rechunk,
file_counter: 0,
hive_options: HiveOptions {
enabled: Some(false),
hive_start_idx: 0,
schema: None,
try_parse_dates: true,
},
glob: true,
include_file_paths: self.include_file_paths,
allow_missing_columns: false,
});

let scan_type = Box::new(FileScan::Avro {
cloud_options: self.cloud_options,
});

Ok(LazyFrame::from(DslPlan::Scan {
sources: self.sources,
file_info: None,
file_options,
scan_type,
cached_ir: Default::default(),
}))
}

fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
unreachable!();
}

fn sources(&self) -> &ScanSources {
&self.sources
}

fn with_sources(mut self, sources: ScanSources) -> Self {
self.sources = sources;
self
}

fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
self.n_rows = n_rows.into();
self
}

fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
self.row_index = row_index.into();
self
}

fn rechunk(&self) -> bool {
self.rechunk
}

/// Rechunk the memory to contiguous chunks when parsing is done.
fn with_rechunk(mut self, toggle: bool) -> Self {
self.rechunk = toggle;
self
}

/// Try to stop parsing when `n` rows are parsed. During multithreaded parsing the upper bound `n` cannot
/// be guaranteed.
fn n_rows(&self) -> Option<usize> {
self.n_rows
}

/// Add a row index column.
fn row_index(&self) -> Option<&RowIndex> {
self.row_index.as_ref()
}

/// [CloudOptions] used to list files.
fn cloud_options(&self) -> Option<&CloudOptions> {
self.cloud_options.as_ref()
}
}
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/scan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub(super) mod anonymous_scan;
#[cfg(feature = "avro")]
pub(super) mod avro;
#[cfg(feature = "csv")]
pub(super) mod csv;
pub(super) mod file_list_reader;
1 change: 1 addition & 0 deletions crates/polars-mem-engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ python = ["pyo3", "polars-plan/python", "polars-core/python", "polars-io/python"
ipc = ["polars-io/ipc", "polars-plan/ipc"]
json = ["polars-io/json", "polars-plan/json", "polars-json"]
csv = ["polars-io/csv", "polars-plan/csv"]
avro = ["polars-io/avro", "polars-plan/avro"]
cloud = ["async", "polars-plan/cloud", "tokio", "futures"]
parquet = ["polars-io/parquet", "polars-plan/parquet"]
dtype-categorical = ["polars-plan/dtype-categorical"]
9 changes: 9 additions & 0 deletions crates/polars-mem-engine/src/executors/multi_file_scan.rs
Original file line number Diff line number Diff line change
@@ -9,6 +9,8 @@ use polars_io::predicates::SkipBatchPredicate;

use super::Executor;
use crate::ScanPredicate;
#[cfg(feature = "avro")]
use crate::executors::AvroExec;
#[cfg(feature = "csv")]
use crate::executors::CsvExec;
#[cfg(feature = "ipc")]
@@ -197,6 +199,13 @@ fn source_to_exec(
None,
))
},
#[cfg(feature = "avro")]
FileScan::Avro { .. } => Box::new(AvroExec::new(
source,
file_info,
Box::new(file_options.clone()),
None,
)),
FileScan::Anonymous { .. } => unreachable!(),
})
}
Loading