Skip to content

Commit b81643a

Browse files
committed
wip
1 parent 4966e40 commit b81643a

1 file changed

Lines changed: 92 additions & 2 deletions

File tree

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,84 @@ use futures::{StreamExt, TryFutureExt, TryStreamExt};
6666

6767
use datafusion::arrow::error::ArrowError;
6868
use datafusion::execution::context::TaskContext;
69+
use datafusion::physical_plan::RecordBatchStream;
6970
use datafusion::physical_plan::repartition::BatchPartitioner;
7071
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
72+
use futures::Stream;
7173
use log::{debug, info, warn};
74+
use std::pin::Pin;
75+
use std::task::{Context, Poll};
7276

7377
use super::shuffle_writer_trait::ShuffleWriter;
7478

79+
/// Stream wrapper used by TracingExec to log data flow through each plan node.
80+
/// Logs first_batch and stream_end to identify where data gets stuck.
81+
struct TracingStream {
82+
inner: SendableRecordBatchStream,
83+
label: String,
84+
partition: usize,
85+
batch_count: u64,
86+
row_count: u64,
87+
first_batch_logged: bool,
88+
start: Instant,
89+
}
90+
91+
impl Stream for TracingStream {
92+
type Item = datafusion::error::Result<RecordBatch>;
93+
94+
fn poll_next(
95+
mut self: Pin<&mut Self>,
96+
cx: &mut Context<'_>,
97+
) -> Poll<Option<Self::Item>> {
98+
let this = &mut *self;
99+
let result = this.inner.as_mut().poll_next(cx);
100+
match &result {
101+
Poll::Ready(Some(Ok(batch))) => {
102+
this.batch_count += 1;
103+
this.row_count += batch.num_rows() as u64;
104+
if !this.first_batch_logged {
105+
this.first_batch_logged = true;
106+
info!(
107+
"TracingStream({} p={}): first_batch {} rows after {:.3}s",
108+
this.label,
109+
this.partition,
110+
batch.num_rows(),
111+
this.start.elapsed().as_secs_f64()
112+
);
113+
}
114+
}
115+
Poll::Ready(Some(Err(e))) => {
116+
warn!(
117+
"TracingStream({} p={}): error after {:.3}s, {} batches: {}",
118+
this.label,
119+
this.partition,
120+
this.start.elapsed().as_secs_f64(),
121+
this.batch_count,
122+
e
123+
);
124+
}
125+
Poll::Ready(None) => {
126+
info!(
127+
"TracingStream({} p={}): ended after {:.3}s, {} batches, {} rows",
128+
this.label,
129+
this.partition,
130+
this.start.elapsed().as_secs_f64(),
131+
this.batch_count,
132+
this.row_count
133+
);
134+
}
135+
Poll::Pending => {}
136+
}
137+
result
138+
}
139+
}
140+
141+
impl RecordBatchStream for TracingStream {
142+
fn schema(&self) -> SchemaRef {
143+
self.inner.schema()
144+
}
145+
}
146+
75147
/// Wraps an ExecutionPlan tree to log every `execute()` call with the node name and partition.
76148
/// This helps diagnose which nodes in a complex plan are/aren't being executed.
77149
#[derive(Debug)]
@@ -164,7 +236,7 @@ impl ExecutionPlan for TracingExec {
164236
partition: usize,
165237
context: Arc<TaskContext>,
166238
) -> Result<SendableRecordBatchStream> {
167-
info!(
239+
debug!(
168240
"TracingExec::execute({}) partition={}",
169241
self.label, partition
170242
);
@@ -176,7 +248,25 @@ impl ExecutionPlan for TracingExec {
176248
.clone()
177249
.with_new_children(self.children.clone())?
178250
};
179-
rebuilt.execute(partition, context)
251+
let stream = rebuilt.execute(partition, context)?;
252+
253+
// Wrap stream for non-ShuffleReaderExec nodes to trace data flow.
254+
// ShuffleReaderExec is skipped because CoalescePartitionsExec spawns
255+
// one per partition (44+), making it too noisy.
256+
let name = self.inner.name();
257+
if name == "ShuffleReaderExec" {
258+
Ok(stream)
259+
} else {
260+
Ok(Box::pin(TracingStream {
261+
inner: stream,
262+
label: self.label.clone(),
263+
partition,
264+
batch_count: 0,
265+
row_count: 0,
266+
first_batch_logged: false,
267+
start: Instant::now(),
268+
}))
269+
}
180270
}
181271

182272
fn metrics(&self) -> Option<MetricsSet> {

0 commit comments

Comments
 (0)