diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs index d127545ba1..1b3deb38d8 100644 --- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -219,18 +219,25 @@ impl error!("Job {job_id} failed: {fail_message}"); - // Broadcast job failed state - self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message)); - - if let Err(e) = self + // Persist terminal status before broadcasting so subscribers + // can immediately read the Failed status on receipt of the event. + match self .state .task_manager - .fail_unscheduled_job(&job_id, fail_message) + .fail_unscheduled_job(&job_id, fail_message.clone()) .await { - error!( - "Fail to invoke fail_unscheduled_job for job {job_id} due to {e:?}" - ); + Ok(()) => { + self.broadcast_job_state(JobStateEvent::failed( + &job_id, + &fail_message, + )); + } + Err(e) => { + error!( + "Fail to invoke fail_unscheduled_job for job {job_id} due to {e:?}" + ); + } } } QueryStageSchedulerEvent::JobFinished { @@ -243,13 +250,22 @@ impl info!("Job {job_id} success"); - // Broadcast job completed state - self.broadcast_job_state(JobStateEvent::completed(&job_id)); - - if let Err(e) = self.state.task_manager.succeed_job(&job_id).await { - error!("Fail to invoke succeed_job for job {job_id} due to {e:?}"); + // Persist terminal status BEFORE broadcasting completion so that + // subscribers (e.g. SpiceAI's QueryHandle) who receive the + // Completed event can immediately read the Successful status + // without hitting a retry/polling loop. + match self.state.task_manager.succeed_job(&job_id).await { + Ok(()) => { + // Broadcast job completed state after status is persisted + self.broadcast_job_state(JobStateEvent::completed(&job_id)); + self.state.clean_up_successful_job(job_id); + } + Err(e) => { + error!( + "Fail to invoke succeed_job for job {job_id} due to {e:?}" + ); + } } - self.state.clean_up_successful_job(job_id); } QueryStageSchedulerEvent::JobRunningFailed { job_id, @@ -262,28 +278,37 @@ impl error!("Job {job_id} running failed"); - // Broadcast job failed state - self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message)); - + // Persist terminal status before broadcasting so subscribers + // can immediately read the Failed status on receipt of the event. match self .state .task_manager - .abort_job(&job_id, fail_message) + .abort_job(&job_id, fail_message.clone()) .await { Ok((running_tasks, _pending_tasks)) => { - if !running_tasks.is_empty() { - event_sender + // Broadcast job failed state immediately after status is persisted + self.broadcast_job_state(JobStateEvent::failed( + &job_id, + &fail_message, + )); + if !running_tasks.is_empty() + && let Err(e) = event_sender .post_event(QueryStageSchedulerEvent::CancelTasks( running_tasks, )) - .await?; + .await + { + error!( + "Fail to post CancelTasks for job {job_id} due to {e:?}" + ); } } Err(e) => { error!("Fail to invoke abort_job for job {job_id} due to {e:?}"); } } + self.state.clean_up_failed_job(job_id); } QueryStageSchedulerEvent::JobUpdated(job_id) => { @@ -297,21 +322,29 @@ impl info!("Job {job_id} Cancelled"); - // Broadcast job cancelled state - self.broadcast_job_state(JobStateEvent::cancelled(&job_id)); - + // Persist terminal status before broadcasting so subscribers + // can immediately read the terminal status on receipt of the event. + // Note: cancel_job routes to abort_job, persisting a Failed status. match self.state.task_manager.cancel_job(&job_id).await { Ok((running_tasks, _pending_tasks)) => { - event_sender + // Broadcast cancelled state immediately after status is persisted + self.broadcast_job_state(JobStateEvent::cancelled(&job_id)); + if let Err(e) = event_sender .post_event(QueryStageSchedulerEvent::CancelTasks( running_tasks, )) - .await?; + .await + { + error!( + "Fail to post CancelTasks for job {job_id} due to {e:?}" + ); + } } Err(e) => { error!("Fail to invoke cancel_job for job {job_id} due to {e:?}"); } } + self.state.clean_up_failed_job(job_id); } QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => { diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs index 8200b2a519..2436729021 100644 --- a/ballista/scheduler/src/state/aqe/mod.rs +++ b/ballista/scheduler/src/state/aqe/mod.rs @@ -920,8 +920,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph { /// Return all currently running tasks along with the executor ID on which they are assigned fn running_tasks(&self) -> Vec { self.stages - .iter() - .flat_map(|(_, stage)| { + .values() + .flat_map(|stage| { if let ExecutionStage::Running(stage) = stage { stage .running_tasks() diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 37ba3f828e..9e5b3e00d5 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -1320,8 +1320,8 @@ impl ExecutionGraph for StaticExecutionGraph { /// Return all currently running tasks along with the executor ID on which they are assigned fn running_tasks(&self) -> Vec { self.stages - .iter() - .flat_map(|(_, stage)| { + .values() + .flat_map(|stage| { if let ExecutionStage::Running(stage) = stage { stage .running_tasks() diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index 9d47310b80..bdacc2d4e2 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -439,8 +439,8 @@ impl ExecutorManager { self.cluster_state .executor_heartbeats() - .iter() - .filter_map(|(_exec, heartbeat)| { + .values() + .filter_map(|heartbeat| { let terminating = matches!( heartbeat .status