Skip to content

Commit 2684dcd

Browse files
committed
wip
1 parent d7d7a81 commit 2684dcd

1 file changed

Lines changed: 30 additions & 0 deletions

File tree

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ use datafusion::physical_plan::metrics::{
5858
self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
5959
};
6060

61+
use datafusion::common::tree_node::{Transformed, TreeNode};
62+
use datafusion::physical_plan::joins::HashJoinExec;
6163
use datafusion::physical_plan::{
6264
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
6365
SendableRecordBatchStream, Statistics, displayable,
@@ -545,6 +547,34 @@ impl ShuffleWriterExec {
545547

546548
async move {
547549
let now = Instant::now();
550+
// Strip dynamic-filter accumulators from HashJoinExec nodes.
551+
// Custom DataFusion builds may re-inject SharedBuildAccumulator
552+
// during deserialization. The cross-partition Barrier deadlocks
553+
// in Ballista where each task runs a single partition.
554+
let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
555+
if let Some(hj) = node.as_any().downcast_ref::<HashJoinExec>() {
556+
let disp = displayable(node.as_ref()).one_line().to_string();
557+
if disp.contains("accumulator") {
558+
info!(
559+
"ShuffleWriter {job_id}/{stage_id}: stripping accumulator from {disp}"
560+
);
561+
let rebuilt: Arc<dyn ExecutionPlan> = Arc::new(
562+
HashJoinExec::try_new(
563+
Arc::clone(hj.left()),
564+
Arc::clone(hj.right()),
565+
hj.on().to_vec(),
566+
hj.filter().cloned(),
567+
hj.join_type(),
568+
hj.projection.clone(),
569+
*hj.partition_mode(),
570+
hj.null_equality(),
571+
)?
572+
);
573+
return Ok(Transformed::yes(rebuilt));
574+
}
575+
}
576+
Ok(Transformed::no(node))
577+
})?.data;
548578
// Wrap plan with tracing to log every execute() call
549579
let plan = TracingExec::wrap(plan, &job_id, stage_id);
550580
// Log the plan tree once (partition 0 only) to help debug execution issues

0 commit comments

Comments
 (0)