Skip to content

Commit df594b2

Browse files
c
1 parent 5dd9b23 commit df594b2

File tree

21 files changed

+330
-42
lines changed

21 files changed

+330
-42
lines changed

crates/polars-expr/src/reduce/convert.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ pub fn into_reduction(
2525
node: Node,
2626
expr_arena: &mut Arena<AExpr>,
2727
schema: &Schema,
28+
is_aggregation_context: bool,
2829
) -> PolarsResult<(Box<dyn GroupedReduction>, Vec<Node>)> {
2930
let get_dt = |node| {
3031
expr_arena
@@ -94,6 +95,12 @@ pub fn into_reduction(
9495
// project to the height of the DataFrame (in the PhysicalExpr impl).
9596
// * This approach is not sound for `update_groups()`, but currently that case is
9697
// not hit (it would need group-by -> len on empty morsels).
98+
polars_ensure!(
99+
!is_aggregation_context,
100+
ComputeError:
101+
"not implemented: len() of groups with no columns"
102+
);
103+
97104
let out: Box<dyn GroupedReduction> = new_sum_reduction(DataType::IDX_DTYPE)?;
98105
let expr = expr_arena.add(AExpr::Len);
99106

crates/polars-stream/src/nodes/in_memory_source.rs

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,17 @@ impl InMemorySourceNode {
2222
seq_offset,
2323
}
2424
}
25+
26+
pub fn new_no_morsel_split(source: Arc<DataFrame>, seq_offset: MorselSeq) -> Self {
27+
let morsel_size = source.height();
28+
29+
InMemorySourceNode {
30+
source: Some(source),
31+
morsel_size,
32+
seq: AtomicU64::new(0),
33+
seq_offset,
34+
}
35+
}
2536
}
2637

2738
impl ComputeNode for InMemorySourceNode {

crates/polars-stream/src/nodes/io_sources/batch.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -180,6 +180,7 @@ impl FileReader for BatchFnReader {
180180
predicate: None,
181181
cast_columns_policy: _,
182182
num_pipelines: _,
183+
disable_morsel_split: _,
183184
callbacks:
184185
FileReaderCallbacks {
185186
mut file_schema_tx,

crates/polars-stream/src/nodes/io_sources/csv.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -137,6 +137,7 @@ impl FileReader for CsvFileReader {
137137
predicate: None,
138138
cast_columns_policy: _,
139139
num_pipelines,
140+
disable_morsel_split: _,
140141
callbacks:
141142
FileReaderCallbacks {
142143
file_schema_tx,

crates/polars-stream/src/nodes/io_sources/ipc/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -162,6 +162,7 @@ impl FileReader for IpcFileReader {
162162
predicate: None,
163163
cast_columns_policy: _,
164164
num_pipelines,
165+
disable_morsel_split,
165166
callbacks:
166167
FileReaderCallbacks {
167168
file_schema_tx,
@@ -397,6 +398,20 @@ impl FileReader for IpcFileReader {
397398
if df.height() == 0 {
398399
continue;
399400
}
401+
402+
if disable_morsel_split {
403+
if morsel_send
404+
.send_morsel(Morsel::new(df, morsel_seq, source_token.clone()))
405+
.await
406+
.is_err()
407+
{
408+
return Ok(());
409+
}
410+
drop(permit);
411+
morsel_seq = morsel_seq.successor();
412+
continue;
413+
}
414+
400415
next = Some((df, permit));
401416
break;
402417
}

crates/polars-stream/src/nodes/io_sources/multi_scan/components/projection/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,10 @@ pub enum Projection {
3535
}
3636

3737
impl Projection {
38+
pub fn is_empty(&self) -> bool {
39+
self.projected_schema().is_empty()
40+
}
41+
3842
/// Returns the full projected schema, keyed by the output name.
3943
pub fn projected_schema(&self) -> &SchemaRef {
4044
match self {

crates/polars-stream/src/nodes/io_sources/multi_scan/components/row_deletions.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -157,6 +157,7 @@ impl DeletionFilesProvider {
157157
predicate: None,
158158
cast_columns_policy: CastColumnsPolicy::ERROR_ON_MISMATCH,
159159
num_pipelines,
160+
disable_morsel_split: false,
160161
callbacks: FileReaderCallbacks {
161162
file_schema_tx: None,
162163
n_rows_in_file_tx: None,

crates/polars-stream/src/nodes/io_sources/multi_scan/config.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ pub struct MultiScanConfig {
4949
/// step.
5050
pub n_readers_pre_init: RelaxedCell<usize>,
5151
pub max_concurrent_scans: RelaxedCell<usize>,
52+
pub disable_morsel_split: bool,
5253

5354
pub verbose: bool,
5455
}

crates/polars-stream/src/nodes/io_sources/multi_scan/pipeline/initialization.rs

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -42,12 +42,14 @@ pub fn initialize_multi_scan_pipeline(
4242
reader name: {}, \
4343
{:?}, \
4444
n_readers_pre_init: {}, \
45-
max_concurrent_scans: {}",
45+
max_concurrent_scans: {}, \
46+
disable_morsel_split: {}",
4647
config.sources.len(),
4748
config.file_reader_builder.reader_name(),
4849
config.reader_capabilities(),
4950
config.n_readers_pre_init(),
5051
config.max_concurrent_scans(),
52+
config.disable_morsel_split,
5153
);
5254
}
5355

@@ -363,6 +365,7 @@ async fn finish_initialize_multi_scan_pipeline(
363365
let final_output_schema = config.final_output_schema.clone();
364366
let file_projection_builder = config.file_projection_builder.clone();
365367
let max_concurrent_scans = config.max_concurrent_scans();
368+
let disable_morsel_split = config.disable_morsel_split;
366369

367370
let (started_reader_tx, started_reader_rx) =
368371
tokio::sync::mpsc::channel(max_concurrent_scans.max(2) - 1);
@@ -387,6 +390,7 @@ async fn finish_initialize_multi_scan_pipeline(
387390
missing_columns_policy,
388391
forbid_extra_columns: config.forbid_extra_columns.clone(),
389392
num_pipelines,
393+
disable_morsel_split,
390394
verbose,
391395
},
392396
verbose,

crates/polars-stream/src/nodes/io_sources/multi_scan/pipeline/models.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -83,6 +83,7 @@ pub(super) struct StartReaderArgsConstant {
8383
pub(super) missing_columns_policy: MissingColumnsPolicy,
8484
pub(super) forbid_extra_columns: Option<ForbidExtraColumns>,
8585
pub(super) num_pipelines: usize,
86+
pub(super) disable_morsel_split: bool,
8687
pub(super) verbose: bool,
8788
}
8889

0 commit comments

Comments
 (0)