Skip to content

Commit af764ed

Browse files
committed
wip
1 parent 2684dcd commit af764ed

2 files changed

Lines changed: 24 additions & 4 deletions

File tree

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ use datafusion::physical_plan::metrics::{
5959
};
6060

6161
use datafusion::common::tree_node::{Transformed, TreeNode};
62-
use datafusion::physical_plan::joins::HashJoinExec;
62+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
63+
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
6364
use datafusion::physical_plan::{
6465
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
6566
SendableRecordBatchStream, Statistics, displayable,
@@ -558,9 +559,18 @@ impl ShuffleWriterExec {
558559
info!(
559560
"ShuffleWriter {job_id}/{stage_id}: stripping accumulator from {disp}"
560561
);
562+
let left = Arc::clone(hj.left());
563+
let left: Arc<dyn ExecutionPlan> =
564+
if *hj.partition_mode() == PartitionMode::CollectLeft
565+
&& left.properties().output_partitioning().partition_count() > 1
566+
{
567+
Arc::new(CoalescePartitionsExec::new(left))
568+
} else {
569+
left
570+
};
561571
let rebuilt: Arc<dyn ExecutionPlan> = Arc::new(
562572
HashJoinExec::try_new(
563-
Arc::clone(hj.left()),
573+
left,
564574
Arc::clone(hj.right()),
565575
hj.on().to_vec(),
566576
hj.filter().cloned(),

ballista/scheduler/src/state/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ use ballista_core::event_loop::EventSender;
4646
use ballista_core::serde::BallistaCodec;
4747
use ballista_core::serde::protobuf::TaskStatus;
4848
use datafusion::logical_expr::LogicalPlan;
49+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
4950
use datafusion::physical_plan::display::DisplayableExecutionPlan;
5051
use datafusion::physical_plan::empty::EmptyExec;
51-
use datafusion::physical_plan::joins::HashJoinExec;
52+
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
5253
use datafusion::prelude::SessionContext;
5354
use datafusion_proto::logical_plan::AsLogicalPlan;
5455
use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -639,9 +640,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
639640
info!(
640641
"Job {job_id}: stripping dynamic-filter accumulator from {display}"
641642
);
643+
let left = Arc::clone(hash_join.left());
644+
let left: Arc<dyn ExecutionPlan> =
645+
if *hash_join.partition_mode() == PartitionMode::CollectLeft
646+
&& left.properties().output_partitioning().partition_count() > 1
647+
{
648+
Arc::new(CoalescePartitionsExec::new(left))
649+
} else {
650+
left
651+
};
642652
let rebuilt: Arc<dyn ExecutionPlan> = Arc::new(
643653
HashJoinExec::try_new(
644-
Arc::clone(hash_join.left()),
654+
left,
645655
Arc::clone(hash_join.right()),
646656
hash_join.on().to_vec(),
647657
hash_join.filter().cloned(),

0 commit comments

Comments
 (0)