From 1e3e2801b2e9a289656a4469f723a9a80af54f2e Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 22 Apr 2026 13:23:05 +1000 Subject: [PATCH 01/19] fix: Ensure offers are revived on job update for saturated tasks --- .../scheduler_server/query_stage_scheduler.rs | 118 +++++++++ .../scheduler/src/state/execution_graph.rs | 230 ++++++++++++++++++ 2 files changed, 348 insertions(+) diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs index 1b3deb38d8..36bc011fbb 100644 --- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -316,6 +316,17 @@ impl if let Err(e) = self.state.task_manager.update_job(&job_id).await { error!("Fail to invoke update_job for job {job_id} due to {e:?}"); } + + // After update_job revives newly resolved stages (Resolved → Running), + // trigger scheduling so tasks for those stages are actually bound and + // launched. Without this, newly Running stages could sit idle if the + // preceding ReviveOffers consumed all slots before these stages were + // resolved. + if self.state.config.is_push_staged_scheduling() { + event_sender + .post_event(QueryStageSchedulerEvent::ReviveOffers) + .await?; + } } QueryStageSchedulerEvent::JobCancel(job_id) => { self.metrics_collector.record_cancelled(&job_id); @@ -413,6 +424,14 @@ impl error!("{msg}"); } } + + // After executor_lost resets tasks (task_info → None), trigger + // scheduling so those tasks can be re-bound to surviving executors. + if self.state.config.is_push_staged_scheduling() { + event_sender + .post_event(QueryStageSchedulerEvent::ReviveOffers) + .await?; + } } QueryStageSchedulerEvent::CancelTasks(tasks) => { if let Err(e) = self @@ -534,4 +553,103 @@ mod tests { .build() .unwrap() } + + fn test_join_plan_logical(partitions: usize) -> LogicalPlan { + let schema = Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("gmv", DataType::UInt64, false), + ]); + + let left_plan = + scan_empty_with_partitions(Some("left"), &schema, None, partitions).unwrap(); + let right_plan = + scan_empty_with_partitions(Some("right"), &schema, None, partitions) + .unwrap() + .build() + .unwrap(); + + left_plan + .join( + right_plan, + datafusion::prelude::JoinType::Inner, + (vec!["id"], vec!["id"]), + None, + ) + .unwrap() + .aggregate(vec![col("left.id")], vec![sum(col("left.gmv"))]) + .unwrap() + .build() + .unwrap() + } + + /// Regression test: a multi-stage job (join) should complete end-to-end + /// through the scheduler's push-based scheduling pipeline. + /// + /// This tests for a bug where jobs with dependent stages would hang + /// after the leaf stages completed because newly resolved stages were + /// never picked up for scheduling. + #[tokio::test] + async fn test_multi_stage_job_completes_push_scheduling() -> Result<()> { + let config = SchedulerConfig::default() + .with_scheduler_policy(TaskSchedulingPolicy::PushStaged); + let metrics = Arc::new(TestMetricsCollector::default()); + + let mut test = SchedulerTest::new(config, metrics.clone(), 2, 4, None).await?; + + // Join plan creates multiple stages with dependencies + let plan = test_join_plan_logical(4); + + let result = tokio::time::timeout( + Duration::from_secs(30), + test.run("multi_stage_join", &plan), + ) + .await; + + match result { + Ok(Ok((status, _job_id))) => { + assert!( + matches!(status.status, Some(ballista_core::serde::protobuf::job_status::Status::Successful(_))), + "Expected job to succeed but got: {:?}", + status.status + ); + } + Ok(Err(e)) => panic!("Job execution error: {e}"), + Err(_) => panic!( + "Job timed out after 30s - suspected scheduling deadlock where \ + dependent stages are never picked up after leaf stages complete" + ), + } + + Ok(()) + } + + /// Test that a simple aggregation also completes via push scheduling. + #[tokio::test] + async fn test_aggregation_completes_push_scheduling() -> Result<()> { + let config = SchedulerConfig::default() + .with_scheduler_policy(TaskSchedulingPolicy::PushStaged); + let metrics = Arc::new(TestMetricsCollector::default()); + + let mut test = SchedulerTest::new(config, metrics, 2, 4, None).await?; + + let plan = test_plan(4); + + let result = + tokio::time::timeout(Duration::from_secs(30), test.run("agg_test", &plan)) + .await; + + match result { + Ok(Ok((status, _job_id))) => { + assert!( + matches!(status.status, Some(ballista_core::serde::protobuf::job_status::Status::Successful(_))), + "Expected job to succeed but got: {:?}", + status.status + ); + } + Ok(Err(e)) => panic!("Job execution error: {e}"), + Err(_) => panic!("Aggregation job timed out - scheduling deadlock"), + } + + Ok(()) + } } diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 9e5b3e00d5..5a696dbfb5 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -1965,6 +1965,10 @@ pub(crate) fn partition_to_location( #[cfg(test)] mod test { use std::collections::HashSet; + use std::sync::Arc; + + use ballista_core::extension::SessionConfigExt; + use datafusion::prelude::SessionConfig; use crate::scheduler_server::event::QueryStageSchedulerEvent; use ballista_core::error::Result; @@ -1972,6 +1976,7 @@ mod test { self, ExecutionError, FailedTask, FetchPartitionError, IoError, JobStatus, TaskKilled, failed_task, job_status, }; + use ballista_core::serde::scheduler::PartitionId; use super::StaticExecutionGraph; use crate::state::execution_graph::ExecutionGraph; @@ -3138,4 +3143,229 @@ mod test { Ok(()) } + + /// Simulates the scheduler's bind-task flow using `fetch_running_stage` + /// (the real path used by bind_task_bias/round_robin), rather than + /// the test-only `pop_next_task`. This simulates having a limited number + /// of executor task slots, binding tasks, completing them, and checking + /// that subsequent stages are correctly picked up. + /// + /// This is a regression test for a bug where jobs with multiple + /// dependent stages would hang because newly resolved stages were + /// never scheduled after prior stages completed. + #[tokio::test] + async fn test_fetch_running_stage_multi_stage_progression() -> Result<()> { + let executor = mock_executor("executor-id1".to_string()); + let mut graph = test_join_plan(4).await; + + // join_plan has 4 stages: 2 leaf stages + 1 join stage + 1 final stage + assert_eq!(graph.stage_count(), 4); + assert_eq!(graph.available_tasks(), 0); + + // Simulate what submit_job does: revive to convert Resolved → Running + graph.revive(); + let initial_tasks = graph.available_tasks(); + assert!( + initial_tasks > 0, + "Expected available tasks after revive, got 0" + ); + + // Simulate the scheduler bind flow with limited task slots. + // Use fetch_running_stage (the actual scheduler code path) to bind and + // complete tasks in rounds, just like ReviveOffers does. + let max_slots_per_round = 2; // Simulate limited executor slots + let mut total_completed = 0; + let mut rounds = 0; + let max_rounds = 100; // Safety limit + + while !graph.is_successful() && rounds < max_rounds { + rounds += 1; + let mut bound_in_round = 0; + let mut black_list: Vec = vec![]; + + // Bind phase: fetch running stages and bind tasks (like bind_task_bias) + let mut tasks_to_complete = vec![]; + while let Some((running_stage, task_id_gen)) = + graph.fetch_running_stage(&black_list) + { + if bound_in_round >= max_slots_per_round { + break; + } + let stage_id = running_stage.stage_id; + + let runnable: Vec = running_stage + .task_infos + .iter() + .enumerate() + .filter(|(_, info)| info.is_none()) + .map(|(i, _)| i) + .take(max_slots_per_round - bound_in_round) + .collect(); + + if runnable.is_empty() { + black_list.push(stage_id); + continue; + } + + for partition_id in runnable { + let task_id = *task_id_gen; + *task_id_gen += 1; + running_stage.task_infos[partition_id] = + Some(super::create_task_info(executor.id.clone(), task_id)); + tasks_to_complete.push((stage_id, partition_id, task_id)); + bound_in_round += 1; + } + } + + assert!( + bound_in_round > 0 || graph.is_successful(), + "Round {rounds}: No tasks bound and job not successful. \ + This indicates a scheduling deadlock! Graph:\n{graph:?}" + ); + + // Complete phase: simulate task execution and status updates + for (stage_id, partition_id, task_id) in &tasks_to_complete { + let task = super::TaskDescription { + session_id: graph.session_id().to_string(), + partition: PartitionId { + job_id: graph.job_id().to_string(), + stage_id: *stage_id, + partition_id: *partition_id, + }, + stage_attempt_num: 0, + task_id: *task_id, + task_attempt: 0, + plan: graph + .stages() + .get(stage_id) + .and_then(|s| { + if let super::ExecutionStage::Running(rs) = s { + Some(rs.plan.clone()) + } else { + None + } + }) + .unwrap(), + session_config: Arc::new(SessionConfig::new_with_ballista()), + schedulable_time_millis: 0, + }; + let status = mock_completed_task(task, &executor.id); + graph.update_task_status(&executor, vec![status], 4, 4)?; + total_completed += 1; + } + } + + assert!( + graph.is_successful(), + "Job did not complete after {rounds} rounds ({total_completed} tasks). \ + Suspected scheduling deadlock. Graph:\n{graph:?}" + ); + + Ok(()) + } + + /// Regression test: stages resolved in the same update_task_status call + /// should become schedulable via fetch_running_stage in subsequent calls. + #[tokio::test] + async fn test_resolved_stages_become_runnable_after_update() -> Result<()> { + let executor = mock_executor("executor-id1".to_string()); + let mut graph = test_join_plan(4).await; + + // Revive leaf stages + graph.revive(); + + // Complete all tasks in both leaf stages, one at a time + // After both complete, the join stage should be resolvable and then runnable + let mut completed_stages: HashSet = HashSet::new(); + + loop { + // Try to find a running stage + let black_list: Vec = vec![]; + if let Some((running_stage, task_id_gen)) = + graph.fetch_running_stage(&black_list) + { + let stage_id = running_stage.stage_id; + + // Find first available task + if let Some((partition_id, _)) = running_stage + .task_infos + .iter() + .enumerate() + .find(|(_, info)| info.is_none()) + { + let task_id = *task_id_gen; + *task_id_gen += 1; + running_stage.task_infos[partition_id] = + Some(super::create_task_info(executor.id.clone(), task_id)); + + let task = super::TaskDescription { + session_id: graph.session_id().to_string(), + partition: PartitionId { + job_id: graph.job_id().to_string(), + stage_id, + partition_id, + }, + stage_attempt_num: 0, + task_id, + task_attempt: 0, + plan: graph + .stages() + .get(&stage_id) + .and_then(|s| { + if let super::ExecutionStage::Running(rs) = s { + Some(rs.plan.clone()) + } else { + None + } + }) + .unwrap(), + session_config: Arc::new(SessionConfig::new_with_ballista()), + schedulable_time_millis: 0, + }; + let status = mock_completed_task(task, &executor.id); + let events = + graph.update_task_status(&executor, vec![status], 4, 4)?; + + // Check if any stage completed + if !completed_stages.contains(&stage_id) { + let stage = graph.stages().get(&stage_id); + if matches!(stage, Some(super::ExecutionStage::Successful(_))) { + completed_stages.insert(stage_id); + } + } + + // After completion, check for JobUpdated events and verify + // that fetch_running_stage finds new stages + for event in &events { + if matches!(event, QueryStageSchedulerEvent::JobUpdated(_)) { + // Verify: after a JobUpdated, fetch_running_stage should + // find new stages (it calls revive internally) + let bl: Vec = vec![]; + let has_stage = graph.fetch_running_stage(&bl).is_some(); + assert!( + has_stage || graph.is_successful(), + "After JobUpdated event, fetch_running_stage should \ + find stages but found none. Graph:\n{graph:?}" + ); + } + } + } else { + break; // No more tasks + } + } else { + break; // No more stages + } + + if graph.is_successful() { + break; + } + } + + assert!( + graph.is_successful(), + "Graph should be successful. Graph:\n{graph:?}" + ); + + Ok(()) + } } From e38b4e7968fc3b0a23e3bb4ccb15fff9ca9ef279 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:56:42 +1000 Subject: [PATCH 02/19] wip --- .../scheduler_server/query_stage_scheduler.rs | 7 +++ .../scheduler/src/state/execution_graph.rs | 55 ++++++++++++++++++- ballista/scheduler/src/state/task_manager.rs | 11 +++- 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs index 36bc011fbb..8677de4f61 100644 --- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -376,6 +376,13 @@ impl .await { Ok(stage_events) => { + info!( + "TaskUpdating from executor {executor_id}: {num_status} tasks processed, \ + {} stage events emitted: {:?}", + stage_events.len(), + stage_events.iter().map(|e| format!("{e:?}")).collect::>() + ); + if self.state.config.is_push_staged_scheduling() { event_sender .post_event(QueryStageSchedulerEvent::ReviveOffers) diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 5a696dbfb5..4454b39d80 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -397,6 +397,19 @@ impl StaticExecutionGraph { let mut job_err_msg = "".to_owned(); let mut stage_metrics = StageMetricsInfo::default(); + info!( + "Job {job_id} processing_stages_update: resolved_stages={:?}, successful_stages={:?}, \ + failed_stages={:?}, rollback_running_stages={:?}, resubmit_successful_stages={:?}", + updated_stages.resolved_stages, + updated_stages.successful_stages, + updated_stages.failed_stages.keys().collect::>(), + updated_stages + .rollback_running_stages + .keys() + .collect::>(), + updated_stages.resubmit_successful_stages, + ); + for stage_id in updated_stages.resolved_stages { self.resolve_stage(stage_id)?; has_resolved = true; @@ -494,7 +507,29 @@ impl StaticExecutionGraph { completed_at: timestamp_millis(), }); } else if has_resolved { + info!("Job {job_id} has newly resolved stages, emitting JobUpdated"); events.push(QueryStageSchedulerEvent::JobUpdated(job_id)) + } else { + // Log stage summary when no terminal event is emitted — helps diagnose hangs + let stage_summary: Vec = self.stages.iter().map(|(id, stage)| { + match stage { + ExecutionStage::UnResolved(s) => { + let complete_inputs: Vec = s.inputs.iter() + .filter(|(_, inp)| inp.is_complete()).map(|(id, _)| *id).collect(); + let incomplete_inputs: Vec = s.inputs.iter() + .filter(|(_, inp)| !inp.is_complete()).map(|(id, _)| *id).collect(); + format!("stage {id}: UnResolved(complete_inputs={complete_inputs:?}, incomplete_inputs={incomplete_inputs:?}, resolvable={})", s.resolvable()) + } + ExecutionStage::Resolved(_) => format!("stage {id}: Resolved"), + ExecutionStage::Running(s) => format!("stage {id}: Running(available_tasks={}, is_successful={})", s.available_tasks(), s.is_successful()), + ExecutionStage::Successful(_) => format!("stage {id}: Successful"), + ExecutionStage::Failed(_) => format!("stage {id}: Failed"), + } + }).collect(); + info!( + "Job {job_id} no terminal event emitted (not failed, not successful, no newly resolved). Stage states: [{}]", + stage_summary.join(", ") + ); } Ok((events, stage_metrics)) } @@ -509,6 +544,11 @@ impl StaticExecutionGraph { ) -> Result> { let mut resolved_stages = vec![]; let job_id = &self.job_id; + info!( + "Job {job_id} update_stage_output_links: stage_id={stage_id}, is_completed={is_completed}, \ + num_locations={}, output_links={output_links:?}", + locations.len() + ); if output_links.is_empty() { // If `output_links` is empty, then this is a final stage self.output_locations.extend(locations); @@ -528,7 +568,20 @@ impl StaticExecutionGraph { } // If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan - if linked_unresolved_stage.resolvable() { + let resolvable = linked_unresolved_stage.resolvable(); + info!( + "Job {job_id} stage {link} (child of {stage_id}): input complete={is_completed}, resolvable={resolvable}, \ + inputs_status=[{}]", + linked_unresolved_stage + .inputs + .iter() + .map(|(id, inp)| { + format!("{id}:complete={}", inp.is_complete()) + }) + .collect::>() + .join(", ") + ); + if resolvable { resolved_stages.push(linked_unresolved_stage.stage_id); } } else { diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs index de0a4dd6a5..d78311a652 100644 --- a/ballista/scheduler/src/state/task_manager.rs +++ b/ballista/scheduler/src/state/task_manager.rs @@ -549,19 +549,26 @@ impl TaskManager /// Updates the job state and returns the number of new available tasks. pub async fn update_job(&self, job_id: &str) -> Result { - debug!("Update active job {job_id}"); + info!("Update active job {job_id}"); if let Some(graph) = self.get_active_execution_graph(job_id) { let mut graph = graph.write().await; let curr_available_tasks = graph.available_tasks(); + info!("Job {job_id} before revive: available_tasks={curr_available_tasks}"); graph.revive(); + let new_available_tasks = graph.available_tasks(); + info!( + "Job {job_id} after revive: available_tasks={new_available_tasks} (new={})", + new_available_tasks - curr_available_tasks + ); + info!("Saving job with status {:?}", graph.status()); self.state.save_job(job_id, &graph).await?; - let new_tasks = graph.available_tasks() - curr_available_tasks; + let new_tasks = new_available_tasks - curr_available_tasks; Ok(new_tasks) } else { From e1d5a33c1c30a7a14cbbabffe022fc65a8bbb431 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 22 Apr 2026 18:06:43 +1000 Subject: [PATCH 03/19] wip --- .../src/execution_plans/shuffle_reader.rs | 22 +++++- .../src/execution_plans/shuffle_writer.rs | 7 ++ ballista/scheduler/src/cluster/mod.rs | 18 ++++- .../scheduler_server/query_stage_scheduler.rs | 18 +++-- .../scheduler/src/state/execution_graph.rs | 77 +++++++++++-------- ballista/scheduler/src/state/mod.rs | 28 ++++++- 6 files changed, 126 insertions(+), 44 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index ed4924e459..53746d0f21 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -66,7 +66,7 @@ use crate::error::BallistaError; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use itertools::Itertools; -use log::{debug, error, trace}; +use log::{debug, error, info, trace}; use rand::prelude::SliceRandom; use rand::rng; use tokio::sync::{Semaphore, mpsc}; @@ -170,7 +170,10 @@ impl ExecutionPlan for ShuffleReaderExec { context: Arc, ) -> Result { let task_id = context.task_id().unwrap_or_else(|| partition.to_string()); - debug!("ShuffleReaderExec::execute({task_id})"); + info!("ShuffleReaderExec::execute({task_id}) partition={partition}, num_locations={}, stage_id={}", + self.partition.get(partition).map(|p| p.len()).unwrap_or(0), + self.stage_id, + ); let config = context.session_config(); @@ -491,6 +494,14 @@ fn send_fetch_partitions( locations.object_store.len(), locations.remote.len() ); + info!( + "send_fetch_partitions: {} total locations (memory={}, local={}, object_store={}, remote={})", + locations.memory.len() + locations.local.len() + locations.object_store.len() + locations.remote.len(), + locations.memory.len(), + locations.local.len(), + locations.object_store.len(), + locations.remote.len() + ); // Read memory partitions first (fastest path) let response_sender_m = response_sender.clone(); @@ -697,10 +708,13 @@ async fn fetch_partition_remote( ) -> result::Result { let metadata = &location.executor_meta; let partition_id = &location.partition_id; - // TODO for shuffle client connections, we should avoid creating new connections again and again. - // And we should also avoid to keep alive too many connections for long time. let host = metadata.host.as_str(); let port = metadata.port; + info!( + "fetch_partition_remote: fetching {}/{}/{} from {}:{}", + partition_id.job_id, partition_id.stage_id, partition_id.partition_id, + host, port + ); let mut ballista_client = BallistaClient::try_new( host, port, diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index a4ee4c3927..e802aa3baa 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -339,7 +339,14 @@ impl ShuffleWriterExec { async move { let now = Instant::now(); + info!( + "ShuffleWriter {job_id}/{stage_id} partition {input_partition}: creating execution stream" + ); let mut stream = plan.execute(input_partition, context)?; + info!( + "ShuffleWriter {job_id}/{stage_id} partition {input_partition}: stream created in {:.2}s, starting write (memory={use_memory}, object_store={use_object_store})", + now.elapsed().as_secs_f64() + ); if use_memory { // Use in-memory shuffle storage with configurable format diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index bec60565a4..827b1db184 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -28,7 +28,7 @@ use datafusion::error::DataFusionError; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{SessionConfig, SessionContext}; use futures::Stream; -use log::debug; +use log::{debug, info}; use ballista_core::consistent_hash::ConsistentHash; use ballista_core::error::Result; @@ -543,11 +543,23 @@ pub(crate) async fn bind_task_round_robin( let mut result = BindingResult::new(); let mut total_slots = slots.iter().fold(0, |acc, s| acc + s.slots); + let slot_detail: Vec = slots + .iter() + .map(|s| format!("{}={}", s.executor_id, s.slots)) + .collect(); if total_slots == 0 { - debug!("Not enough available executor slots for task running!!!"); + info!( + "bind_task_round_robin: 0 available executor slots across {} executors [{}]", + slots.len(), + slot_detail.join(", ") + ); return result; } - debug!("Total slot number is {total_slots}"); + info!( + "bind_task_round_robin: {total_slots} available slots across {} executors [{}]", + slots.len(), + slot_detail.join(", ") + ); // Sort the slots by descending order slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots)); diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs index 8677de4f61..d3beee2b70 100644 --- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -376,12 +376,17 @@ impl .await { Ok(stage_events) => { - info!( - "TaskUpdating from executor {executor_id}: {num_status} tasks processed, \ - {} stage events emitted: {:?}", - stage_events.len(), - stage_events.iter().map(|e| format!("{e:?}")).collect::>() - ); + if !stage_events.is_empty() { + info!( + "TaskUpdating from executor {executor_id}: {num_status} tasks processed, \ + {} stage events emitted: {:?}", + stage_events.len(), + stage_events + .iter() + .map(|e| format!("{e:?}")) + .collect::>() + ); + } if self.state.config.is_push_staged_scheduling() { event_sender @@ -409,6 +414,7 @@ impl } } QueryStageSchedulerEvent::ReviveOffers => { + trace!("Processing ReviveOffers event"); self.state.revive_offers(event_sender).await?; } QueryStageSchedulerEvent::ExecutorLost(executor_id, _) => { diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 4454b39d80..1d9b0592c4 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -397,18 +397,25 @@ impl StaticExecutionGraph { let mut job_err_msg = "".to_owned(); let mut stage_metrics = StageMetricsInfo::default(); - info!( - "Job {job_id} processing_stages_update: resolved_stages={:?}, successful_stages={:?}, \ - failed_stages={:?}, rollback_running_stages={:?}, resubmit_successful_stages={:?}", - updated_stages.resolved_stages, - updated_stages.successful_stages, - updated_stages.failed_stages.keys().collect::>(), - updated_stages - .rollback_running_stages - .keys() - .collect::>(), - updated_stages.resubmit_successful_stages, - ); + let has_activity = !updated_stages.resolved_stages.is_empty() + || !updated_stages.successful_stages.is_empty() + || !updated_stages.failed_stages.is_empty() + || !updated_stages.rollback_running_stages.is_empty() + || !updated_stages.resubmit_successful_stages.is_empty(); + if has_activity { + info!( + "Job {job_id} processing_stages_update: resolved_stages={:?}, successful_stages={:?}, \ + failed_stages={:?}, rollback_running_stages={:?}, resubmit_successful_stages={:?}", + updated_stages.resolved_stages, + updated_stages.successful_stages, + updated_stages.failed_stages.keys().collect::>(), + updated_stages + .rollback_running_stages + .keys() + .collect::>(), + updated_stages.resubmit_successful_stages, + ); + } for stage_id in updated_stages.resolved_stages { self.resolve_stage(stage_id)?; @@ -526,7 +533,7 @@ impl StaticExecutionGraph { ExecutionStage::Failed(_) => format!("stage {id}: Failed"), } }).collect(); - info!( + debug!( "Job {job_id} no terminal event emitted (not failed, not successful, no newly resolved). Stage states: [{}]", stage_summary.join(", ") ); @@ -544,11 +551,19 @@ impl StaticExecutionGraph { ) -> Result> { let mut resolved_stages = vec![]; let job_id = &self.job_id; - info!( - "Job {job_id} update_stage_output_links: stage_id={stage_id}, is_completed={is_completed}, \ - num_locations={}, output_links={output_links:?}", - locations.len() - ); + if is_completed { + info!( + "Job {job_id} update_stage_output_links: stage_id={stage_id}, is_completed={is_completed}, \ + num_locations={}, output_links={output_links:?}", + locations.len() + ); + } else { + debug!( + "Job {job_id} update_stage_output_links: stage_id={stage_id}, is_completed={is_completed}, \ + num_locations={}, output_links={output_links:?}", + locations.len() + ); + } if output_links.is_empty() { // If `output_links` is empty, then this is a final stage self.output_locations.extend(locations); @@ -569,18 +584,20 @@ impl StaticExecutionGraph { // If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan let resolvable = linked_unresolved_stage.resolvable(); - info!( - "Job {job_id} stage {link} (child of {stage_id}): input complete={is_completed}, resolvable={resolvable}, \ - inputs_status=[{}]", - linked_unresolved_stage - .inputs - .iter() - .map(|(id, inp)| { - format!("{id}:complete={}", inp.is_complete()) - }) - .collect::>() - .join(", ") - ); + if is_completed || resolvable { + info!( + "Job {job_id} stage {link} (child of {stage_id}): input complete={is_completed}, resolvable={resolvable}, \ + inputs_status=[{}]", + linked_unresolved_stage + .inputs + .iter() + .map(|(id, inp)| { + format!("{id}:complete={}", inp.is_complete()) + }) + .collect::>() + .join(", ") + ); + } if resolvable { resolved_stages.push(linked_unresolved_stage.stage_id); } diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index b297a89cfb..10311cdd49 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -209,9 +209,35 @@ impl SchedulerState> = + std::collections::HashMap::new(); + for (executor_id, task) in &binding_result.bound_tasks { + summary + .entry(executor_id.clone()) + .or_default() + .push(format!( + "{}/{}/{}", + task.partition.job_id, + task.partition.stage_id, + task.partition.partition_id + )); + } + summary + .into_iter() + .map(|(exe, tasks)| format!("{exe}: {tasks:?}")) + .collect::>() + .join(", ") + } + ); // Record shuffle affinity metrics for affinity in &binding_result.shuffle_affinity { From ee23c42b72b23981b8d560011d8b3800388d0cd6 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 09:27:53 +1000 Subject: [PATCH 04/19] wip --- .../src/execution_plans/shuffle_reader.rs | 34 ++++++++++++-- .../src/execution_plans/shuffle_writer.rs | 47 +++++++++++++++++-- ballista/executor/src/executor_server.rs | 5 +- 3 files changed, 77 insertions(+), 9 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index 53746d0f21..8e80b8db51 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -170,7 +170,8 @@ impl ExecutionPlan for ShuffleReaderExec { context: Arc, ) -> Result { let task_id = context.task_id().unwrap_or_else(|| partition.to_string()); - info!("ShuffleReaderExec::execute({task_id}) partition={partition}, num_locations={}, stage_id={}", + info!( + "ShuffleReaderExec::execute({task_id}) partition={partition}, num_locations={}, stage_id={}", self.partition.get(partition).map(|p| p.len()).unwrap_or(0), self.stage_id, ); @@ -496,7 +497,10 @@ fn send_fetch_partitions( ); info!( "send_fetch_partitions: {} total locations (memory={}, local={}, object_store={}, remote={})", - locations.memory.len() + locations.local.len() + locations.object_store.len() + locations.remote.len(), + locations.memory.len() + + locations.local.len() + + locations.object_store.len() + + locations.remote.len(), locations.memory.len(), locations.local.len(), locations.object_store.len(), @@ -522,8 +526,18 @@ fn send_fetch_partitions( let customize_endpoint_c = customize_endpoint.clone(); let metrics_callback_c = metrics_callback.clone(); let local_locations = locations.local; + let local_count = local_locations.len(); spawned_tasks.push(SpawnedTask::spawn(async move { - for p in local_locations { + for (i, p) in local_locations.into_iter().enumerate() { + info!( + "fetch_local[{}/{}]: reading {}/{}/{} from {}", + i + 1, + local_count, + p.partition_id.job_id, + p.partition_id.stage_id, + p.partition_id.partition_id, + p.path + ); let start_time = std::time::Instant::now(); let r = PartitionReaderEnum::Local .fetch_partition( @@ -534,6 +548,17 @@ fn send_fetch_partitions( use_tls, ) .await; + let ok = r.is_ok(); + info!( + "fetch_local[{}/{}]: {}/{}/{} completed in {:.3}s, ok={}", + i + 1, + local_count, + p.partition_id.job_id, + p.partition_id.stage_id, + p.partition_id.partition_id, + start_time.elapsed().as_secs_f64(), + ok + ); // Record local read metrics if callback is set and read succeeded if r.is_ok() @@ -712,8 +737,7 @@ async fn fetch_partition_remote( let port = metadata.port; info!( "fetch_partition_remote: fetching {}/{}/{} from {}:{}", - partition_id.job_id, partition_id.stage_id, partition_id.partition_id, - host, port + partition_id.job_id, partition_id.stage_id, partition_id.partition_id, host, port ); let mut ballista_client = BallistaClient::try_new( host, diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index e802aa3baa..0f4dcb1872 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -68,7 +68,7 @@ use datafusion::arrow::error::ArrowError; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::repartition::BatchPartitioner; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use log::{debug, info}; +use log::{debug, info, warn}; use super::shuffle_writer_trait::ShuffleWriter; @@ -348,7 +348,30 @@ impl ShuffleWriterExec { now.elapsed().as_secs_f64() ); - if use_memory { + // Watchdog: log periodically if no progress is made + let watchdog_job_id = job_id.clone(); + let watchdog_flag = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let watchdog_flag_clone = watchdog_flag.clone(); + let _watchdog = tokio::task::spawn(async move { + let mut interval = + tokio::time::interval(std::time::Duration::from_secs(30)); + interval.tick().await; // skip first immediate tick + loop { + interval.tick().await; + if watchdog_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + warn!( + "ShuffleWriter {}/{} partition {}: STALLED - no first batch received after {:.0}s", + watchdog_job_id, + stage_id, + input_partition, + now.elapsed().as_secs_f64() + ); + } + }); + + let result = if use_memory { // Use in-memory shuffle storage with configurable format Self::execute_shuffle_write_memory( &job_id, @@ -393,7 +416,9 @@ impl ShuffleWriterExec { file_ext, ) .await - } + }; + watchdog_flag.store(true, std::sync::atomic::Ordering::Relaxed); + result } } @@ -470,9 +495,25 @@ impl ShuffleWriterExec { )?; let schema = stream.schema(); + let mut batch_count: u64 = 0; + let mut total_rows: u64 = 0; while let Some(result) = stream.next().await { let input_batch = result?; + batch_count += 1; + total_rows += input_batch.num_rows() as u64; + if batch_count == 1 { + info!( + "ShuffleWriter partition {input_partition}: received first batch ({} rows) after {:.2}s", + input_batch.num_rows(), + now.elapsed().as_secs_f64() + ); + } else if batch_count % 100 == 0 { + info!( + "ShuffleWriter partition {input_partition}: processed {batch_count} batches ({total_rows} rows) in {:.2}s", + now.elapsed().as_secs_f64() + ); + } write_metrics.input_rows.add(input_batch.num_rows()); diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs index d1c3cf44b6..884e751f04 100644 --- a/ballista/executor/src/executor_server.rs +++ b/ballista/executor/src/executor_server.rs @@ -606,7 +606,10 @@ impl TaskRunnerPool Date: Thu, 23 Apr 2026 10:12:16 +1000 Subject: [PATCH 05/19] wip --- .../src/execution_plans/shuffle_reader.rs | 91 ++++++++++++++++++- .../src/execution_plans/shuffle_writer.rs | 26 +++++- 2 files changed, 113 insertions(+), 4 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index 8e80b8db51..dda11cdd98 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -66,12 +66,93 @@ use crate::error::BallistaError; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use itertools::Itertools; -use log::{debug, error, info, trace}; +use log::{debug, error, info, trace, warn}; use rand::prelude::SliceRandom; use rand::rng; use tokio::sync::{Semaphore, mpsc}; use tokio_stream::wrappers::ReceiverStream; +/// A stream wrapper that logs when it is first polled and when it first yields data. +/// This is used to diagnose hangs in the execution pipeline: if "first_poll" appears +/// but "first_batch" doesn't, the inner stream is stuck. +struct InstrumentedStream { + inner: SendableRecordBatchStream, + label: String, + first_poll_logged: bool, + first_batch_logged: bool, + poll_count: u64, + start: std::time::Instant, +} + +impl InstrumentedStream { + fn new(inner: SendableRecordBatchStream, label: String) -> Self { + Self { + inner, + label, + first_poll_logged: false, + first_batch_logged: false, + poll_count: 0, + start: std::time::Instant::now(), + } + } +} + +impl Stream for InstrumentedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + if !this.first_poll_logged { + this.first_poll_logged = true; + info!("InstrumentedStream({}): first poll", this.label); + } + this.poll_count += 1; + + let result = this.inner.as_mut().poll_next(cx); + match &result { + Poll::Ready(Some(Ok(batch))) => { + if !this.first_batch_logged { + this.first_batch_logged = true; + info!( + "InstrumentedStream({}): first batch ({} rows) after {:.3}s, {} polls", + this.label, + batch.num_rows(), + this.start.elapsed().as_secs_f64(), + this.poll_count + ); + } + } + Poll::Ready(Some(Err(e))) => { + warn!( + "InstrumentedStream({}): error after {:.3}s: {}", + this.label, + this.start.elapsed().as_secs_f64(), + e + ); + } + Poll::Ready(None) => { + info!( + "InstrumentedStream({}): stream ended after {:.3}s, {} polls", + this.label, + this.start.elapsed().as_secs_f64(), + this.poll_count + ); + } + Poll::Pending => {} + } + result + } +} + +impl RecordBatchStream for InstrumentedStream { + fn schema(&self) -> SchemaRef { + self.inner.schema() + } +} + /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec /// being executed by an executor #[derive(Debug, Clone)] @@ -231,8 +312,13 @@ impl ExecutionPlan for ShuffleReaderExec { response_receiver.try_flatten(), )); - Ok(Box::pin(CoalescedShuffleReaderStream::new( + let instrumented = Box::pin(InstrumentedStream::new( input_stream, + format!("ShuffleReader({task_id}/stg{}/p{partition})", self.stage_id), + )); + + Ok(Box::pin(CoalescedShuffleReaderStream::new( + instrumented, batch_size, None, &self.metrics, @@ -528,6 +614,7 @@ fn send_fetch_partitions( let local_locations = locations.local; let local_count = local_locations.len(); spawned_tasks.push(SpawnedTask::spawn(async move { + info!("fetch_local_task: STARTED, {local_count} local files to read"); for (i, p) in local_locations.into_iter().enumerate() { info!( "fetch_local[{}/{}]: reading {}/{}/{} from {}", diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 0f4dcb1872..468c087abe 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -361,12 +361,31 @@ impl ShuffleWriterExec { if watchdog_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { break; } + // Probe: spawn a trivial task to check runtime responsiveness + let probe_start = std::time::Instant::now(); + let probe = tokio::task::spawn(async { 42u64 }); + let probe_ok = match tokio::time::timeout( + std::time::Duration::from_secs(5), + probe, + ) + .await + { + Ok(Ok(_)) => { + format!( + "ok in {:.3}ms", + probe_start.elapsed().as_secs_f64() * 1000.0 + ) + } + Ok(Err(e)) => format!("join error: {e}"), + Err(_) => "TIMEOUT (5s) - runtime may be starved!".to_string(), + }; warn!( - "ShuffleWriter {}/{} partition {}: STALLED - no first batch received after {:.0}s", + "ShuffleWriter {}/{} partition {}: STALLED - no first batch received after {:.0}s (runtime probe: {})", watchdog_job_id, stage_id, input_partition, - now.elapsed().as_secs_f64() + now.elapsed().as_secs_f64(), + probe_ok, ); } }); @@ -498,6 +517,9 @@ impl ShuffleWriterExec { let mut batch_count: u64 = 0; let mut total_rows: u64 = 0; + info!( + "ShuffleWriter partition {input_partition}: entering write loop, about to poll stream for first batch" + ); while let Some(result) = stream.next().await { let input_batch = result?; batch_count += 1; From c078cae8db3bd41c05e22bf47f0cc2236fb2b536 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 10:57:10 +1000 Subject: [PATCH 06/19] wip --- .../src/execution_plans/shuffle_writer.rs | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 468c087abe..e85c72b850 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -72,6 +72,122 @@ use log::{debug, info, warn}; use super::shuffle_writer_trait::ShuffleWriter; +/// Wraps an ExecutionPlan tree to log every `execute()` call with the node name and partition. +/// This helps diagnose which nodes in a complex plan are/aren't being executed. +#[derive(Debug)] +struct TracingExec { + inner: Arc, + label: String, + children: Vec>, +} + +impl TracingExec { + /// Wrap an entire plan tree with tracing. Each node gets a label like "depth.index: NodeName". + fn wrap( + plan: Arc, + job_id: &str, + stage_id: usize, + ) -> Arc { + Self::wrap_recursive(plan, job_id, stage_id, 0) + } + + fn wrap_recursive( + plan: Arc, + job_id: &str, + stage_id: usize, + depth: usize, + ) -> Arc { + let children: Vec> = plan + .children() + .into_iter() + .enumerate() + .map(|(_i, child)| { + Self::wrap_recursive(Arc::clone(child), job_id, stage_id, depth + 1) + }) + .collect(); + + let name = plan.name().to_string(); + let label = format!("{job_id}/{stage_id} d{depth} {name}"); + + Arc::new(TracingExec { + inner: plan, + label, + children, + }) + } +} + +impl DisplayAs for TracingExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + self.inner.fmt_as(t, f) + } +} + +impl ExecutionPlan for TracingExec { + fn name(&self) -> &str { + self.inner.name() + } + + fn as_any(&self) -> &dyn Any { + self.inner.as_any() + } + + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + fn properties(&self) -> &PlanProperties { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + self.children.iter().collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(TracingExec { + inner: Arc::clone(&self.inner), + label: self.label.clone(), + children, + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + info!( + "TracingExec::execute({}) partition={}", + self.label, partition + ); + // Replace children on inner plan with our traced children, then execute + let rebuilt = if self.children.is_empty() { + Arc::clone(&self.inner) + } else { + self.inner + .clone() + .with_new_children(self.children.clone())? + }; + rebuilt.execute(partition, context) + } + + fn metrics(&self) -> Option { + self.inner.metrics() + } + + fn statistics(&self) -> Result { + self.inner.statistics() + } +} + /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and /// can be executed as one unit with each partition being executed in parallel. The output of each /// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query @@ -339,6 +455,15 @@ impl ShuffleWriterExec { async move { let now = Instant::now(); + // Wrap plan with tracing to log every execute() call + let plan = TracingExec::wrap(plan, &job_id, stage_id); + // Log the plan tree once (partition 0 only) to help debug execution issues + if input_partition == 0 { + info!( + "ShuffleWriter {job_id}/{stage_id} plan tree:\n{}", + datafusion::physical_plan::displayable(plan.as_ref()).indent(true) + ); + } info!( "ShuffleWriter {job_id}/{stage_id} partition {input_partition}: creating execution stream" ); From 4966e409a3b77ae45f2c0d477902c7d7c40ce80f Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 11:11:44 +1000 Subject: [PATCH 07/19] wip --- .../core/src/execution_plans/shuffle_reader.rs | 18 +++++++++--------- .../core/src/execution_plans/shuffle_writer.rs | 10 +++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index dda11cdd98..dac1bd8311 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -107,7 +107,7 @@ impl Stream for InstrumentedStream { let this = &mut *self; if !this.first_poll_logged { this.first_poll_logged = true; - info!("InstrumentedStream({}): first poll", this.label); + debug!("InstrumentedStream({}): first poll", this.label); } this.poll_count += 1; @@ -116,7 +116,7 @@ impl Stream for InstrumentedStream { Poll::Ready(Some(Ok(batch))) => { if !this.first_batch_logged { this.first_batch_logged = true; - info!( + debug!( "InstrumentedStream({}): first batch ({} rows) after {:.3}s, {} polls", this.label, batch.num_rows(), @@ -134,7 +134,7 @@ impl Stream for InstrumentedStream { ); } Poll::Ready(None) => { - info!( + debug!( "InstrumentedStream({}): stream ended after {:.3}s, {} polls", this.label, this.start.elapsed().as_secs_f64(), @@ -251,7 +251,7 @@ impl ExecutionPlan for ShuffleReaderExec { context: Arc, ) -> Result { let task_id = context.task_id().unwrap_or_else(|| partition.to_string()); - info!( + debug!( "ShuffleReaderExec::execute({task_id}) partition={partition}, num_locations={}, stage_id={}", self.partition.get(partition).map(|p| p.len()).unwrap_or(0), self.stage_id, @@ -581,7 +581,7 @@ fn send_fetch_partitions( locations.object_store.len(), locations.remote.len() ); - info!( + debug!( "send_fetch_partitions: {} total locations (memory={}, local={}, object_store={}, remote={})", locations.memory.len() + locations.local.len() @@ -614,9 +614,9 @@ fn send_fetch_partitions( let local_locations = locations.local; let local_count = local_locations.len(); spawned_tasks.push(SpawnedTask::spawn(async move { - info!("fetch_local_task: STARTED, {local_count} local files to read"); + debug!("fetch_local_task: STARTED, {local_count} local files to read"); for (i, p) in local_locations.into_iter().enumerate() { - info!( + debug!( "fetch_local[{}/{}]: reading {}/{}/{} from {}", i + 1, local_count, @@ -636,7 +636,7 @@ fn send_fetch_partitions( ) .await; let ok = r.is_ok(); - info!( + debug!( "fetch_local[{}/{}]: {}/{}/{} completed in {:.3}s, ok={}", i + 1, local_count, @@ -822,7 +822,7 @@ async fn fetch_partition_remote( let partition_id = &location.partition_id; let host = metadata.host.as_str(); let port = metadata.port; - info!( + debug!( "fetch_partition_remote: fetching {}/{}/{} from {}:{}", partition_id.job_id, partition_id.stage_id, partition_id.partition_id, host, port ); diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index e85c72b850..28fc05922c 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -464,11 +464,11 @@ impl ShuffleWriterExec { datafusion::physical_plan::displayable(plan.as_ref()).indent(true) ); } - info!( + debug!( "ShuffleWriter {job_id}/{stage_id} partition {input_partition}: creating execution stream" ); let mut stream = plan.execute(input_partition, context)?; - info!( + debug!( "ShuffleWriter {job_id}/{stage_id} partition {input_partition}: stream created in {:.2}s, starting write (memory={use_memory}, object_store={use_object_store})", now.elapsed().as_secs_f64() ); @@ -642,7 +642,7 @@ impl ShuffleWriterExec { let mut batch_count: u64 = 0; let mut total_rows: u64 = 0; - info!( + debug!( "ShuffleWriter partition {input_partition}: entering write loop, about to poll stream for first batch" ); while let Some(result) = stream.next().await { @@ -650,13 +650,13 @@ impl ShuffleWriterExec { batch_count += 1; total_rows += input_batch.num_rows() as u64; if batch_count == 1 { - info!( + debug!( "ShuffleWriter partition {input_partition}: received first batch ({} rows) after {:.2}s", input_batch.num_rows(), now.elapsed().as_secs_f64() ); } else if batch_count % 100 == 0 { - info!( + debug!( "ShuffleWriter partition {input_partition}: processed {batch_count} batches ({total_rows} rows) in {:.2}s", now.elapsed().as_secs_f64() ); From b81643a2a98f95e313b3129eb2c81c4b688c0c17 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 11:57:05 +1000 Subject: [PATCH 08/19] wip --- .../src/execution_plans/shuffle_writer.rs | 94 ++++++++++++++++++- 1 file changed, 92 insertions(+), 2 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 28fc05922c..32022319a1 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -66,12 +66,84 @@ use futures::{StreamExt, TryFutureExt, TryStreamExt}; use datafusion::arrow::error::ArrowError; use datafusion::execution::context::TaskContext; +use datafusion::physical_plan::RecordBatchStream; use datafusion::physical_plan::repartition::BatchPartitioner; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use futures::Stream; use log::{debug, info, warn}; +use std::pin::Pin; +use std::task::{Context, Poll}; use super::shuffle_writer_trait::ShuffleWriter; +/// Stream wrapper used by TracingExec to log data flow through each plan node. +/// Logs first_batch and stream_end to identify where data gets stuck. +struct TracingStream { + inner: SendableRecordBatchStream, + label: String, + partition: usize, + batch_count: u64, + row_count: u64, + first_batch_logged: bool, + start: Instant, +} + +impl Stream for TracingStream { + type Item = datafusion::error::Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = &mut *self; + let result = this.inner.as_mut().poll_next(cx); + match &result { + Poll::Ready(Some(Ok(batch))) => { + this.batch_count += 1; + this.row_count += batch.num_rows() as u64; + if !this.first_batch_logged { + this.first_batch_logged = true; + info!( + "TracingStream({} p={}): first_batch {} rows after {:.3}s", + this.label, + this.partition, + batch.num_rows(), + this.start.elapsed().as_secs_f64() + ); + } + } + Poll::Ready(Some(Err(e))) => { + warn!( + "TracingStream({} p={}): error after {:.3}s, {} batches: {}", + this.label, + this.partition, + this.start.elapsed().as_secs_f64(), + this.batch_count, + e + ); + } + Poll::Ready(None) => { + info!( + "TracingStream({} p={}): ended after {:.3}s, {} batches, {} rows", + this.label, + this.partition, + this.start.elapsed().as_secs_f64(), + this.batch_count, + this.row_count + ); + } + Poll::Pending => {} + } + result + } +} + +impl RecordBatchStream for TracingStream { + fn schema(&self) -> SchemaRef { + self.inner.schema() + } +} + /// Wraps an ExecutionPlan tree to log every `execute()` call with the node name and partition. /// This helps diagnose which nodes in a complex plan are/aren't being executed. #[derive(Debug)] @@ -164,7 +236,7 @@ impl ExecutionPlan for TracingExec { partition: usize, context: Arc, ) -> Result { - info!( + debug!( "TracingExec::execute({}) partition={}", self.label, partition ); @@ -176,7 +248,25 @@ impl ExecutionPlan for TracingExec { .clone() .with_new_children(self.children.clone())? }; - rebuilt.execute(partition, context) + let stream = rebuilt.execute(partition, context)?; + + // Wrap stream for non-ShuffleReaderExec nodes to trace data flow. + // ShuffleReaderExec is skipped because CoalescePartitionsExec spawns + // one per partition (44+), making it too noisy. + let name = self.inner.name(); + if name == "ShuffleReaderExec" { + Ok(stream) + } else { + Ok(Box::pin(TracingStream { + inner: stream, + label: self.label.clone(), + partition, + batch_count: 0, + row_count: 0, + first_batch_logged: false, + start: Instant::now(), + })) + } } fn metrics(&self) -> Option { From 4355e0eb00ddf64a28b6fb09e0865c38b6ed0619 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 12:41:09 +1000 Subject: [PATCH 09/19] wip --- ballista/core/src/extension.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index 4bf8b81b84..3f3c18771f 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -821,6 +821,15 @@ impl SessionConfigHelperExt for SessionConfig { "datafusion.optimizer.hash_join_single_partition_threshold_rows", 0, ) + // Dynamic filter pushdown for hash joins uses a tokio::sync::Barrier + // that expects ALL probe-side partitions to call report_build_data(). + // In Ballista's distributed execution, each task runs only ONE partition, + // so the barrier waits forever for the other partitions that never arrive. + // Disable until Ballista can handle the cross-partition synchronization. + .set_bool( + "datafusion.optimizer.enable_join_dynamic_filter_pushdown", + false, + ) } } From 7a77fb7d70fa05eb2c3539c5fd4af3941e47f7f4 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 13:25:37 +1000 Subject: [PATCH 10/19] wip --- ballista/client/tests/context_checks.rs | 72 +++++++++++++++++++++++++ ballista/core/src/extension.rs | 9 ---- ballista/scheduler/src/state/mod.rs | 9 ++++ 3 files changed, 81 insertions(+), 9 deletions(-) diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs index 0faf3b9b7d..a05feda022 100644 --- a/ballista/client/tests/context_checks.rs +++ b/ballista/client/tests/context_checks.rs @@ -1105,4 +1105,76 @@ mod supported { Ok(()) } + + /// Regression test: nested CollectLeft HashJoinExec with + /// CoalescePartitionsExec should not deadlock. + /// + /// This reproduces the pattern from TPC-H Q2 where a chain of + /// small-table inner joins (region→nation→supplier) is broadcast-joined + /// against a large partitioned table (partsupp). The scheduler enables + /// CollectLeft for inner joins under the broadcast threshold, and each + /// executor task runs exactly ONE partition. If any cross-partition + /// synchronisation (e.g. a tokio Barrier) is used in the build-side + /// completion path, it will deadlock because only one partition + /// participates per task. + #[rstest] + #[case::standalone(standalone_context())] + #[tokio::test] + async fn nested_collect_left_should_not_deadlock( + #[future(awt)] + #[case] + ctx: SessionContext, + test_data: String, + ) -> datafusion::error::Result<()> { + // Use alltypes_plain.parquet registered as 3 different tables + // to create a nested inner join query where the optimizer + // should choose CollectLeft for the small tables. + ctx.register_parquet( + "fact_table", + &format!("{test_data}/alltypes_plain.parquet"), + Default::default(), + ) + .await?; + + ctx.register_parquet( + "dim_a", + &format!("{test_data}/alltypes_plain.parquet"), + Default::default(), + ) + .await?; + + ctx.register_parquet( + "dim_b", + &format!("{test_data}/alltypes_plain.parquet"), + Default::default(), + ) + .await?; + + // Query with nested inner joins: dim_b → dim_a → fact_table + let result = tokio::time::timeout( + std::time::Duration::from_secs(120), + ctx.sql( + "SELECT f.id, a.int_col, b.string_col + FROM fact_table f + INNER JOIN dim_a a ON f.id = a.id + INNER JOIN dim_b b ON a.tinyint_col = b.tinyint_col + ORDER BY f.id + LIMIT 5", + ) + .await? + .collect(), + ) + .await + .expect("nested CollectLeft joins should complete within 120s, not deadlock"); + + let result = result?; + // Verify we got results + assert!(!result.is_empty(), "query should return results"); + assert!( + result[0].num_rows() > 0, + "query should return at least one row" + ); + + Ok(()) + } } diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index 3f3c18771f..4bf8b81b84 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -821,15 +821,6 @@ impl SessionConfigHelperExt for SessionConfig { "datafusion.optimizer.hash_join_single_partition_threshold_rows", 0, ) - // Dynamic filter pushdown for hash joins uses a tokio::sync::Barrier - // that expects ALL probe-side partitions to call report_build_data(). - // In Ballista's distributed execution, each task runs only ONE partition, - // so the barrier waits forever for the other partitions that never arrive. - // Disable until Ballista can handle the cross-partition synchronization. - .set_bool( - "datafusion.optimizer.enable_join_dynamic_filter_pushdown", - false, - ) } } diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 10311cdd49..a1e9498f34 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -574,6 +574,15 @@ impl SchedulerState Date: Thu, 23 Apr 2026 13:50:16 +1000 Subject: [PATCH 11/19] wip --- ballista/scheduler/src/state/mod.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index a1e9498f34..c9d9e49160 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -602,12 +602,23 @@ impl SchedulerState| { if node.output_partitioning().partition_count() == 0 { let empty: Arc = From 5bd6d02f7c246d1fac114d707e77c3790e940f9d Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 14:08:58 +1000 Subject: [PATCH 12/19] wip --- ballista/scheduler/src/state/mod.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index c9d9e49160..e661609613 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -612,11 +612,15 @@ impl SchedulerState| { From d7d7a81b7dd9b4f948220cefc328d091ff0d420f Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 14:30:25 +1000 Subject: [PATCH 13/19] wip --- ballista/scheduler/src/state/mod.rs | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index e661609613..548d1b7d08 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -48,6 +48,7 @@ use ballista_core::serde::protobuf::TaskStatus; use datafusion::logical_expr::LogicalPlan; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::empty::EmptyExec; +use datafusion::physical_plan::joins::HashJoinExec; use datafusion::prelude::SessionContext; use datafusion_proto::logical_plan::AsLogicalPlan; use datafusion_proto::physical_plan::AsExecutionPlan; @@ -624,6 +625,36 @@ impl SchedulerState| { + // Strip dynamic-filter accumulators from HashJoinExec nodes. + // Custom DataFusion builds may inject SharedBuildAccumulator + // with a cross-partition Barrier that deadlocks in Ballista + // (each task runs one partition, so the barrier never completes). + // Reconstructing via try_new() produces a clean node without + // the accumulator. + if let Some(hash_join) = node.as_any().downcast_ref::() { + let display = DisplayableExecutionPlan::new(node.as_ref()) + .one_line() + .to_string(); + if display.contains("accumulator") { + info!( + "Job {job_id}: stripping dynamic-filter accumulator from {display}" + ); + let rebuilt: Arc = Arc::new( + HashJoinExec::try_new( + Arc::clone(hash_join.left()), + Arc::clone(hash_join.right()), + hash_join.on().to_vec(), + hash_join.filter().cloned(), + hash_join.join_type(), + hash_join.projection.clone(), + *hash_join.partition_mode(), + hash_join.null_equality(), + ) + .map_err(|e| DataFusionError::External(Box::new(e)))?, + ); + return Ok(Transformed::yes(rebuilt)); + } + } if node.output_partitioning().partition_count() == 0 { let empty: Arc = Arc::new(EmptyExec::new(node.schema())); From 2684dcdc19e88adf51353b629439dbda32a82734 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 14:56:17 +1000 Subject: [PATCH 14/19] wip --- .../src/execution_plans/shuffle_writer.rs | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 32022319a1..d66f05cb56 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -58,6 +58,8 @@ use datafusion::physical_plan::metrics::{ self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::physical_plan::joins::HashJoinExec; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, displayable, @@ -545,6 +547,34 @@ impl ShuffleWriterExec { async move { let now = Instant::now(); + // Strip dynamic-filter accumulators from HashJoinExec nodes. + // Custom DataFusion builds may re-inject SharedBuildAccumulator + // during deserialization. The cross-partition Barrier deadlocks + // in Ballista where each task runs a single partition. + let plan = plan.transform_down(&|node: Arc| { + if let Some(hj) = node.as_any().downcast_ref::() { + let disp = displayable(node.as_ref()).one_line().to_string(); + if disp.contains("accumulator") { + info!( + "ShuffleWriter {job_id}/{stage_id}: stripping accumulator from {disp}" + ); + let rebuilt: Arc = Arc::new( + HashJoinExec::try_new( + Arc::clone(hj.left()), + Arc::clone(hj.right()), + hj.on().to_vec(), + hj.filter().cloned(), + hj.join_type(), + hj.projection.clone(), + *hj.partition_mode(), + hj.null_equality(), + )? + ); + return Ok(Transformed::yes(rebuilt)); + } + } + Ok(Transformed::no(node)) + })?.data; // Wrap plan with tracing to log every execute() call let plan = TracingExec::wrap(plan, &job_id, stage_id); // Log the plan tree once (partition 0 only) to help debug execution issues From af764ed4104752719fed2d7ea10d21492af1436f Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 15:14:40 +1000 Subject: [PATCH 15/19] wip --- .../core/src/execution_plans/shuffle_writer.rs | 14 ++++++++++++-- ballista/scheduler/src/state/mod.rs | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index d66f05cb56..084983aeb5 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -59,7 +59,8 @@ use datafusion::physical_plan::metrics::{ }; use datafusion::common::tree_node::{Transformed, TreeNode}; -use datafusion::physical_plan::joins::HashJoinExec; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, displayable, @@ -558,9 +559,18 @@ impl ShuffleWriterExec { info!( "ShuffleWriter {job_id}/{stage_id}: stripping accumulator from {disp}" ); + let left = Arc::clone(hj.left()); + let left: Arc = + if *hj.partition_mode() == PartitionMode::CollectLeft + && left.properties().output_partitioning().partition_count() > 1 + { + Arc::new(CoalescePartitionsExec::new(left)) + } else { + left + }; let rebuilt: Arc = Arc::new( HashJoinExec::try_new( - Arc::clone(hj.left()), + left, Arc::clone(hj.right()), hj.on().to_vec(), hj.filter().cloned(), diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 548d1b7d08..38e53c8f6b 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -46,9 +46,10 @@ use ballista_core::event_loop::EventSender; use ballista_core::serde::BallistaCodec; use ballista_core::serde::protobuf::TaskStatus; use datafusion::logical_expr::LogicalPlan; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::empty::EmptyExec; -use datafusion::physical_plan::joins::HashJoinExec; +use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion::prelude::SessionContext; use datafusion_proto::logical_plan::AsLogicalPlan; use datafusion_proto::physical_plan::AsExecutionPlan; @@ -639,9 +640,18 @@ impl SchedulerState = + if *hash_join.partition_mode() == PartitionMode::CollectLeft + && left.properties().output_partitioning().partition_count() > 1 + { + Arc::new(CoalescePartitionsExec::new(left)) + } else { + left + }; let rebuilt: Arc = Arc::new( HashJoinExec::try_new( - Arc::clone(hash_join.left()), + left, Arc::clone(hash_join.right()), hash_join.on().to_vec(), hash_join.filter().cloned(), From 5b7b187469451cafdf139e5cf2cc183d71002833 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:07:31 +1000 Subject: [PATCH 16/19] fix: Reconstruct HashJoinExec nodes --- .../src/execution_plans/shuffle_reader.rs | 137 +------ .../src/execution_plans/shuffle_writer.rs | 338 ++---------------- ballista/executor/src/executor_server.rs | 5 +- ballista/scheduler/src/cluster/mod.rs | 18 +- .../scheduler_server/query_stage_scheduler.rs | 131 ------- .../scheduler/src/state/execution_graph.rs | 302 +--------------- ballista/scheduler/src/state/mod.rs | 116 ++---- ballista/scheduler/src/state/task_manager.rs | 11 +- 8 files changed, 71 insertions(+), 987 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index dac1bd8311..ed4924e459 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -66,93 +66,12 @@ use crate::error::BallistaError; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use itertools::Itertools; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, trace}; use rand::prelude::SliceRandom; use rand::rng; use tokio::sync::{Semaphore, mpsc}; use tokio_stream::wrappers::ReceiverStream; -/// A stream wrapper that logs when it is first polled and when it first yields data. -/// This is used to diagnose hangs in the execution pipeline: if "first_poll" appears -/// but "first_batch" doesn't, the inner stream is stuck. -struct InstrumentedStream { - inner: SendableRecordBatchStream, - label: String, - first_poll_logged: bool, - first_batch_logged: bool, - poll_count: u64, - start: std::time::Instant, -} - -impl InstrumentedStream { - fn new(inner: SendableRecordBatchStream, label: String) -> Self { - Self { - inner, - label, - first_poll_logged: false, - first_batch_logged: false, - poll_count: 0, - start: std::time::Instant::now(), - } - } -} - -impl Stream for InstrumentedStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let this = &mut *self; - if !this.first_poll_logged { - this.first_poll_logged = true; - debug!("InstrumentedStream({}): first poll", this.label); - } - this.poll_count += 1; - - let result = this.inner.as_mut().poll_next(cx); - match &result { - Poll::Ready(Some(Ok(batch))) => { - if !this.first_batch_logged { - this.first_batch_logged = true; - debug!( - "InstrumentedStream({}): first batch ({} rows) after {:.3}s, {} polls", - this.label, - batch.num_rows(), - this.start.elapsed().as_secs_f64(), - this.poll_count - ); - } - } - Poll::Ready(Some(Err(e))) => { - warn!( - "InstrumentedStream({}): error after {:.3}s: {}", - this.label, - this.start.elapsed().as_secs_f64(), - e - ); - } - Poll::Ready(None) => { - debug!( - "InstrumentedStream({}): stream ended after {:.3}s, {} polls", - this.label, - this.start.elapsed().as_secs_f64(), - this.poll_count - ); - } - Poll::Pending => {} - } - result - } -} - -impl RecordBatchStream for InstrumentedStream { - fn schema(&self) -> SchemaRef { - self.inner.schema() - } -} - /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec /// being executed by an executor #[derive(Debug, Clone)] @@ -251,11 +170,7 @@ impl ExecutionPlan for ShuffleReaderExec { context: Arc, ) -> Result { let task_id = context.task_id().unwrap_or_else(|| partition.to_string()); - debug!( - "ShuffleReaderExec::execute({task_id}) partition={partition}, num_locations={}, stage_id={}", - self.partition.get(partition).map(|p| p.len()).unwrap_or(0), - self.stage_id, - ); + debug!("ShuffleReaderExec::execute({task_id})"); let config = context.session_config(); @@ -312,13 +227,8 @@ impl ExecutionPlan for ShuffleReaderExec { response_receiver.try_flatten(), )); - let instrumented = Box::pin(InstrumentedStream::new( - input_stream, - format!("ShuffleReader({task_id}/stg{}/p{partition})", self.stage_id), - )); - Ok(Box::pin(CoalescedShuffleReaderStream::new( - instrumented, + input_stream, batch_size, None, &self.metrics, @@ -581,17 +491,6 @@ fn send_fetch_partitions( locations.object_store.len(), locations.remote.len() ); - debug!( - "send_fetch_partitions: {} total locations (memory={}, local={}, object_store={}, remote={})", - locations.memory.len() - + locations.local.len() - + locations.object_store.len() - + locations.remote.len(), - locations.memory.len(), - locations.local.len(), - locations.object_store.len(), - locations.remote.len() - ); // Read memory partitions first (fastest path) let response_sender_m = response_sender.clone(); @@ -612,19 +511,8 @@ fn send_fetch_partitions( let customize_endpoint_c = customize_endpoint.clone(); let metrics_callback_c = metrics_callback.clone(); let local_locations = locations.local; - let local_count = local_locations.len(); spawned_tasks.push(SpawnedTask::spawn(async move { - debug!("fetch_local_task: STARTED, {local_count} local files to read"); - for (i, p) in local_locations.into_iter().enumerate() { - debug!( - "fetch_local[{}/{}]: reading {}/{}/{} from {}", - i + 1, - local_count, - p.partition_id.job_id, - p.partition_id.stage_id, - p.partition_id.partition_id, - p.path - ); + for p in local_locations { let start_time = std::time::Instant::now(); let r = PartitionReaderEnum::Local .fetch_partition( @@ -635,17 +523,6 @@ fn send_fetch_partitions( use_tls, ) .await; - let ok = r.is_ok(); - debug!( - "fetch_local[{}/{}]: {}/{}/{} completed in {:.3}s, ok={}", - i + 1, - local_count, - p.partition_id.job_id, - p.partition_id.stage_id, - p.partition_id.partition_id, - start_time.elapsed().as_secs_f64(), - ok - ); // Record local read metrics if callback is set and read succeeded if r.is_ok() @@ -820,12 +697,10 @@ async fn fetch_partition_remote( ) -> result::Result { let metadata = &location.executor_meta; let partition_id = &location.partition_id; + // TODO for shuffle client connections, we should avoid creating new connections again and again. + // And we should also avoid to keep alive too many connections for long time. let host = metadata.host.as_str(); let port = metadata.port; - debug!( - "fetch_partition_remote: fetching {}/{}/{} from {}:{}", - partition_id.job_id, partition_id.stage_id, partition_id.partition_id, host, port - ); let mut ballista_client = BallistaClient::try_new( host, port, diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 084983aeb5..c342d957c2 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -69,218 +69,12 @@ use futures::{StreamExt, TryFutureExt, TryStreamExt}; use datafusion::arrow::error::ArrowError; use datafusion::execution::context::TaskContext; -use datafusion::physical_plan::RecordBatchStream; use datafusion::physical_plan::repartition::BatchPartitioner; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use futures::Stream; -use log::{debug, info, warn}; -use std::pin::Pin; -use std::task::{Context, Poll}; +use log::{debug, info}; use super::shuffle_writer_trait::ShuffleWriter; -/// Stream wrapper used by TracingExec to log data flow through each plan node. -/// Logs first_batch and stream_end to identify where data gets stuck. -struct TracingStream { - inner: SendableRecordBatchStream, - label: String, - partition: usize, - batch_count: u64, - row_count: u64, - first_batch_logged: bool, - start: Instant, -} - -impl Stream for TracingStream { - type Item = datafusion::error::Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let this = &mut *self; - let result = this.inner.as_mut().poll_next(cx); - match &result { - Poll::Ready(Some(Ok(batch))) => { - this.batch_count += 1; - this.row_count += batch.num_rows() as u64; - if !this.first_batch_logged { - this.first_batch_logged = true; - info!( - "TracingStream({} p={}): first_batch {} rows after {:.3}s", - this.label, - this.partition, - batch.num_rows(), - this.start.elapsed().as_secs_f64() - ); - } - } - Poll::Ready(Some(Err(e))) => { - warn!( - "TracingStream({} p={}): error after {:.3}s, {} batches: {}", - this.label, - this.partition, - this.start.elapsed().as_secs_f64(), - this.batch_count, - e - ); - } - Poll::Ready(None) => { - info!( - "TracingStream({} p={}): ended after {:.3}s, {} batches, {} rows", - this.label, - this.partition, - this.start.elapsed().as_secs_f64(), - this.batch_count, - this.row_count - ); - } - Poll::Pending => {} - } - result - } -} - -impl RecordBatchStream for TracingStream { - fn schema(&self) -> SchemaRef { - self.inner.schema() - } -} - -/// Wraps an ExecutionPlan tree to log every `execute()` call with the node name and partition. -/// This helps diagnose which nodes in a complex plan are/aren't being executed. -#[derive(Debug)] -struct TracingExec { - inner: Arc, - label: String, - children: Vec>, -} - -impl TracingExec { - /// Wrap an entire plan tree with tracing. Each node gets a label like "depth.index: NodeName". - fn wrap( - plan: Arc, - job_id: &str, - stage_id: usize, - ) -> Arc { - Self::wrap_recursive(plan, job_id, stage_id, 0) - } - - fn wrap_recursive( - plan: Arc, - job_id: &str, - stage_id: usize, - depth: usize, - ) -> Arc { - let children: Vec> = plan - .children() - .into_iter() - .enumerate() - .map(|(_i, child)| { - Self::wrap_recursive(Arc::clone(child), job_id, stage_id, depth + 1) - }) - .collect(); - - let name = plan.name().to_string(); - let label = format!("{job_id}/{stage_id} d{depth} {name}"); - - Arc::new(TracingExec { - inner: plan, - label, - children, - }) - } -} - -impl DisplayAs for TracingExec { - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - self.inner.fmt_as(t, f) - } -} - -impl ExecutionPlan for TracingExec { - fn name(&self) -> &str { - self.inner.name() - } - - fn as_any(&self) -> &dyn Any { - self.inner.as_any() - } - - fn schema(&self) -> SchemaRef { - self.inner.schema() - } - - fn properties(&self) -> &PlanProperties { - self.inner.properties() - } - - fn children(&self) -> Vec<&Arc> { - self.children.iter().collect() - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - Ok(Arc::new(TracingExec { - inner: Arc::clone(&self.inner), - label: self.label.clone(), - children, - })) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - debug!( - "TracingExec::execute({}) partition={}", - self.label, partition - ); - // Replace children on inner plan with our traced children, then execute - let rebuilt = if self.children.is_empty() { - Arc::clone(&self.inner) - } else { - self.inner - .clone() - .with_new_children(self.children.clone())? - }; - let stream = rebuilt.execute(partition, context)?; - - // Wrap stream for non-ShuffleReaderExec nodes to trace data flow. - // ShuffleReaderExec is skipped because CoalescePartitionsExec spawns - // one per partition (44+), making it too noisy. - let name = self.inner.name(); - if name == "ShuffleReaderExec" { - Ok(stream) - } else { - Ok(Box::pin(TracingStream { - inner: stream, - label: self.label.clone(), - partition, - batch_count: 0, - row_count: 0, - first_batch_logged: false, - start: Instant::now(), - })) - } - } - - fn metrics(&self) -> Option { - self.inner.metrics() - } - - fn statistics(&self) -> Result { - self.inner.statistics() - } -} - /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and /// can be executed as one unit with each partition being executed in parallel. The output of each /// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query @@ -548,28 +342,26 @@ impl ShuffleWriterExec { async move { let now = Instant::now(); - // Strip dynamic-filter accumulators from HashJoinExec nodes. - // Custom DataFusion builds may re-inject SharedBuildAccumulator - // during deserialization. The cross-partition Barrier deadlocks + // Reconstruct HashJoinExec nodes via try_new() to strip any + // dynamic-filter accumulator (e.g. SharedBuildAccumulator). + // The accumulator uses a cross-partition Barrier that deadlocks // in Ballista where each task runs a single partition. - let plan = plan.transform_down(&|node: Arc| { - if let Some(hj) = node.as_any().downcast_ref::() { - let disp = displayable(node.as_ref()).one_line().to_string(); - if disp.contains("accumulator") { - info!( - "ShuffleWriter {job_id}/{stage_id}: stripping accumulator from {disp}" - ); + // try_new() never adds an accumulator, so this is always safe. + let plan = plan + .transform_down(&|node: Arc| { + if let Some(hj) = node.as_any().downcast_ref::() { let left = Arc::clone(hj.left()); - let left: Arc = - if *hj.partition_mode() == PartitionMode::CollectLeft - && left.properties().output_partitioning().partition_count() > 1 - { - Arc::new(CoalescePartitionsExec::new(left)) - } else { - left - }; - let rebuilt: Arc = Arc::new( - HashJoinExec::try_new( + let left: Arc = if *hj.partition_mode() + == PartitionMode::CollectLeft + && left.properties().output_partitioning().partition_count() + > 1 + { + Arc::new(CoalescePartitionsExec::new(left)) + } else { + left + }; + let rebuilt: Arc = + Arc::new(HashJoinExec::try_new( left, Arc::clone(hj.right()), hj.on().to_vec(), @@ -578,74 +370,15 @@ impl ShuffleWriterExec { hj.projection.clone(), *hj.partition_mode(), hj.null_equality(), - )? - ); + )?); return Ok(Transformed::yes(rebuilt)); } - } - Ok(Transformed::no(node)) - })?.data; - // Wrap plan with tracing to log every execute() call - let plan = TracingExec::wrap(plan, &job_id, stage_id); - // Log the plan tree once (partition 0 only) to help debug execution issues - if input_partition == 0 { - info!( - "ShuffleWriter {job_id}/{stage_id} plan tree:\n{}", - datafusion::physical_plan::displayable(plan.as_ref()).indent(true) - ); - } - debug!( - "ShuffleWriter {job_id}/{stage_id} partition {input_partition}: creating execution stream" - ); + Ok(Transformed::no(node)) + })? + .data; let mut stream = plan.execute(input_partition, context)?; - debug!( - "ShuffleWriter {job_id}/{stage_id} partition {input_partition}: stream created in {:.2}s, starting write (memory={use_memory}, object_store={use_object_store})", - now.elapsed().as_secs_f64() - ); - - // Watchdog: log periodically if no progress is made - let watchdog_job_id = job_id.clone(); - let watchdog_flag = Arc::new(std::sync::atomic::AtomicBool::new(false)); - let watchdog_flag_clone = watchdog_flag.clone(); - let _watchdog = tokio::task::spawn(async move { - let mut interval = - tokio::time::interval(std::time::Duration::from_secs(30)); - interval.tick().await; // skip first immediate tick - loop { - interval.tick().await; - if watchdog_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { - break; - } - // Probe: spawn a trivial task to check runtime responsiveness - let probe_start = std::time::Instant::now(); - let probe = tokio::task::spawn(async { 42u64 }); - let probe_ok = match tokio::time::timeout( - std::time::Duration::from_secs(5), - probe, - ) - .await - { - Ok(Ok(_)) => { - format!( - "ok in {:.3}ms", - probe_start.elapsed().as_secs_f64() * 1000.0 - ) - } - Ok(Err(e)) => format!("join error: {e}"), - Err(_) => "TIMEOUT (5s) - runtime may be starved!".to_string(), - }; - warn!( - "ShuffleWriter {}/{} partition {}: STALLED - no first batch received after {:.0}s (runtime probe: {})", - watchdog_job_id, - stage_id, - input_partition, - now.elapsed().as_secs_f64(), - probe_ok, - ); - } - }); - let result = if use_memory { + if use_memory { // Use in-memory shuffle storage with configurable format Self::execute_shuffle_write_memory( &job_id, @@ -690,9 +423,7 @@ impl ShuffleWriterExec { file_ext, ) .await - }; - watchdog_flag.store(true, std::sync::atomic::Ordering::Relaxed); - result + } } } @@ -769,28 +500,9 @@ impl ShuffleWriterExec { )?; let schema = stream.schema(); - let mut batch_count: u64 = 0; - let mut total_rows: u64 = 0; - debug!( - "ShuffleWriter partition {input_partition}: entering write loop, about to poll stream for first batch" - ); while let Some(result) = stream.next().await { let input_batch = result?; - batch_count += 1; - total_rows += input_batch.num_rows() as u64; - if batch_count == 1 { - debug!( - "ShuffleWriter partition {input_partition}: received first batch ({} rows) after {:.2}s", - input_batch.num_rows(), - now.elapsed().as_secs_f64() - ); - } else if batch_count % 100 == 0 { - debug!( - "ShuffleWriter partition {input_partition}: processed {batch_count} batches ({total_rows} rows) in {:.2}s", - now.elapsed().as_secs_f64() - ); - } write_metrics.input_rows.add(input_batch.num_rows()); diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs index 884e751f04..d1c3cf44b6 100644 --- a/ballista/executor/src/executor_server.rs +++ b/ballista/executor/src/executor_server.rs @@ -606,10 +606,7 @@ impl TaskRunnerPool = slots - .iter() - .map(|s| format!("{}={}", s.executor_id, s.slots)) - .collect(); if total_slots == 0 { - info!( - "bind_task_round_robin: 0 available executor slots across {} executors [{}]", - slots.len(), - slot_detail.join(", ") - ); + debug!("Not enough available executor slots for task running!!!"); return result; } - info!( - "bind_task_round_robin: {total_slots} available slots across {} executors [{}]", - slots.len(), - slot_detail.join(", ") - ); + debug!("Total slot number is {total_slots}"); // Sort the slots by descending order slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots)); diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs index d3beee2b70..1b3deb38d8 100644 --- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -316,17 +316,6 @@ impl if let Err(e) = self.state.task_manager.update_job(&job_id).await { error!("Fail to invoke update_job for job {job_id} due to {e:?}"); } - - // After update_job revives newly resolved stages (Resolved → Running), - // trigger scheduling so tasks for those stages are actually bound and - // launched. Without this, newly Running stages could sit idle if the - // preceding ReviveOffers consumed all slots before these stages were - // resolved. - if self.state.config.is_push_staged_scheduling() { - event_sender - .post_event(QueryStageSchedulerEvent::ReviveOffers) - .await?; - } } QueryStageSchedulerEvent::JobCancel(job_id) => { self.metrics_collector.record_cancelled(&job_id); @@ -376,18 +365,6 @@ impl .await { Ok(stage_events) => { - if !stage_events.is_empty() { - info!( - "TaskUpdating from executor {executor_id}: {num_status} tasks processed, \ - {} stage events emitted: {:?}", - stage_events.len(), - stage_events - .iter() - .map(|e| format!("{e:?}")) - .collect::>() - ); - } - if self.state.config.is_push_staged_scheduling() { event_sender .post_event(QueryStageSchedulerEvent::ReviveOffers) @@ -414,7 +391,6 @@ impl } } QueryStageSchedulerEvent::ReviveOffers => { - trace!("Processing ReviveOffers event"); self.state.revive_offers(event_sender).await?; } QueryStageSchedulerEvent::ExecutorLost(executor_id, _) => { @@ -437,14 +413,6 @@ impl error!("{msg}"); } } - - // After executor_lost resets tasks (task_info → None), trigger - // scheduling so those tasks can be re-bound to surviving executors. - if self.state.config.is_push_staged_scheduling() { - event_sender - .post_event(QueryStageSchedulerEvent::ReviveOffers) - .await?; - } } QueryStageSchedulerEvent::CancelTasks(tasks) => { if let Err(e) = self @@ -566,103 +534,4 @@ mod tests { .build() .unwrap() } - - fn test_join_plan_logical(partitions: usize) -> LogicalPlan { - let schema = Schema::new(vec![ - Field::new("id", DataType::Utf8, false), - Field::new("gmv", DataType::UInt64, false), - ]); - - let left_plan = - scan_empty_with_partitions(Some("left"), &schema, None, partitions).unwrap(); - let right_plan = - scan_empty_with_partitions(Some("right"), &schema, None, partitions) - .unwrap() - .build() - .unwrap(); - - left_plan - .join( - right_plan, - datafusion::prelude::JoinType::Inner, - (vec!["id"], vec!["id"]), - None, - ) - .unwrap() - .aggregate(vec![col("left.id")], vec![sum(col("left.gmv"))]) - .unwrap() - .build() - .unwrap() - } - - /// Regression test: a multi-stage job (join) should complete end-to-end - /// through the scheduler's push-based scheduling pipeline. - /// - /// This tests for a bug where jobs with dependent stages would hang - /// after the leaf stages completed because newly resolved stages were - /// never picked up for scheduling. - #[tokio::test] - async fn test_multi_stage_job_completes_push_scheduling() -> Result<()> { - let config = SchedulerConfig::default() - .with_scheduler_policy(TaskSchedulingPolicy::PushStaged); - let metrics = Arc::new(TestMetricsCollector::default()); - - let mut test = SchedulerTest::new(config, metrics.clone(), 2, 4, None).await?; - - // Join plan creates multiple stages with dependencies - let plan = test_join_plan_logical(4); - - let result = tokio::time::timeout( - Duration::from_secs(30), - test.run("multi_stage_join", &plan), - ) - .await; - - match result { - Ok(Ok((status, _job_id))) => { - assert!( - matches!(status.status, Some(ballista_core::serde::protobuf::job_status::Status::Successful(_))), - "Expected job to succeed but got: {:?}", - status.status - ); - } - Ok(Err(e)) => panic!("Job execution error: {e}"), - Err(_) => panic!( - "Job timed out after 30s - suspected scheduling deadlock where \ - dependent stages are never picked up after leaf stages complete" - ), - } - - Ok(()) - } - - /// Test that a simple aggregation also completes via push scheduling. - #[tokio::test] - async fn test_aggregation_completes_push_scheduling() -> Result<()> { - let config = SchedulerConfig::default() - .with_scheduler_policy(TaskSchedulingPolicy::PushStaged); - let metrics = Arc::new(TestMetricsCollector::default()); - - let mut test = SchedulerTest::new(config, metrics, 2, 4, None).await?; - - let plan = test_plan(4); - - let result = - tokio::time::timeout(Duration::from_secs(30), test.run("agg_test", &plan)) - .await; - - match result { - Ok(Ok((status, _job_id))) => { - assert!( - matches!(status.status, Some(ballista_core::serde::protobuf::job_status::Status::Successful(_))), - "Expected job to succeed but got: {:?}", - status.status - ); - } - Ok(Err(e)) => panic!("Job execution error: {e}"), - Err(_) => panic!("Aggregation job timed out - scheduling deadlock"), - } - - Ok(()) - } } diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 1d9b0592c4..9e5b3e00d5 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -397,26 +397,6 @@ impl StaticExecutionGraph { let mut job_err_msg = "".to_owned(); let mut stage_metrics = StageMetricsInfo::default(); - let has_activity = !updated_stages.resolved_stages.is_empty() - || !updated_stages.successful_stages.is_empty() - || !updated_stages.failed_stages.is_empty() - || !updated_stages.rollback_running_stages.is_empty() - || !updated_stages.resubmit_successful_stages.is_empty(); - if has_activity { - info!( - "Job {job_id} processing_stages_update: resolved_stages={:?}, successful_stages={:?}, \ - failed_stages={:?}, rollback_running_stages={:?}, resubmit_successful_stages={:?}", - updated_stages.resolved_stages, - updated_stages.successful_stages, - updated_stages.failed_stages.keys().collect::>(), - updated_stages - .rollback_running_stages - .keys() - .collect::>(), - updated_stages.resubmit_successful_stages, - ); - } - for stage_id in updated_stages.resolved_stages { self.resolve_stage(stage_id)?; has_resolved = true; @@ -514,29 +494,7 @@ impl StaticExecutionGraph { completed_at: timestamp_millis(), }); } else if has_resolved { - info!("Job {job_id} has newly resolved stages, emitting JobUpdated"); events.push(QueryStageSchedulerEvent::JobUpdated(job_id)) - } else { - // Log stage summary when no terminal event is emitted — helps diagnose hangs - let stage_summary: Vec = self.stages.iter().map(|(id, stage)| { - match stage { - ExecutionStage::UnResolved(s) => { - let complete_inputs: Vec = s.inputs.iter() - .filter(|(_, inp)| inp.is_complete()).map(|(id, _)| *id).collect(); - let incomplete_inputs: Vec = s.inputs.iter() - .filter(|(_, inp)| !inp.is_complete()).map(|(id, _)| *id).collect(); - format!("stage {id}: UnResolved(complete_inputs={complete_inputs:?}, incomplete_inputs={incomplete_inputs:?}, resolvable={})", s.resolvable()) - } - ExecutionStage::Resolved(_) => format!("stage {id}: Resolved"), - ExecutionStage::Running(s) => format!("stage {id}: Running(available_tasks={}, is_successful={})", s.available_tasks(), s.is_successful()), - ExecutionStage::Successful(_) => format!("stage {id}: Successful"), - ExecutionStage::Failed(_) => format!("stage {id}: Failed"), - } - }).collect(); - debug!( - "Job {job_id} no terminal event emitted (not failed, not successful, no newly resolved). Stage states: [{}]", - stage_summary.join(", ") - ); } Ok((events, stage_metrics)) } @@ -551,19 +509,6 @@ impl StaticExecutionGraph { ) -> Result> { let mut resolved_stages = vec![]; let job_id = &self.job_id; - if is_completed { - info!( - "Job {job_id} update_stage_output_links: stage_id={stage_id}, is_completed={is_completed}, \ - num_locations={}, output_links={output_links:?}", - locations.len() - ); - } else { - debug!( - "Job {job_id} update_stage_output_links: stage_id={stage_id}, is_completed={is_completed}, \ - num_locations={}, output_links={output_links:?}", - locations.len() - ); - } if output_links.is_empty() { // If `output_links` is empty, then this is a final stage self.output_locations.extend(locations); @@ -583,22 +528,7 @@ impl StaticExecutionGraph { } // If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan - let resolvable = linked_unresolved_stage.resolvable(); - if is_completed || resolvable { - info!( - "Job {job_id} stage {link} (child of {stage_id}): input complete={is_completed}, resolvable={resolvable}, \ - inputs_status=[{}]", - linked_unresolved_stage - .inputs - .iter() - .map(|(id, inp)| { - format!("{id}:complete={}", inp.is_complete()) - }) - .collect::>() - .join(", ") - ); - } - if resolvable { + if linked_unresolved_stage.resolvable() { resolved_stages.push(linked_unresolved_stage.stage_id); } } else { @@ -2035,10 +1965,6 @@ pub(crate) fn partition_to_location( #[cfg(test)] mod test { use std::collections::HashSet; - use std::sync::Arc; - - use ballista_core::extension::SessionConfigExt; - use datafusion::prelude::SessionConfig; use crate::scheduler_server::event::QueryStageSchedulerEvent; use ballista_core::error::Result; @@ -2046,7 +1972,6 @@ mod test { self, ExecutionError, FailedTask, FetchPartitionError, IoError, JobStatus, TaskKilled, failed_task, job_status, }; - use ballista_core::serde::scheduler::PartitionId; use super::StaticExecutionGraph; use crate::state::execution_graph::ExecutionGraph; @@ -3213,229 +3138,4 @@ mod test { Ok(()) } - - /// Simulates the scheduler's bind-task flow using `fetch_running_stage` - /// (the real path used by bind_task_bias/round_robin), rather than - /// the test-only `pop_next_task`. This simulates having a limited number - /// of executor task slots, binding tasks, completing them, and checking - /// that subsequent stages are correctly picked up. - /// - /// This is a regression test for a bug where jobs with multiple - /// dependent stages would hang because newly resolved stages were - /// never scheduled after prior stages completed. - #[tokio::test] - async fn test_fetch_running_stage_multi_stage_progression() -> Result<()> { - let executor = mock_executor("executor-id1".to_string()); - let mut graph = test_join_plan(4).await; - - // join_plan has 4 stages: 2 leaf stages + 1 join stage + 1 final stage - assert_eq!(graph.stage_count(), 4); - assert_eq!(graph.available_tasks(), 0); - - // Simulate what submit_job does: revive to convert Resolved → Running - graph.revive(); - let initial_tasks = graph.available_tasks(); - assert!( - initial_tasks > 0, - "Expected available tasks after revive, got 0" - ); - - // Simulate the scheduler bind flow with limited task slots. - // Use fetch_running_stage (the actual scheduler code path) to bind and - // complete tasks in rounds, just like ReviveOffers does. - let max_slots_per_round = 2; // Simulate limited executor slots - let mut total_completed = 0; - let mut rounds = 0; - let max_rounds = 100; // Safety limit - - while !graph.is_successful() && rounds < max_rounds { - rounds += 1; - let mut bound_in_round = 0; - let mut black_list: Vec = vec![]; - - // Bind phase: fetch running stages and bind tasks (like bind_task_bias) - let mut tasks_to_complete = vec![]; - while let Some((running_stage, task_id_gen)) = - graph.fetch_running_stage(&black_list) - { - if bound_in_round >= max_slots_per_round { - break; - } - let stage_id = running_stage.stage_id; - - let runnable: Vec = running_stage - .task_infos - .iter() - .enumerate() - .filter(|(_, info)| info.is_none()) - .map(|(i, _)| i) - .take(max_slots_per_round - bound_in_round) - .collect(); - - if runnable.is_empty() { - black_list.push(stage_id); - continue; - } - - for partition_id in runnable { - let task_id = *task_id_gen; - *task_id_gen += 1; - running_stage.task_infos[partition_id] = - Some(super::create_task_info(executor.id.clone(), task_id)); - tasks_to_complete.push((stage_id, partition_id, task_id)); - bound_in_round += 1; - } - } - - assert!( - bound_in_round > 0 || graph.is_successful(), - "Round {rounds}: No tasks bound and job not successful. \ - This indicates a scheduling deadlock! Graph:\n{graph:?}" - ); - - // Complete phase: simulate task execution and status updates - for (stage_id, partition_id, task_id) in &tasks_to_complete { - let task = super::TaskDescription { - session_id: graph.session_id().to_string(), - partition: PartitionId { - job_id: graph.job_id().to_string(), - stage_id: *stage_id, - partition_id: *partition_id, - }, - stage_attempt_num: 0, - task_id: *task_id, - task_attempt: 0, - plan: graph - .stages() - .get(stage_id) - .and_then(|s| { - if let super::ExecutionStage::Running(rs) = s { - Some(rs.plan.clone()) - } else { - None - } - }) - .unwrap(), - session_config: Arc::new(SessionConfig::new_with_ballista()), - schedulable_time_millis: 0, - }; - let status = mock_completed_task(task, &executor.id); - graph.update_task_status(&executor, vec![status], 4, 4)?; - total_completed += 1; - } - } - - assert!( - graph.is_successful(), - "Job did not complete after {rounds} rounds ({total_completed} tasks). \ - Suspected scheduling deadlock. Graph:\n{graph:?}" - ); - - Ok(()) - } - - /// Regression test: stages resolved in the same update_task_status call - /// should become schedulable via fetch_running_stage in subsequent calls. - #[tokio::test] - async fn test_resolved_stages_become_runnable_after_update() -> Result<()> { - let executor = mock_executor("executor-id1".to_string()); - let mut graph = test_join_plan(4).await; - - // Revive leaf stages - graph.revive(); - - // Complete all tasks in both leaf stages, one at a time - // After both complete, the join stage should be resolvable and then runnable - let mut completed_stages: HashSet = HashSet::new(); - - loop { - // Try to find a running stage - let black_list: Vec = vec![]; - if let Some((running_stage, task_id_gen)) = - graph.fetch_running_stage(&black_list) - { - let stage_id = running_stage.stage_id; - - // Find first available task - if let Some((partition_id, _)) = running_stage - .task_infos - .iter() - .enumerate() - .find(|(_, info)| info.is_none()) - { - let task_id = *task_id_gen; - *task_id_gen += 1; - running_stage.task_infos[partition_id] = - Some(super::create_task_info(executor.id.clone(), task_id)); - - let task = super::TaskDescription { - session_id: graph.session_id().to_string(), - partition: PartitionId { - job_id: graph.job_id().to_string(), - stage_id, - partition_id, - }, - stage_attempt_num: 0, - task_id, - task_attempt: 0, - plan: graph - .stages() - .get(&stage_id) - .and_then(|s| { - if let super::ExecutionStage::Running(rs) = s { - Some(rs.plan.clone()) - } else { - None - } - }) - .unwrap(), - session_config: Arc::new(SessionConfig::new_with_ballista()), - schedulable_time_millis: 0, - }; - let status = mock_completed_task(task, &executor.id); - let events = - graph.update_task_status(&executor, vec![status], 4, 4)?; - - // Check if any stage completed - if !completed_stages.contains(&stage_id) { - let stage = graph.stages().get(&stage_id); - if matches!(stage, Some(super::ExecutionStage::Successful(_))) { - completed_stages.insert(stage_id); - } - } - - // After completion, check for JobUpdated events and verify - // that fetch_running_stage finds new stages - for event in &events { - if matches!(event, QueryStageSchedulerEvent::JobUpdated(_)) { - // Verify: after a JobUpdated, fetch_running_stage should - // find new stages (it calls revive internally) - let bl: Vec = vec![]; - let has_stage = graph.fetch_running_stage(&bl).is_some(); - assert!( - has_stage || graph.is_successful(), - "After JobUpdated event, fetch_running_stage should \ - find stages but found none. Graph:\n{graph:?}" - ); - } - } - } else { - break; // No more tasks - } - } else { - break; // No more stages - } - - if graph.is_successful() { - break; - } - } - - assert!( - graph.is_successful(), - "Graph should be successful. Graph:\n{graph:?}" - ); - - Ok(()) - } } diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 38e53c8f6b..5a1dd432ea 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -211,35 +211,9 @@ impl SchedulerState> = - std::collections::HashMap::new(); - for (executor_id, task) in &binding_result.bound_tasks { - summary - .entry(executor_id.clone()) - .or_default() - .push(format!( - "{}/{}/{}", - task.partition.job_id, - task.partition.stage_id, - task.partition.partition_id - )); - } - summary - .into_iter() - .map(|(exe, tasks)| format!("{exe}: {tasks:?}")) - .collect::>() - .join(", ") - } - ); // Record shuffle affinity metrics for affinity in &binding_result.shuffle_affinity { @@ -604,66 +578,42 @@ impl SchedulerState| { - // Strip dynamic-filter accumulators from HashJoinExec nodes. - // Custom DataFusion builds may inject SharedBuildAccumulator - // with a cross-partition Barrier that deadlocks in Ballista - // (each task runs one partition, so the barrier never completes). - // Reconstructing via try_new() produces a clean node without - // the accumulator. + // Reconstruct HashJoinExec nodes via try_new() to strip any + // dynamic-filter accumulator (e.g. SharedBuildAccumulator). + // The accumulator uses a cross-partition Barrier that deadlocks + // in Ballista where each task runs a single partition. + // try_new() never adds an accumulator, so this is always safe. if let Some(hash_join) = node.as_any().downcast_ref::() { - let display = DisplayableExecutionPlan::new(node.as_ref()) - .one_line() - .to_string(); - if display.contains("accumulator") { - info!( - "Job {job_id}: stripping dynamic-filter accumulator from {display}" - ); - let left = Arc::clone(hash_join.left()); - let left: Arc = - if *hash_join.partition_mode() == PartitionMode::CollectLeft - && left.properties().output_partitioning().partition_count() > 1 - { - Arc::new(CoalescePartitionsExec::new(left)) - } else { - left - }; - let rebuilt: Arc = Arc::new( - HashJoinExec::try_new( - left, - Arc::clone(hash_join.right()), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - *hash_join.partition_mode(), - hash_join.null_equality(), - ) - .map_err(|e| DataFusionError::External(Box::new(e)))?, - ); - return Ok(Transformed::yes(rebuilt)); - } + let left = Arc::clone(hash_join.left()); + let left: Arc = if *hash_join.partition_mode() + == PartitionMode::CollectLeft + && left.properties().output_partitioning().partition_count() > 1 + { + Arc::new(CoalescePartitionsExec::new(left)) + } else { + left + }; + let rebuilt: Arc = Arc::new( + HashJoinExec::try_new( + left, + Arc::clone(hash_join.right()), + hash_join.on().to_vec(), + hash_join.filter().cloned(), + hash_join.join_type(), + hash_join.projection.clone(), + *hash_join.partition_mode(), + hash_join.null_equality(), + ) + .map_err(|e| DataFusionError::External(Box::new(e)))?, + ); + return Ok(Transformed::yes(rebuilt)); } if node.output_partitioning().partition_count() == 0 { let empty: Arc = diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs index d78311a652..de0a4dd6a5 100644 --- a/ballista/scheduler/src/state/task_manager.rs +++ b/ballista/scheduler/src/state/task_manager.rs @@ -549,26 +549,19 @@ impl TaskManager /// Updates the job state and returns the number of new available tasks. pub async fn update_job(&self, job_id: &str) -> Result { - info!("Update active job {job_id}"); + debug!("Update active job {job_id}"); if let Some(graph) = self.get_active_execution_graph(job_id) { let mut graph = graph.write().await; let curr_available_tasks = graph.available_tasks(); - info!("Job {job_id} before revive: available_tasks={curr_available_tasks}"); graph.revive(); - let new_available_tasks = graph.available_tasks(); - info!( - "Job {job_id} after revive: available_tasks={new_available_tasks} (new={})", - new_available_tasks - curr_available_tasks - ); - info!("Saving job with status {:?}", graph.status()); self.state.save_job(job_id, &graph).await?; - let new_tasks = new_available_tasks - curr_available_tasks; + let new_tasks = graph.available_tasks() - curr_available_tasks; Ok(new_tasks) } else { From 4d34173ebbf8d6486abf5333d56e74fcba5ed208 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 23 Apr 2026 16:32:48 +1000 Subject: [PATCH 17/19] review: Address comment --- ballista/scheduler/src/state/mod.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 5a1dd432ea..01219a2d93 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -600,19 +600,16 @@ impl SchedulerState = Arc::new( - HashJoinExec::try_new( - left, - Arc::clone(hash_join.right()), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - *hash_join.partition_mode(), - hash_join.null_equality(), - ) - .map_err(|e| DataFusionError::External(Box::new(e)))?, - ); + let rebuilt: Arc = Arc::new(HashJoinExec::try_new( + left, + Arc::clone(hash_join.right()), + hash_join.on().to_vec(), + hash_join.filter().cloned(), + hash_join.join_type(), + hash_join.projection.clone(), + *hash_join.partition_mode(), + hash_join.null_equality(), + )?); return Ok(Transformed::yes(rebuilt)); } if node.output_partitioning().partition_count() == 0 { From 5cdc37ac4183e18482ce0a93c0ec9efc1e70716b Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Fri, 24 Apr 2026 09:09:00 +1000 Subject: [PATCH 18/19] chore: fmt --- ballista/scheduler/src/state/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 002a6ecf28..f6fab54fa7 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -46,9 +46,9 @@ use ballista_core::event_loop::EventSender; use ballista_core::extension::BallistaExplainNode; use ballista_core::serde::BallistaCodec; use ballista_core::serde::protobuf::TaskStatus; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::common::format::ExplainFormat; use datafusion::logical_expr::{Explain as DFExplain, LogicalPlan}; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; From dcaf3f0d8e5f61fd29f7f7b61603a9c4171f6007 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Mon, 27 Apr 2026 09:59:12 +1000 Subject: [PATCH 19/19] refactor: Combine hashjoin rebuilds into helper --- ballista/core/src/execution_plans/mod.rs | 42 +++++++++++++++++++ .../src/execution_plans/shuffle_writer.rs | 37 +--------------- ballista/scheduler/src/state/mod.rs | 33 ++------------- 3 files changed, 48 insertions(+), 64 deletions(-) diff --git a/ballista/core/src/execution_plans/mod.rs b/ballista/core/src/execution_plans/mod.rs index 9f8f944b2b..96d4e5c1cb 100644 --- a/ballista/core/src/execution_plans/mod.rs +++ b/ballista/core/src/execution_plans/mod.rs @@ -48,3 +48,45 @@ pub use vortex_shuffle::{ LocalVortexShuffleStream, VortexWriteTracker, vortex_file_extension, write_stream_to_disk_vortex, }; + +use datafusion::common::tree_node::Transformed; +use datafusion::error::Result; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; +use std::sync::Arc; + +/// Rebuild a `HashJoinExec` node via `try_new()` to strip any dynamic-filter +/// accumulator (e.g. `SharedBuildAccumulator`). The accumulator uses a +/// cross-partition `Barrier` that deadlocks in Ballista where each task runs a +/// single partition. `try_new()` never attaches an accumulator, so this is +/// always safe. +/// +/// If `node` is not a `HashJoinExec`, returns `Transformed::no(node)`. +pub fn rebuild_hash_join_without_accumulator( + node: Arc, +) -> Result>> { + if let Some(hj) = node.as_any().downcast_ref::() { + let left = Arc::clone(hj.left()); + let left: Arc = if *hj.partition_mode() + == PartitionMode::CollectLeft + && left.properties().output_partitioning().partition_count() > 1 + { + Arc::new(CoalescePartitionsExec::new(left)) + } else { + left + }; + let rebuilt: Arc = Arc::new(HashJoinExec::try_new( + left, + Arc::clone(hj.right()), + hj.on().to_vec(), + hj.filter().cloned(), + hj.join_type(), + hj.projection.clone(), + *hj.partition_mode(), + hj.null_equality(), + )?); + return Ok(Transformed::yes(rebuilt)); + } + Ok(Transformed::no(node)) +} diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index c342d957c2..251f89e82e 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -58,9 +58,7 @@ use datafusion::physical_plan::metrics::{ self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; -use datafusion::common::tree_node::{Transformed, TreeNode}; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; +use datafusion::common::tree_node::TreeNode; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, displayable, @@ -342,39 +340,8 @@ impl ShuffleWriterExec { async move { let now = Instant::now(); - // Reconstruct HashJoinExec nodes via try_new() to strip any - // dynamic-filter accumulator (e.g. SharedBuildAccumulator). - // The accumulator uses a cross-partition Barrier that deadlocks - // in Ballista where each task runs a single partition. - // try_new() never adds an accumulator, so this is always safe. let plan = plan - .transform_down(&|node: Arc| { - if let Some(hj) = node.as_any().downcast_ref::() { - let left = Arc::clone(hj.left()); - let left: Arc = if *hj.partition_mode() - == PartitionMode::CollectLeft - && left.properties().output_partitioning().partition_count() - > 1 - { - Arc::new(CoalescePartitionsExec::new(left)) - } else { - left - }; - let rebuilt: Arc = - Arc::new(HashJoinExec::try_new( - left, - Arc::clone(hj.right()), - hj.on().to_vec(), - hj.filter().cloned(), - hj.join_type(), - hj.projection.clone(), - *hj.partition_mode(), - hj.null_equality(), - )?); - return Ok(Transformed::yes(rebuilt)); - } - Ok(Transformed::no(node)) - })? + .transform_down(&super::rebuild_hash_join_without_accumulator)? .data; let mut stream = plan.execute(input_partition, context)?; diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index f6fab54fa7..adaf9f7587 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -48,10 +48,8 @@ use ballista_core::serde::BallistaCodec; use ballista_core::serde::protobuf::TaskStatus; use datafusion::common::format::ExplainFormat; use datafusion::logical_expr::{Explain as DFExplain, LogicalPlan}; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::empty::EmptyExec; -use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode}; use datafusion::prelude::SessionContext; use datafusion_proto::logical_plan::AsLogicalPlan; use datafusion_proto::physical_plan::AsExecutionPlan; @@ -640,33 +638,10 @@ impl SchedulerState| { - // Reconstruct HashJoinExec nodes via try_new() to strip any - // dynamic-filter accumulator (e.g. SharedBuildAccumulator). - // The accumulator uses a cross-partition Barrier that deadlocks - // in Ballista where each task runs a single partition. - // try_new() never adds an accumulator, so this is always safe. - if let Some(hash_join) = node.as_any().downcast_ref::() { - let left = Arc::clone(hash_join.left()); - let left: Arc = if *hash_join.partition_mode() - == PartitionMode::CollectLeft - && left.properties().output_partitioning().partition_count() > 1 - { - Arc::new(CoalescePartitionsExec::new(left)) - } else { - left - }; - let rebuilt: Arc = Arc::new(HashJoinExec::try_new( - left, - Arc::clone(hash_join.right()), - hash_join.on().to_vec(), - hash_join.filter().cloned(), - hash_join.join_type(), - hash_join.projection.clone(), - *hash_join.partition_mode(), - hash_join.null_equality(), - )?); - return Ok(Transformed::yes(rebuilt)); - } + let node = match ballista_core::execution_plans::rebuild_hash_join_without_accumulator(node)? { + t if t.transformed => return Ok(t), + t => t.data, + }; if node.output_partitioning().partition_count() == 0 { let empty: Arc = Arc::new(EmptyExec::new(node.schema()));