Skip to content

Commit c538836

Browse files
refactor(rust): Add updated multiscan pipeline (#21925)
1 parent cc1d07a commit c538836

File tree

14 files changed

+1818
-31
lines changed

14 files changed

+1818
-31
lines changed

crates/polars-io/src/predicates.rs

+4
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,10 @@ pub struct ScanIOPredicate {
471471
}
472472
impl ScanIOPredicate {
473473
pub fn set_external_constant_columns(&mut self, constant_columns: Vec<(PlSmallStr, Scalar)>) {
474+
if constant_columns.is_empty() {
475+
return;
476+
}
477+
474478
let mut live_columns = self.live_columns.as_ref().clone();
475479
for (c, _) in constant_columns.iter() {
476480
live_columns.swap_remove(c);

crates/polars-plan/src/dsl/scan_sources.rs

+8
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ impl ScanSource {
100100
pub fn run_async(&self) -> bool {
101101
self.as_scan_source_ref().run_async()
102102
}
103+
104+
pub fn is_cloud_url(&self) -> bool {
105+
if let ScanSource::Path(path) = self {
106+
polars_io::is_cloud_url(path.as_ref())
107+
} else {
108+
false
109+
}
110+
}
103111
}
104112

105113
/// An iterator for [`ScanSources`]

crates/polars-plan/src/plans/hive.rs

+6
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ impl HivePartitionsDf {
102102
}
103103
}
104104

105+
impl From<DataFrame> for HivePartitionsDf {
106+
fn from(value: DataFrame) -> Self {
107+
Self(value)
108+
}
109+
}
110+
105111
/// Note: Returned hive partitions are ordered by their position in the `reader_schema`
106112
///
107113
/// # Safety

crates/polars-stream/src/nodes/io_sources/multi_file_reader/extra_ops/apply.rs

+20-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use polars_io::predicates::ScanIOPredicate;
1212
use polars_plan::dsl::ScanSource;
1313
use polars_plan::plans::hive::HivePartitionsDf;
1414
use polars_utils::IdxSize;
15+
use polars_utils::slice_enum::Slice;
1516

1617
use super::ExtraOperations;
1718
use super::cast_columns::CastColumns;
@@ -37,6 +38,7 @@ pub enum ApplyExtraOps {
3738
Initialized {
3839
// Note: These fields are ordered according to when they (should be) applied.
3940
row_index: Option<RowIndex>,
41+
pre_slice: Option<Slice>,
4042
cast_columns: Option<CastColumns>,
4143
/// This will have include_file_paths, hive columns, missing columns.
4244
extra_columns: Vec<ScalarColumn>,
@@ -75,8 +77,10 @@ impl ApplyExtraOps {
7577
scan_source_idx,
7678
hive_parts,
7779
} => {
78-
// This should always be pushed to the reader, or otherwise handled separately.
79-
assert!(pre_slice.is_none());
80+
// Negative slice should have been resolved earlier.
81+
if let Some(Slice::Negative { .. }) = pre_slice {
82+
panic!("impl error: negative pre_slice at post")
83+
}
8084

8185
let cast_columns = CastColumns::try_init_from_policy(
8286
cast_columns_policy,
@@ -126,6 +130,7 @@ impl ApplyExtraOps {
126130

127131
let mut slf = Self::Initialized {
128132
row_index,
133+
pre_slice,
129134
cast_columns,
130135
extra_columns,
131136
predicate,
@@ -166,6 +171,7 @@ impl ApplyExtraOps {
166171
let slf = match slf {
167172
Initialized {
168173
row_index: None,
174+
pre_slice: None,
169175
cast_columns: None,
170176
extra_columns,
171177
predicate: None,
@@ -191,6 +197,7 @@ impl ApplyExtraOps {
191197
) -> PolarsResult<()> {
192198
let Self::Initialized {
193199
row_index,
200+
pre_slice,
194201
cast_columns,
195202
extra_columns,
196203
predicate,
@@ -218,6 +225,17 @@ impl ApplyExtraOps {
218225
};
219226
}
220227

228+
if let Some(pre_slice) = pre_slice.clone() {
229+
let Slice::Positive { offset, len } = pre_slice
230+
.offsetted(usize::try_from(current_row_position).unwrap())
231+
.restrict_to_bounds(df.height())
232+
else {
233+
unreachable!()
234+
};
235+
236+
*df = df.slice(i64::try_from(offset).unwrap(), len)
237+
}
238+
221239
if let Some(cast_columns) = cast_columns {
222240
cast_columns.apply_cast(df)?;
223241
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
pub mod predicate;
2+
pub mod slice;
3+
4+
use std::sync::{Arc, Mutex};
5+
6+
use polars_error::PolarsResult;
7+
8+
use super::MultiFileReaderConfig;
9+
use super::bridge::BridgeState;
10+
use crate::async_executor::{self, AbortOnDropHandle, TaskPriority};
11+
use crate::async_primitives::connector::{self};
12+
use crate::async_primitives::wait_group::WaitToken;
13+
use crate::morsel::Morsel;
14+
use crate::nodes::io_sources::multi_file_reader::bridge::spawn_bridge;
15+
16+
pub struct MultiScanTaskInitializer {
17+
pub(super) config: Arc<MultiFileReaderConfig>,
18+
}
19+
20+
impl MultiScanTaskInitializer {
21+
pub fn new(config: Arc<MultiFileReaderConfig>) -> Self {
22+
Self { config }
23+
}
24+
25+
#[expect(clippy::type_complexity)]
26+
pub fn spawn_background_tasks(
27+
self,
28+
) -> (
29+
AbortOnDropHandle<PolarsResult<()>>,
30+
connector::Sender<(connector::Sender<Morsel>, WaitToken)>,
31+
Arc<Mutex<BridgeState>>,
32+
) {
33+
assert!(self.config.num_pipelines() > 0);
34+
let verbose = self.config.verbose();
35+
36+
if verbose {
37+
eprintln!(
38+
"[MultiScanTaskInitializer]: spawn_background_tasks(), {} sources, reader name: {}, {:?}",
39+
self.config.sources.len(),
40+
self.config.file_reader_builder.reader_name(),
41+
self.config.file_reader_builder.reader_capabilities(),
42+
)
43+
}
44+
45+
let bridge_state = Arc::new(Mutex::new(BridgeState::NotYetStarted));
46+
47+
let (bridge_handle, bridge_recv_port_tx, send_phase_chan_to_bridge) =
48+
spawn_bridge(bridge_state.clone());
49+
50+
let verbose = self.config.verbose();
51+
52+
let background_tasks_handle = AbortOnDropHandle::new(async_executor::spawn(
53+
TaskPriority::Low,
54+
async move {
55+
let (skip_files_mask, predicate) = self.initialize_predicate()?;
56+
57+
if verbose {
58+
eprintln!(
59+
"[MultiScanTaskInitializer]: \
60+
predicate: {:?}, \
61+
skip files mask: {:?}, \
62+
predicate to reader: {:?}",
63+
self.config.predicate.is_some().then_some("<predicate>"),
64+
skip_files_mask.is_some().then_some("<skip_files>"),
65+
predicate.is_some().then_some("<predicate>"),
66+
)
67+
}
68+
69+
#[expect(clippy::never_loop)]
70+
loop {
71+
if skip_files_mask
72+
.as_ref()
73+
.is_some_and(|x| x.unset_bits() == 0)
74+
{
75+
if verbose {
76+
eprintln!(
77+
"[MultiScanTaskInitializer]: early return (skip_files_mask / predicate)"
78+
)
79+
}
80+
} else if self.config.pre_slice.as_ref().is_some_and(|x| x.len() == 0) {
81+
if cfg!(debug_assertions) {
82+
panic!("should quit earlier");
83+
}
84+
85+
if verbose {
86+
eprintln!(
87+
"[MultiScanTaskInitializer]: early return (pre_slice.len == 0)"
88+
)
89+
}
90+
} else {
91+
break;
92+
}
93+
94+
return Ok(());
95+
}
96+
97+
let predicate = predicate.cloned();
98+
99+
self.init_and_run(bridge_recv_port_tx, skip_files_mask, predicate)
100+
.await?
101+
.await?;
102+
103+
bridge_handle.await;
104+
105+
Ok(())
106+
},
107+
));
108+
109+
(
110+
background_tasks_handle,
111+
send_phase_chan_to_bridge,
112+
bridge_state,
113+
)
114+
}
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use arrow::bitmap::Bitmap;
2+
use polars_error::PolarsResult;
3+
use polars_io::predicates::ScanIOPredicate;
4+
5+
use super::MultiScanTaskInitializer;
6+
7+
impl MultiScanTaskInitializer {
8+
/// # Returns
9+
/// `(skip_files_mask, scan_predicate)`
10+
///
11+
/// TODO: Move logic here, rename to `evaluate_on_constant_columns`.
12+
pub fn initialize_predicate(&self) -> PolarsResult<(Option<Bitmap>, Option<&ScanIOPredicate>)> {
13+
if let Some(predicate) = &self.config.predicate {
14+
if let Some(hive_parts) = self.config.hive_parts.as_ref() {
15+
let (skip_files_mask, need_pred_for_inner_readers) =
16+
crate::nodes::io_sources::multi_scan::scan_predicate_to_mask(
17+
predicate,
18+
self.config.projected_file_schema.as_ref(),
19+
hive_parts.schema(),
20+
hive_parts,
21+
)?;
22+
23+
return Ok((
24+
skip_files_mask,
25+
need_pred_for_inner_readers.then_some(predicate),
26+
));
27+
}
28+
}
29+
30+
Ok((None, self.config.predicate.as_ref()))
31+
}
32+
}

0 commit comments

Comments
 (0)