Skip to content

Commit 33008af

Browse files
author
wiedld
committed
refactor: improve performance
1 parent 2bac638 commit 33008af

1 file changed

Lines changed: 22 additions & 5 deletions

File tree

datafusion/datasource/src/memory.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -789,15 +789,26 @@ impl MemorySourceConfig {
789789
for rep in to_repartition {
790790
max_heap.push(rep);
791791
}
792+
let mut cannot_split_further = Vec::with_capacity(target_partitions);
792793
for _ in 0..cnt_to_repartition {
793-
let Some(to_split) = max_heap.pop() else {
794-
unreachable!()
795-
};
796-
for new_partition in to_split.split() {
797-
max_heap.push(new_partition);
794+
loop {
795+
let Some(to_split) = max_heap.pop() else {
796+
break;
797+
};
798+
799+
let mut new_partitions = to_split.split();
800+
if new_partitions.len() > 1 {
801+
for new_partition in new_partitions {
802+
max_heap.push(new_partition);
803+
}
804+
break;
805+
} else {
806+
cannot_split_further.push(new_partitions.remove(0));
807+
}
798808
}
799809
}
800810
let mut partitions = max_heap.drain().collect_vec();
811+
partitions.extend(cannot_split_further);
801812
partitions.sort_by_key(|p| p.idx);
802813
let partitions = partitions.into_iter().map(|rep| rep.batches).collect_vec();
803814

@@ -862,7 +873,13 @@ struct RePartition {
862873

863874
impl RePartition {
864875
/// Split [`RePartition`] into 2 pieces, consuming self.
876+
///
877+
/// Returns only 1 partition if it cannot be split further.
865878
fn split(self) -> Vec<Self> {
879+
if self.batches.len() == 1 {
880+
return vec![self];
881+
}
882+
866883
let new_0 = RePartition {
867884
idx: self.idx,
868885
row_count: 0,

0 commit comments

Comments
 (0)