Skip to content

Commit 0f36679

Browse files
authored
feat(python): push down predicates to pyarrow datasets (#5780)
1 parent fe2a03a commit 0f36679

File tree

18 files changed

+327
-149
lines changed

18 files changed

+327
-149
lines changed

polars/polars-lazy/polars-plan/src/dsl/expr.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::fmt::{Debug, Formatter};
1+
use std::fmt::{Debug, Display, Formatter};
22
use std::hash::{Hash, Hasher};
33
use std::ops::Deref;
44

@@ -432,6 +432,31 @@ pub enum Operator {
432432
Xor,
433433
}
434434

435+
impl Display for Operator {
436+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
437+
use Operator::*;
438+
let tkn = match self {
439+
Eq => "==",
440+
NotEq => "!=",
441+
Lt => "<",
442+
LtEq => "<=",
443+
Gt => ">",
444+
GtEq => ">=",
445+
Plus => "+",
446+
Minus => "-",
447+
Multiply => "*",
448+
Divide => "//",
449+
TrueDivide => "/",
450+
FloorDivide => "floor_div",
451+
Modulus => "%",
452+
And => "&",
453+
Or => "|",
454+
Xor => "^",
455+
};
456+
write!(f, "{}", tkn)
457+
}
458+
}
459+
435460
impl Operator {
436461
pub(crate) fn is_comparison(&self) -> bool {
437462
matches!(

polars/polars-lazy/polars-plan/src/logical_plan/alp.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub enum ALogicalPlan {
3030
#[cfg(feature = "python")]
3131
PythonScan {
3232
options: PythonOptions,
33+
predicate: Option<Node>,
3334
},
3435
Melt {
3536
input: Node,
@@ -164,7 +165,7 @@ impl ALogicalPlan {
164165
use ALogicalPlan::*;
165166
match self {
166167
#[cfg(feature = "python")]
167-
PythonScan { options } => &options.schema,
168+
PythonScan { options, .. } => &options.schema,
168169
#[cfg(feature = "csv-file")]
169170
CsvScan { file_info, .. } => &file_info.schema,
170171
#[cfg(feature = "parquet")]
@@ -181,7 +182,7 @@ impl ALogicalPlan {
181182
use ALogicalPlan::*;
182183
let schema = match self {
183184
#[cfg(feature = "python")]
184-
PythonScan { options } => &options.schema,
185+
PythonScan { options, .. } => &options.schema,
185186
Union { inputs, .. } => return arena.get(inputs[0]).schema(arena),
186187
Cache { input, .. } => return arena.get(*input).schema(arena),
187188
Sort { input, .. } => return arena.get(*input).schema(arena),
@@ -249,8 +250,9 @@ impl ALogicalPlan {
249250

250251
match self {
251252
#[cfg(feature = "python")]
252-
PythonScan { options } => PythonScan {
253+
PythonScan { options, predicate } => PythonScan {
253254
options: options.clone(),
255+
predicate: *predicate,
254256
},
255257
Union { options, .. } => Union {
256258
inputs,

polars/polars-lazy/polars-plan/src/logical_plan/conversion.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,10 @@ pub fn to_alp(
175175
options,
176176
},
177177
#[cfg(feature = "python")]
178-
LogicalPlan::PythonScan { options } => ALogicalPlan::PythonScan { options },
178+
LogicalPlan::PythonScan { options } => ALogicalPlan::PythonScan {
179+
options,
180+
predicate: None,
181+
},
179182
LogicalPlan::Union { inputs, options } => {
180183
let inputs = inputs
181184
.into_iter()
@@ -668,7 +671,7 @@ impl ALogicalPlan {
668671
options,
669672
},
670673
#[cfg(feature = "python")]
671-
ALogicalPlan::PythonScan { options } => LogicalPlan::PythonScan { options },
674+
ALogicalPlan::PythonScan { options, .. } => LogicalPlan::PythonScan { options },
672675
ALogicalPlan::Union { inputs, options } => {
673676
let inputs = inputs
674677
.into_iter()

polars/polars-lazy/polars-plan/src/logical_plan/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ mod lit;
2424
pub(crate) mod optimizer;
2525
pub(crate) mod options;
2626
mod projection;
27+
#[cfg(feature = "python")]
28+
mod pyarrow;
2729
mod schema;
2830

2931
pub use aexpr::*;

polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ fn lp_node_equal(a: &ALogicalPlan, b: &ALogicalPlan, expr_arena: &Arena<AExpr>)
230230
&& expr_nodes_equal(agg_l, agg_r, expr_arena)
231231
}
232232
#[cfg(feature = "python")]
233-
(PythonScan { options: l }, PythonScan { options: r, .. }) => l == r,
233+
(PythonScan { options: l, .. }, PythonScan { options: r, .. }) => l == r,
234234
_ => {
235235
// joins and unions are also false
236236
// they do not originate from a single trail

polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,28 @@ impl PredicatePushDown {
544544
self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
545545
}
546546
#[cfg(feature = "python")]
547-
// python node does not yet support predicates
548-
lp @ PythonScan {..} => {
549-
self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
547+
PythonScan {mut options, predicate} => {
548+
if options.pyarrow {
549+
let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena);
550+
551+
if let Some(predicate) = predicate {
552+
match super::super::pyarrow::predicate_to_pa(predicate, expr_arena) {
553+
// we were able to create a pyarrow string, mutate the options
554+
Some(eval_str) => {
555+
options.predicate = Some(eval_str)
556+
},
557+
// we were not able to translate the predicate
558+
// apply here
559+
None => {
560+
let lp = PythonScan { options, predicate: None };
561+
return Ok(self.optional_apply_predicate(lp, vec![predicate], lp_arena, expr_arena))
562+
}
563+
}
564+
}
565+
Ok(PythonScan {options, predicate})
566+
} else {
567+
self.no_pushdown_restart_opt(PythonScan {options, predicate}, acc_predicates, lp_arena, expr_arena)
568+
}
550569
}
551570

552571
}

polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,10 @@ impl ProjectionPushDown {
455455
Ok(lp)
456456
}
457457
#[cfg(feature = "python")]
458-
PythonScan { mut options } => {
458+
PythonScan {
459+
mut options,
460+
predicate,
461+
} => {
459462
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);
460463

461464
options.output_schema = if options.with_columns.is_none() {
@@ -468,7 +471,7 @@ impl ProjectionPushDown {
468471
true,
469472
)?))
470473
};
471-
Ok(PythonScan { options })
474+
Ok(PythonScan { options, predicate })
472475
}
473476
#[cfg(feature = "csv-file")]
474477
CsvScan {

polars/polars-lazy/polars-plan/src/logical_plan/options.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ pub struct PythonOptions {
219219
pub schema: SchemaRef,
220220
pub output_schema: Option<SchemaRef>,
221221
pub with_columns: Option<Arc<Vec<String>>>,
222+
pub pyarrow: bool,
223+
// a pyarrow predicate python expression
224+
// can be evaluated with python.eval
225+
pub predicate: Option<String>,
222226
}
223227

224228
#[derive(Clone, PartialEq, Eq, Debug, Default)]
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use crate::prelude::*;
2+
3+
// convert to a pyarrow expression that can be evaluated with pythons eval
4+
pub(super) fn predicate_to_pa(predicate: Node, expr_arena: &Arena<AExpr>) -> Option<String> {
5+
match expr_arena.get(predicate) {
6+
AExpr::BinaryExpr { left, right, op } => {
7+
if op.is_comparison() {
8+
let left = predicate_to_pa(*left, expr_arena)?;
9+
let right = predicate_to_pa(*right, expr_arena)?;
10+
Some(format!("({} {} {})", left, op, right))
11+
} else {
12+
None
13+
}
14+
}
15+
AExpr::Column(name) => Some(format!("pa.dataset.field('{}')", name.as_ref())),
16+
AExpr::Alias(input, _) => predicate_to_pa(*input, expr_arena),
17+
AExpr::Literal(lv) => {
18+
let av = lv.to_anyvalue()?;
19+
let dtype = av.dtype();
20+
if dtype.is_float() {
21+
let val = av.extract::<f64>()?;
22+
Some(format!("{}", val))
23+
} else if dtype.is_integer() {
24+
let val = av.extract::<i64>()?;
25+
Some(format!("{}", val))
26+
} else {
27+
None
28+
}
29+
}
30+
AExpr::Function {
31+
function: FunctionExpr::Not,
32+
input,
33+
..
34+
} => {
35+
let input = input.first().unwrap();
36+
let input = predicate_to_pa(*input, expr_arena)?;
37+
Some(format!("~({})", input))
38+
}
39+
AExpr::Function {
40+
function: FunctionExpr::IsNull,
41+
input,
42+
..
43+
} => {
44+
let input = input.first().unwrap();
45+
let input = predicate_to_pa(*input, expr_arena)?;
46+
Some(format!("({}).is_null()", input))
47+
}
48+
AExpr::Function {
49+
function: FunctionExpr::IsNotNull,
50+
input,
51+
..
52+
} => {
53+
let input = input.first().unwrap();
54+
let input = predicate_to_pa(*input, expr_arena)?;
55+
Some(format!("~({}).is_null()", input))
56+
}
57+
_ => None,
58+
}
59+
}

polars/polars-lazy/src/frame/python.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ use polars_core::prelude::*;
33
use crate::prelude::*;
44

55
impl LazyFrame {
6-
pub fn scan_from_python_function(schema: Schema, scan_fn: Vec<u8>) -> Self {
6+
pub fn scan_from_python_function(schema: Schema, scan_fn: Vec<u8>, pyarrow: bool) -> Self {
77
LogicalPlan::PythonScan {
88
options: PythonOptions {
99
scan_fn,
1010
schema: Arc::new(schema),
11+
pyarrow,
1112
..Default::default()
1213
},
1314
}

0 commit comments

Comments
 (0)