Skip to content

Commit 2175484

Browse files
committed
fix: drop input plan early in CoalescePartitionsExec
- Fixes apache#22016 - `CoalescePartitionsExec` uses `RecordBatchReceiverStreamBuilder` which was holding to an Arc ref of the input plan, until the stream was done. - Now we drop it early. - The only other usage of `input` was in debug prints, to display the input plan. We now take the display string early, but only when debug logging is enabled. - Added a test that ensures this. Verified that the test fails without the fix.
1 parent c134a84 commit 2175484

2 files changed

Lines changed: 56 additions & 9 deletions

File tree

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,11 +351,14 @@ impl ExecutionPlan for CoalescePartitionsExec {
351351
mod tests {
352352
use super::*;
353353
use crate::test::exec::{
354-
BlockingExec, PanicExec, assert_strong_count_converges_to_zero,
354+
BarrierExec, BlockingExec, PanicExec, assert_strong_count_converges_to_zero,
355355
};
356356
use crate::test::{self, assert_is_pending};
357357
use crate::{collect, common};
358358

359+
use std::time::Duration;
360+
361+
use arrow::array::RecordBatch;
359362
use arrow::datatypes::{DataType, Field, Schema};
360363

361364
use futures::FutureExt;
@@ -390,6 +393,45 @@ mod tests {
390393
Ok(())
391394
}
392395

396+
#[tokio::test]
397+
async fn drops_input_plan_after_input_streams_start() -> Result<()> {
398+
let task_ctx = Arc::new(TaskContext::default());
399+
let schema =
400+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
401+
let input_partitions = 2;
402+
let batch = RecordBatch::new_empty(Arc::clone(&schema));
403+
let input = Arc::new(
404+
BarrierExec::new(vec![vec![batch]; input_partitions], schema)
405+
.without_start_barrier()
406+
.with_finish_barrier()
407+
.with_log(false),
408+
);
409+
let refs = Arc::downgrade(&input);
410+
411+
let input_plan: Arc<BarrierExec> = Arc::clone(&input);
412+
let coalesce = CoalescePartitionsExec::new(input_plan);
413+
let stream = coalesce.execute(0, task_ctx)?;
414+
drop(coalesce);
415+
416+
tokio::time::timeout(Duration::from_secs(5), async {
417+
// Why not `wait_finish` here: that releases the barrier which lets the input tasks
418+
// finish, which drops the input Arcs and hides the bug.
419+
while !input.is_finish_barrier_reached() {
420+
tokio::task::yield_now().await;
421+
}
422+
})
423+
.await
424+
.expect("input streams should reach pending");
425+
426+
drop(input);
427+
428+
assert_strong_count_converges_to_zero(refs).await;
429+
430+
drop(stream);
431+
432+
Ok(())
433+
}
434+
393435
#[tokio::test]
394436
async fn test_drop_cancel() -> Result<()> {
395437
let task_ctx = Arc::new(TaskContext::default());

datafusion/physical-plan/src/stream.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,11 @@ impl RecordBatchReceiverStreamBuilder {
330330
context: Arc<TaskContext>,
331331
) {
332332
let output = self.tx();
333+
let input_display = if log::log_enabled!(log::Level::Debug) {
334+
displayable(input.as_ref()).one_line().to_string()
335+
} else {
336+
"".to_owned()
337+
};
333338

334339
self.inner.spawn(async move {
335340
let mut stream = match input.execute(partition, context) {
@@ -338,14 +343,18 @@ impl RecordBatchReceiverStreamBuilder {
338343
// is no place to send the error and no reason to continue.
339344
output.send(Err(e)).await.ok();
340345
debug!(
341-
"Stopping execution: error executing input: {}",
342-
displayable(input.as_ref()).one_line()
346+
"Stopping execution: error executing input: {input_display}",
343347
);
344348
return Ok(());
345349
}
346350
Ok(stream) => stream,
347351
};
348352

353+
// Drop the input early, as soon as we're done with it.
354+
// Holding on to it can cause delays in cancelling the child plan when the query is
355+
// cancelled.
356+
drop(input);
357+
349358
// Transfer batches from inner stream to the output tx
350359
// immediately.
351360
while let Some(item) = stream.next().await {
@@ -355,19 +364,15 @@ impl RecordBatchReceiverStreamBuilder {
355364
// place to send the error and no reason to continue.
356365
if output.send(item).await.is_err() {
357366
debug!(
358-
"Stopping execution: output is gone, plan cancelling: {}",
359-
displayable(input.as_ref()).one_line()
367+
"Stopping execution: output is gone, plan cancelling: {input_display}",
360368
);
361369
return Ok(());
362370
}
363371

364372
// Stop after the first error is encountered (Don't
365373
// drive all streams to completion)
366374
if is_err {
367-
debug!(
368-
"Stopping execution: plan returned error: {}",
369-
displayable(input.as_ref()).one_line()
370-
);
375+
debug!("Stopping execution: plan returned error: {input_display}");
371376
return Ok(());
372377
}
373378
}

0 commit comments

Comments
 (0)