Skip to content

Commit e38b4e7

Browse files
committed
wip
1 parent 1e3e280 commit e38b4e7

3 files changed

Lines changed: 70 additions & 3 deletions

File tree

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
376376
.await
377377
{
378378
Ok(stage_events) => {
379+
info!(
380+
"TaskUpdating from executor {executor_id}: {num_status} tasks processed, \
381+
{} stage events emitted: {:?}",
382+
stage_events.len(),
383+
stage_events.iter().map(|e| format!("{e:?}")).collect::<Vec<_>>()
384+
);
385+
379386
if self.state.config.is_push_staged_scheduling() {
380387
event_sender
381388
.post_event(QueryStageSchedulerEvent::ReviveOffers)

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,19 @@ impl StaticExecutionGraph {
397397
let mut job_err_msg = "".to_owned();
398398
let mut stage_metrics = StageMetricsInfo::default();
399399

400+
info!(
401+
"Job {job_id} processing_stages_update: resolved_stages={:?}, successful_stages={:?}, \
402+
failed_stages={:?}, rollback_running_stages={:?}, resubmit_successful_stages={:?}",
403+
updated_stages.resolved_stages,
404+
updated_stages.successful_stages,
405+
updated_stages.failed_stages.keys().collect::<Vec<_>>(),
406+
updated_stages
407+
.rollback_running_stages
408+
.keys()
409+
.collect::<Vec<_>>(),
410+
updated_stages.resubmit_successful_stages,
411+
);
412+
400413
for stage_id in updated_stages.resolved_stages {
401414
self.resolve_stage(stage_id)?;
402415
has_resolved = true;
@@ -494,7 +507,29 @@ impl StaticExecutionGraph {
494507
completed_at: timestamp_millis(),
495508
});
496509
} else if has_resolved {
510+
info!("Job {job_id} has newly resolved stages, emitting JobUpdated");
497511
events.push(QueryStageSchedulerEvent::JobUpdated(job_id))
512+
} else {
513+
// Log stage summary when no terminal event is emitted — helps diagnose hangs
514+
let stage_summary: Vec<String> = self.stages.iter().map(|(id, stage)| {
515+
match stage {
516+
ExecutionStage::UnResolved(s) => {
517+
let complete_inputs: Vec<usize> = s.inputs.iter()
518+
.filter(|(_, inp)| inp.is_complete()).map(|(id, _)| *id).collect();
519+
let incomplete_inputs: Vec<usize> = s.inputs.iter()
520+
.filter(|(_, inp)| !inp.is_complete()).map(|(id, _)| *id).collect();
521+
format!("stage {id}: UnResolved(complete_inputs={complete_inputs:?}, incomplete_inputs={incomplete_inputs:?}, resolvable={})", s.resolvable())
522+
}
523+
ExecutionStage::Resolved(_) => format!("stage {id}: Resolved"),
524+
ExecutionStage::Running(s) => format!("stage {id}: Running(available_tasks={}, is_successful={})", s.available_tasks(), s.is_successful()),
525+
ExecutionStage::Successful(_) => format!("stage {id}: Successful"),
526+
ExecutionStage::Failed(_) => format!("stage {id}: Failed"),
527+
}
528+
}).collect();
529+
info!(
530+
"Job {job_id} no terminal event emitted (not failed, not successful, no newly resolved). Stage states: [{}]",
531+
stage_summary.join(", ")
532+
);
498533
}
499534
Ok((events, stage_metrics))
500535
}
@@ -509,6 +544,11 @@ impl StaticExecutionGraph {
509544
) -> Result<Vec<usize>> {
510545
let mut resolved_stages = vec![];
511546
let job_id = &self.job_id;
547+
info!(
548+
"Job {job_id} update_stage_output_links: stage_id={stage_id}, is_completed={is_completed}, \
549+
num_locations={}, output_links={output_links:?}",
550+
locations.len()
551+
);
512552
if output_links.is_empty() {
513553
// If `output_links` is empty, then this is a final stage
514554
self.output_locations.extend(locations);
@@ -528,7 +568,20 @@ impl StaticExecutionGraph {
528568
}
529569

530570
// If all input partitions are ready, we can resolve any UnresolvedShuffleExec in the parent stage plan
531-
if linked_unresolved_stage.resolvable() {
571+
let resolvable = linked_unresolved_stage.resolvable();
572+
info!(
573+
"Job {job_id} stage {link} (child of {stage_id}): input complete={is_completed}, resolvable={resolvable}, \
574+
inputs_status=[{}]",
575+
linked_unresolved_stage
576+
.inputs
577+
.iter()
578+
.map(|(id, inp)| {
579+
format!("{id}:complete={}", inp.is_complete())
580+
})
581+
.collect::<Vec<_>>()
582+
.join(", ")
583+
);
584+
if resolvable {
532585
resolved_stages.push(linked_unresolved_stage.stage_id);
533586
}
534587
} else {

ballista/scheduler/src/state/task_manager.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,19 +549,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
549549

550550
/// Updates the job state and returns the number of new available tasks.
551551
pub async fn update_job(&self, job_id: &str) -> Result<usize> {
552-
debug!("Update active job {job_id}");
552+
info!("Update active job {job_id}");
553553
if let Some(graph) = self.get_active_execution_graph(job_id) {
554554
let mut graph = graph.write().await;
555555

556556
let curr_available_tasks = graph.available_tasks();
557+
info!("Job {job_id} before revive: available_tasks={curr_available_tasks}");
557558

558559
graph.revive();
559560

561+
let new_available_tasks = graph.available_tasks();
562+
info!(
563+
"Job {job_id} after revive: available_tasks={new_available_tasks} (new={})",
564+
new_available_tasks - curr_available_tasks
565+
);
566+
560567
info!("Saving job with status {:?}", graph.status());
561568

562569
self.state.save_job(job_id, &graph).await?;
563570

564-
let new_tasks = graph.available_tasks() - curr_available_tasks;
571+
let new_tasks = new_available_tasks - curr_available_tasks;
565572

566573
Ok(new_tasks)
567574
} else {

0 commit comments

Comments
 (0)