Skip to content

Commit 4c14e70

Browse files
authored
perf: Vectorized nested loop join for in-memory engine (#20495)
1 parent dff1ad7 commit 4c14e70

File tree

47 files changed

+557
-193
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+557
-193
lines changed

Diff for: crates/polars-core/src/frame/horizontal.rs

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ impl DataFrame {
4141
}
4242
}
4343

44+
self.clear_schema();
4445
self.columns.extend_from_slice(columns);
4546
self
4647
}
@@ -99,6 +100,7 @@ pub fn concat_df_horizontal(dfs: &[DataFrame], check_duplicates: bool) -> Polars
99100
unsafe { df.get_columns_mut() }.iter_mut().for_each(|c| {
100101
*c = c.extend_constant(AnyValue::Null, diff).unwrap();
101102
});
103+
df.clear_schema();
102104
unsafe {
103105
df.set_height(output_height);
104106
}

Diff for: crates/polars-core/src/frame/mod.rs

+39-11
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ pub struct DataFrame {
176176
}
177177

178178
impl DataFrame {
179+
pub fn clear_schema(&mut self) {
180+
self.cached_schema = OnceLock::new();
181+
}
182+
179183
#[inline]
180184
pub fn materialized_column_iter(&self) -> impl ExactSizeIterator<Item = &Series> {
181185
self.columns.iter().map(Column::as_materialized_series)
@@ -416,6 +420,8 @@ impl DataFrame {
416420
/// # Ok::<(), PolarsError>(())
417421
/// ```
418422
pub fn pop(&mut self) -> Option<Column> {
423+
self.clear_schema();
424+
419425
self.columns.pop()
420426
}
421427

@@ -477,6 +483,7 @@ impl DataFrame {
477483
);
478484
ca.set_sorted_flag(IsSorted::Ascending);
479485

486+
self.clear_schema();
480487
self.columns.insert(0, ca.into_series().into());
481488
self
482489
}
@@ -687,14 +694,22 @@ impl DataFrame {
687694
/// let f2: Field = Field::new("Diameter (m)".into(), DataType::Float64);
688695
/// let sc: Schema = Schema::from_iter(vec![f1, f2]);
689696
///
690-
/// assert_eq!(df.schema(), sc);
697+
/// assert_eq!(&**df.schema(), &sc);
691698
/// # Ok::<(), PolarsError>(())
692699
/// ```
693-
pub fn schema(&self) -> Schema {
694-
self.columns
695-
.iter()
696-
.map(|x| (x.name().clone(), x.dtype().clone()))
697-
.collect()
700+
pub fn schema(&self) -> &SchemaRef {
701+
let out = self.cached_schema.get_or_init(|| {
702+
Arc::new(
703+
self.columns
704+
.iter()
705+
.map(|x| (x.name().clone(), x.dtype().clone()))
706+
.collect(),
707+
)
708+
});
709+
710+
debug_assert_eq!(out.len(), self.width());
711+
712+
out
698713
}
699714

700715
/// Get a reference to the [`DataFrame`] columns.
@@ -723,14 +738,17 @@ impl DataFrame {
723738
///
724739
/// The caller must ensure the length of all [`Series`] remains equal to `height` or
725740
/// [`DataFrame::set_height`] is called afterwards with the appropriate `height`.
741+
/// The caller must ensure that the cached schema is cleared if it modifies the schema by
742+
/// calling [`DataFrame::clear_schema`].
726743
pub unsafe fn get_columns_mut(&mut self) -> &mut Vec<Column> {
727744
&mut self.columns
728745
}
729746

730747
#[inline]
731748
/// Remove all the columns in the [`DataFrame`] but keep the `height`.
732749
pub fn clear_columns(&mut self) {
733-
unsafe { self.get_columns_mut() }.clear()
750+
unsafe { self.get_columns_mut() }.clear();
751+
self.clear_schema();
734752
}
735753

736754
#[inline]
@@ -744,7 +762,8 @@ impl DataFrame {
744762
/// `DataFrame`]s with no columns (ZCDFs), it is important that the height is set afterwards
745763
/// with [`DataFrame::set_height`].
746764
pub unsafe fn column_extend_unchecked(&mut self, iter: impl IntoIterator<Item = Column>) {
747-
unsafe { self.get_columns_mut() }.extend(iter)
765+
unsafe { self.get_columns_mut() }.extend(iter);
766+
self.clear_schema();
748767
}
749768

750769
/// Take ownership of the underlying columns vec.
@@ -834,6 +853,7 @@ impl DataFrame {
834853
s
835854
})
836855
.collect();
856+
self.clear_schema();
837857
Ok(())
838858
}
839859

@@ -1194,6 +1214,7 @@ impl DataFrame {
11941214
Ok(())
11951215
})?;
11961216
self.height += other.height;
1217+
self.clear_schema();
11971218
Ok(())
11981219
}
11991220

@@ -1215,6 +1236,7 @@ impl DataFrame {
12151236
/// ```
12161237
pub fn drop_in_place(&mut self, name: &str) -> PolarsResult<Column> {
12171238
let idx = self.check_name_to_idx(name)?;
1239+
self.clear_schema();
12181240
Ok(self.columns.remove(idx))
12191241
}
12201242

@@ -1347,6 +1369,7 @@ impl DataFrame {
13471369
}
13481370

13491371
self.columns.insert(index, column);
1372+
self.clear_schema();
13501373
Ok(self)
13511374
}
13521375

@@ -1370,6 +1393,7 @@ impl DataFrame {
13701393
}
13711394

13721395
self.columns.push(column);
1396+
self.clear_schema();
13731397
}
13741398
Ok(())
13751399
}
@@ -1417,6 +1441,7 @@ impl DataFrame {
14171441
unsafe { self.set_height(column.len()) };
14181442
}
14191443
unsafe { self.get_columns_mut() }.push(column);
1444+
self.clear_schema();
14201445

14211446
self
14221447
}
@@ -1433,6 +1458,7 @@ impl DataFrame {
14331458
}
14341459

14351460
self.columns.push(c);
1461+
self.clear_schema();
14361462
}
14371463
// Schema is incorrect fallback to search
14381464
else {
@@ -1448,6 +1474,7 @@ impl DataFrame {
14481474
}
14491475

14501476
self.columns.push(c);
1477+
self.clear_schema();
14511478
}
14521479

14531480
Ok(())
@@ -1637,7 +1664,7 @@ impl DataFrame {
16371664
/// # Ok::<(), PolarsError>(())
16381665
/// ```
16391666
pub fn get_column_index(&self, name: &str) -> Option<usize> {
1640-
let schema = self.cached_schema.get_or_init(|| Arc::new(self.schema()));
1667+
let schema = self.schema();
16411668
if let Some(idx) = schema.index_of(name) {
16421669
if self
16431670
.get_columns()
@@ -1775,7 +1802,7 @@ impl DataFrame {
17751802
cols: &[PlSmallStr],
17761803
schema: &Schema,
17771804
) -> PolarsResult<Vec<Column>> {
1778-
debug_ensure_matching_schema_names(schema, &self.schema())?;
1805+
debug_ensure_matching_schema_names(schema, self.schema())?;
17791806

17801807
cols.iter()
17811808
.map(|name| {
@@ -1984,7 +2011,7 @@ impl DataFrame {
19842011
return Ok(self);
19852012
}
19862013
polars_ensure!(
1987-
self.columns.iter().all(|c| c.name() != &name),
2014+
!self.schema().contains(&name),
19882015
Duplicate: "column rename attempted with already existing name \"{name}\""
19892016
);
19902017

@@ -2326,6 +2353,7 @@ impl DataFrame {
23262353
);
23272354
let old_col = &mut self.columns[index];
23282355
mem::swap(old_col, &mut new_column);
2356+
self.clear_schema();
23292357
Ok(self)
23302358
}
23312359

Diff for: crates/polars-expr/src/expressions/window.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ impl PhysicalExpr for WindowExpr {
399399
// 4. select the final column and return
400400

401401
if df.is_empty() {
402-
let field = self.phys_function.to_field(&df.schema())?;
402+
let field = self.phys_function.to_field(df.schema())?;
403403
return Ok(Column::full_null(field.name().clone(), 0, field.dtype()));
404404
}
405405

Diff for: crates/polars-io/src/avro/write.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ where
6464
}
6565

6666
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
67-
let schema = schema_to_arrow_checked(&df.schema(), CompatLevel::oldest(), "avro")?;
67+
let schema = schema_to_arrow_checked(df.schema(), CompatLevel::oldest(), "avro")?;
6868
let record = write::to_record(&schema, self.name.clone())?;
6969

7070
let mut data = vec![];

Diff for: crates/polars-io/src/ipc/write.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ where
116116
}
117117

118118
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
119-
let schema = schema_to_arrow_checked(&df.schema(), self.compat_level, "ipc")?;
119+
let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
120120
let mut ipc_writer = write::FileWriter::try_new(
121121
&mut self.writer,
122122
Arc::new(schema),

Diff for: crates/polars-io/src/parquet/write/writer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ where
124124
/// Write the given DataFrame in the writer `W`. Returns the total size of the file.
125125
pub fn finish(self, df: &mut DataFrame) -> PolarsResult<u64> {
126126
let chunked_df = chunk_df_for_writing(df, self.row_group_size.unwrap_or(512 * 512))?;
127-
let mut batched = self.batched(&chunked_df.schema())?;
127+
let mut batched = self.batched(chunked_df.schema())?;
128128
batched.write_batch(&chunked_df)?;
129129
batched.finish()
130130
}

Diff for: crates/polars-lazy/src/physical_plan/exotic.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub(crate) fn prepare_expression_for_context(
2525
// type coercion and simplify expression optimizations run.
2626
let column = Series::full_null(name, 0, dtype);
2727
let df = column.into_frame();
28-
let input_schema = Arc::new(df.schema());
28+
let input_schema = df.schema().clone();
2929
let lf = df
3030
.lazy()
3131
.without_optimizations()

Diff for: crates/polars-lazy/src/tests/io.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ fn scan_anonymous_fn_with_options() -> PolarsResult<()> {
672672
let function = Arc::new(MyScan {});
673673

674674
let args = ScanArgsAnonymous {
675-
schema: Some(Arc::new(fruits_cars().schema())),
675+
schema: Some(fruits_cars().schema().clone()),
676676
..ScanArgsAnonymous::default()
677677
};
678678

Diff for: crates/polars-lazy/src/tests/pdsh.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ fn test_q2() -> PolarsResult<()> {
107107
Field::new("s_phone".into(), DataType::String),
108108
Field::new("s_comment".into(), DataType::String),
109109
]);
110-
assert_eq!(&out.schema(), &schema);
110+
assert_eq!(&**out.schema(), &schema);
111111

112112
Ok(())
113113
}

Diff for: crates/polars-mem-engine/src/executors/join.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub struct JoinExec {
99
right_on: Vec<Arc<dyn PhysicalExpr>>,
1010
parallel: bool,
1111
args: JoinArgs,
12+
options: Option<JoinTypeOptions>,
1213
}
1314

1415
impl JoinExec {
@@ -20,6 +21,7 @@ impl JoinExec {
2021
right_on: Vec<Arc<dyn PhysicalExpr>>,
2122
parallel: bool,
2223
args: JoinArgs,
24+
options: Option<JoinTypeOptions>,
2325
) -> Self {
2426
JoinExec {
2527
input_left: Some(input_left),
@@ -28,6 +30,7 @@ impl JoinExec {
2830
right_on,
2931
parallel,
3032
args,
33+
options,
3134
}
3235
}
3336
}
@@ -75,7 +78,7 @@ impl Executor for JoinExec {
7578
let by = self
7679
.left_on
7780
.iter()
78-
.map(|s| Ok(s.to_field(&df_left.schema())?.name))
81+
.map(|s| Ok(s.to_field(df_left.schema())?.name))
7982
.collect::<PolarsResult<Vec<_>>>()?;
8083
let name = comma_delimited("join".to_string(), &by);
8184
Cow::Owned(name)
@@ -142,6 +145,7 @@ impl Executor for JoinExec {
142145
left_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),
143146
right_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),
144147
self.args.clone(),
148+
self.options.clone(),
145149
true,
146150
state.verbose(),
147151
);

Diff for: crates/polars-mem-engine/src/executors/sort.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl Executor for SortExec {
6161
let by = self
6262
.by_column
6363
.iter()
64-
.map(|s| Ok(s.to_field(&df.schema())?.name))
64+
.map(|s| Ok(s.to_field(df.schema())?.name))
6565
.collect::<PolarsResult<Vec<_>>>()?;
6666
let name = comma_delimited("sort".to_string(), &by);
6767
Cow::Owned(name)

Diff for: crates/polars-mem-engine/src/planner/lp.rs

+30
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use polars_core::prelude::*;
22
use polars_core::POOL;
3+
use polars_expr::state::ExecutionState;
34
use polars_plan::global::_set_n_rows_for_scan;
45
use polars_plan::plans::expr_ir::ExprIR;
56

@@ -484,6 +485,7 @@ fn create_physical_plan_impl(
484485
left_on,
485486
right_on,
486487
options,
488+
schema,
487489
..
488490
} => {
489491
let parallel = if options.force_parallel {
@@ -521,13 +523,41 @@ fn create_physical_plan_impl(
521523
&mut ExpressionConversionState::new(true, state.expr_depth),
522524
)?;
523525
let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
526+
527+
// Convert the join options, to the physical join options. This requires the physical
528+
// planner, so we do this last minute.
529+
let join_type_options = options
530+
.options
531+
.map(|o| {
532+
o.compile(|e| {
533+
let phys_expr = create_physical_expr(
534+
e,
535+
Context::Default,
536+
expr_arena,
537+
&schema,
538+
&mut ExpressionConversionState::new(false, state.expr_depth),
539+
)?;
540+
541+
let execution_state = ExecutionState::default();
542+
543+
Ok(Arc::new(move |df: DataFrame| {
544+
let mask = phys_expr.evaluate(&df, &execution_state)?;
545+
let mask = mask.as_materialized_series();
546+
let mask = mask.bool()?;
547+
df._filter_seq(mask)
548+
}))
549+
})
550+
})
551+
.transpose()?;
552+
524553
Ok(Box::new(executors::JoinExec::new(
525554
input_left,
526555
input_right,
527556
left_on,
528557
right_on,
529558
parallel,
530559
options.args,
560+
join_type_options,
531561
)))
532562
},
533563
HStack {

0 commit comments

Comments
 (0)