Skip to content

Commit c078cae

Browse files
committed
wip
1 parent 7b6870a commit c078cae

1 file changed

Lines changed: 125 additions & 0 deletions

File tree

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,122 @@ use log::{debug, info, warn};
7272

7373
use super::shuffle_writer_trait::ShuffleWriter;
7474

75+
/// Wraps an ExecutionPlan tree to log every `execute()` call with the node name and partition.
76+
/// This helps diagnose which nodes in a complex plan are/aren't being executed.
77+
#[derive(Debug)]
78+
struct TracingExec {
79+
inner: Arc<dyn ExecutionPlan>,
80+
label: String,
81+
children: Vec<Arc<dyn ExecutionPlan>>,
82+
}
83+
84+
impl TracingExec {
85+
/// Wrap an entire plan tree with tracing. Each node gets a label like "depth.index: NodeName".
86+
fn wrap(
87+
plan: Arc<dyn ExecutionPlan>,
88+
job_id: &str,
89+
stage_id: usize,
90+
) -> Arc<dyn ExecutionPlan> {
91+
Self::wrap_recursive(plan, job_id, stage_id, 0)
92+
}
93+
94+
fn wrap_recursive(
95+
plan: Arc<dyn ExecutionPlan>,
96+
job_id: &str,
97+
stage_id: usize,
98+
depth: usize,
99+
) -> Arc<dyn ExecutionPlan> {
100+
let children: Vec<Arc<dyn ExecutionPlan>> = plan
101+
.children()
102+
.into_iter()
103+
.enumerate()
104+
.map(|(_i, child)| {
105+
Self::wrap_recursive(Arc::clone(child), job_id, stage_id, depth + 1)
106+
})
107+
.collect();
108+
109+
let name = plan.name().to_string();
110+
let label = format!("{job_id}/{stage_id} d{depth} {name}");
111+
112+
Arc::new(TracingExec {
113+
inner: plan,
114+
label,
115+
children,
116+
})
117+
}
118+
}
119+
120+
impl DisplayAs for TracingExec {
121+
fn fmt_as(
122+
&self,
123+
t: DisplayFormatType,
124+
f: &mut std::fmt::Formatter,
125+
) -> std::fmt::Result {
126+
self.inner.fmt_as(t, f)
127+
}
128+
}
129+
130+
impl ExecutionPlan for TracingExec {
131+
fn name(&self) -> &str {
132+
self.inner.name()
133+
}
134+
135+
fn as_any(&self) -> &dyn Any {
136+
self.inner.as_any()
137+
}
138+
139+
fn schema(&self) -> SchemaRef {
140+
self.inner.schema()
141+
}
142+
143+
fn properties(&self) -> &PlanProperties {
144+
self.inner.properties()
145+
}
146+
147+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
148+
self.children.iter().collect()
149+
}
150+
151+
fn with_new_children(
152+
self: Arc<Self>,
153+
children: Vec<Arc<dyn ExecutionPlan>>,
154+
) -> Result<Arc<dyn ExecutionPlan>> {
155+
Ok(Arc::new(TracingExec {
156+
inner: Arc::clone(&self.inner),
157+
label: self.label.clone(),
158+
children,
159+
}))
160+
}
161+
162+
fn execute(
163+
&self,
164+
partition: usize,
165+
context: Arc<TaskContext>,
166+
) -> Result<SendableRecordBatchStream> {
167+
info!(
168+
"TracingExec::execute({}) partition={}",
169+
self.label, partition
170+
);
171+
// Replace children on inner plan with our traced children, then execute
172+
let rebuilt = if self.children.is_empty() {
173+
Arc::clone(&self.inner)
174+
} else {
175+
self.inner
176+
.clone()
177+
.with_new_children(self.children.clone())?
178+
};
179+
rebuilt.execute(partition, context)
180+
}
181+
182+
fn metrics(&self) -> Option<MetricsSet> {
183+
self.inner.metrics()
184+
}
185+
186+
fn statistics(&self) -> Result<Statistics> {
187+
self.inner.statistics()
188+
}
189+
}
190+
75191
/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
76192
/// can be executed as one unit with each partition being executed in parallel. The output of each
77193
/// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
@@ -339,6 +455,15 @@ impl ShuffleWriterExec {
339455

340456
async move {
341457
let now = Instant::now();
458+
// Wrap plan with tracing to log every execute() call
459+
let plan = TracingExec::wrap(plan, &job_id, stage_id);
460+
// Log the plan tree once (partition 0 only) to help debug execution issues
461+
if input_partition == 0 {
462+
info!(
463+
"ShuffleWriter {job_id}/{stage_id} plan tree:\n{}",
464+
datafusion::physical_plan::displayable(plan.as_ref()).indent(true)
465+
);
466+
}
342467
info!(
343468
"ShuffleWriter {job_id}/{stage_id} partition {input_partition}: creating execution stream"
344469
);

0 commit comments

Comments
 (0)