Skip to content

Commit ca8c1ef

Browse files
fix: Allow nested is_in() in when()/then() for full-streaming (#20052)
1 parent 88d102a commit ca8c1ef

File tree

27 files changed

+466
-457
lines changed

27 files changed

+466
-457
lines changed

Diff for: crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs

+7-11
Original file line number | Diff line number | Diff line change
@@ -163,18 +163,16 @@ pub(crate) fn insert_streaming_nodes(
163163
execution_id += 1;
164164
match lp_arena.get(root) {
165165
Filter { input, predicate }
166-
if is_streamable(
167-
predicate.node(),
168-
expr_arena,
169-
IsStreamableContext::new(Default::default()),
170-
) =>
166+
if is_elementwise_rec(expr_arena.get(predicate.node()), expr_arena) =>
171167
{
172168
state.streamable = true;
173169
state.operators_sinks.push(PipelineNode::Operator(root));
174170
stack.push(StackFrame::new(*input, state, current_idx))
175171
},
176172
HStack { input, exprs, .. }
177-
if all_streamable(exprs, expr_arena, Default::default()) =>
173+
if exprs
174+
.iter()
175+
.all(|e| is_elementwise_rec(expr_arena.get(e.node()), expr_arena)) =>
178176
{
179177
state.streamable = true;
180178
state.operators_sinks.push(PipelineNode::Operator(root));
@@ -201,11 +199,9 @@ pub(crate) fn insert_streaming_nodes(
201199
stack.push(StackFrame::new(*input, state, current_idx))
202200
},
203201
Select { input, expr, .. }
204-
if all_streamable(
205-
expr,
206-
expr_arena,
207-
IsStreamableContext::new(Default::default()),
208-
) =>
202+
if expr
203+
.iter()
204+
.all(|e| is_elementwise_rec(expr_arena.get(e.node()), expr_arena)) =>
209205
{
210206
state.streamable = true;
211207
state.operators_sinks.push(PipelineNode::Operator(root));

Diff for: crates/polars-mem-engine/src/executors/projection.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@ pub struct ProjectionExec {
1313
pub(crate) schema: SchemaRef,
1414
pub(crate) options: ProjectionOptions,
1515
// Can run all operations elementwise
16-
pub(crate) streamable: bool,
16+
pub(crate) allow_vertical_parallelism: bool,
1717
}
1818

1919
impl ProjectionExec {
@@ -23,7 +23,7 @@ impl ProjectionExec {
2323
mut df: DataFrame,
2424
) -> PolarsResult<DataFrame> {
2525
// Vertical and horizontal parallelism.
26-
let df = if self.streamable
26+
let df = if self.allow_vertical_parallelism
2727
&& df.first_col_n_chunks() > 1
2828
&& df.height() > POOL.current_num_threads() * 2
2929
&& self.options.run_parallel

Diff for: crates/polars-mem-engine/src/executors/stack.rs

+2-2
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ pub struct StackExec {
1111
pub(crate) output_schema: SchemaRef,
1212
pub(crate) options: ProjectionOptions,
1313
// Can run all operations elementwise
14-
pub(crate) streamable: bool,
14+
pub(crate) allow_vertical_parallelism: bool,
1515
}
1616

1717
impl StackExec {
@@ -23,7 +23,7 @@ impl StackExec {
2323
let schema = &*self.output_schema;
2424

2525
// Vertical and horizontal parallelism.
26-
let df = if self.streamable
26+
let df = if self.allow_vertical_parallelism
2727
&& df.first_col_n_chunks() > 1
2828
&& df.height() > 0
2929
&& self.options.run_parallel

Diff for: crates/polars-mem-engine/src/planner/lp.rs

+9-14
Original file line number | Diff line number | Diff line change
@@ -239,11 +239,8 @@ fn create_physical_plan_impl(
239239
Ok(Box::new(executors::SliceExec { input, offset, len }))
240240
},
241241
Filter { input, predicate } => {
242-
let mut streamable = is_streamable(
243-
predicate.node(),
244-
expr_arena,
245-
IsStreamableContext::new(Context::Default).with_allow_cast_categorical(false),
246-
);
242+
let mut streamable =
243+
is_elementwise_rec_no_cat_cast(expr_arena.get(predicate.node()), expr_arena);
247244
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
248245
if streamable {
249246
// This can cause problems with string caches
@@ -386,7 +383,7 @@ fn create_physical_plan_impl(
386383
&mut state,
387384
)?;
388385

389-
let streamable = options.should_broadcast && all_streamable(&expr, expr_arena, IsStreamableContext::new(Context::Default).with_allow_cast_categorical(false))
386+
let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena))
390387
// If all columns are literal we would get 1 row per thread.
391388
&& !phys_expr.iter().all(|p| {
392389
p.is_literal()
@@ -400,7 +397,7 @@ fn create_physical_plan_impl(
400397
#[cfg(test)]
401398
schema: _schema,
402399
options,
403-
streamable,
400+
allow_vertical_parallelism,
404401
}))
405402
},
406403
Reduce {
@@ -635,12 +632,10 @@ fn create_physical_plan_impl(
635632
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
636633
let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?;
637634

638-
let streamable = options.should_broadcast
639-
&& all_streamable(
640-
&exprs,
641-
expr_arena,
642-
IsStreamableContext::new(Context::Default).with_allow_cast_categorical(false),
643-
);
635+
let allow_vertical_parallelism = options.should_broadcast
636+
&& exprs
637+
.iter()
638+
.all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena));
644639

645640
let mut state = ExpressionConversionState::new(
646641
POOL.current_num_threads() > exprs.len(),
@@ -661,7 +656,7 @@ fn create_physical_plan_impl(
661656
input_schema,
662657
output_schema,
663658
options,
664-
streamable,
659+
allow_vertical_parallelism,
665660
}))
666661
},
667662
MapFunction {

Diff for: crates/polars-plan/src/plans/aexpr/mod.rs

+32-25
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ pub use scalar::is_scalar_ae;
1818
use serde::{Deserialize, Serialize};
1919
use strum_macros::IntoStaticStr;
2020
pub use traverse::*;
21+
pub(crate) use utils::permits_filter_pushdown;
2122
pub use utils::*;
2223

2324
use crate::constants::LEN;
@@ -218,35 +219,41 @@ impl AExpr {
218219
pub(crate) fn col(name: PlSmallStr) -> Self {
219220
AExpr::Column(name)
220221
}
221-
/// Any expression that is sensitive to the number of elements in a group
222-
/// - Aggregations
223-
/// - Sorts
224-
/// - Counts
225-
/// - ..
226-
pub(crate) fn groups_sensitive(&self) -> bool {
222+
223+
/// Checks whether this expression is elementwise. This only checks the top-level expression.
224+
pub(crate) fn is_elementwise_top_level(&self) -> bool {
227225
use AExpr::*;
226+
228227
match self {
229-
Function { options, .. } | AnonymousFunction { options, .. } => {
230-
options.is_groups_sensitive()
231-
}
232-
Sort { .. }
233-
| SortBy { .. }
234-
| Agg { .. }
235-
| Window { .. }
228+
AnonymousFunction { options, .. } => options.is_elementwise(),
229+
230+
// Non-strict strptime must be done in-memory to ensure the format
231+
// is consistent across the entire dataframe.
232+
#[cfg(feature = "strings")]
233+
Function {
234+
options,
235+
function: FunctionExpr::StringExpr(StringFunction::Strptime(_, opts)),
236+
..
237+
} => {
238+
assert!(options.is_elementwise());
239+
opts.strict
240+
},
241+
242+
Function { options, .. } => options.is_elementwise(),
243+
244+
Literal(v) => v.projects_as_scalar(),
245+
246+
Alias(_, _) | BinaryExpr { .. } | Column(_) | Ternary { .. } | Cast { .. } => true,
247+
248+
Agg { .. }
249+
| Explode(_)
250+
| Filter { .. }
251+
| Gather { .. }
236252
| Len
237253
| Slice { .. }
238-
| Gather { .. }
239-
=> true,
240-
Alias(_, _)
241-
| Explode(_)
242-
| Column(_)
243-
| Literal(_)
244-
// a caller should traverse binary and ternary
245-
// to determine if the whole expr. is group sensitive
246-
| BinaryExpr { .. }
247-
| Ternary { .. }
248-
| Cast { .. }
249-
| Filter { .. } => false,
254+
| Sort { .. }
255+
| SortBy { .. }
256+
| Window { .. } => false,
250257
}
251258
}
252259

Diff for: crates/polars-plan/src/plans/aexpr/traverse.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@ use super::*;
22

33
impl AExpr {
44
/// Push nodes at this level to a pre-allocated stack.
5-
pub(crate) fn nodes<C: PushNode>(&self, container: &mut C) {
5+
pub(crate) fn nodes(&self, container: &mut impl PushNode) {
66
use AExpr::*;
77

88
match self {

0 commit comments

Comments
 (0)