Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 59 additions & 26 deletions ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,25 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

error!("Job {job_id} failed: {fail_message}");

// Broadcast job failed state
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));

if let Err(e) = self
// Persist terminal status before broadcasting so subscribers
// can immediately read the Failed status on receipt of the event.
match self
.state
.task_manager
.fail_unscheduled_job(&job_id, fail_message)
.fail_unscheduled_job(&job_id, fail_message.clone())
.await
{
error!(
"Fail to invoke fail_unscheduled_job for job {job_id} due to {e:?}"
);
Ok(()) => {
self.broadcast_job_state(JobStateEvent::failed(
&job_id,
&fail_message,
));
}
Err(e) => {
error!(
"Fail to invoke fail_unscheduled_job for job {job_id} due to {e:?}"
);
}
}
}
QueryStageSchedulerEvent::JobFinished {
Expand All @@ -243,13 +250,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

info!("Job {job_id} success");

// Broadcast job completed state
self.broadcast_job_state(JobStateEvent::completed(&job_id));

if let Err(e) = self.state.task_manager.succeed_job(&job_id).await {
error!("Fail to invoke succeed_job for job {job_id} due to {e:?}");
// Persist terminal status BEFORE broadcasting completion so that
// subscribers (e.g. SpiceAI's QueryHandle) who receive the
// Completed event can immediately read the Successful status
// without hitting a retry/polling loop.
match self.state.task_manager.succeed_job(&job_id).await {
Ok(()) => {
// Broadcast job completed state after status is persisted
self.broadcast_job_state(JobStateEvent::completed(&job_id));
Comment thread
peasee marked this conversation as resolved.
self.state.clean_up_successful_job(job_id);
Comment thread
peasee marked this conversation as resolved.
}
Comment thread
peasee marked this conversation as resolved.
Err(e) => {
error!(
"Fail to invoke succeed_job for job {job_id} due to {e:?}"
);
}
}
Comment thread
peasee marked this conversation as resolved.
self.state.clean_up_successful_job(job_id);
}
Comment thread
peasee marked this conversation as resolved.
QueryStageSchedulerEvent::JobRunningFailed {
job_id,
Expand All @@ -262,28 +278,37 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

error!("Job {job_id} running failed");

// Broadcast job failed state
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));

// Persist terminal status before broadcasting so subscribers
// can immediately read the Failed status on receipt of the event.
match self
.state
.task_manager
.abort_job(&job_id, fail_message)
.abort_job(&job_id, fail_message.clone())
.await
{
Ok((running_tasks, _pending_tasks)) => {
if !running_tasks.is_empty() {
event_sender
// Broadcast job failed state immediately after status is persisted
self.broadcast_job_state(JobStateEvent::failed(
&job_id,
&fail_message,
));
Comment thread
peasee marked this conversation as resolved.
if !running_tasks.is_empty()
Comment thread
peasee marked this conversation as resolved.
&& let Err(e) = event_sender
.post_event(QueryStageSchedulerEvent::CancelTasks(
running_tasks,
))
.await?;
.await
{
error!(
"Fail to post CancelTasks for job {job_id} due to {e:?}"
);
}
Comment thread
peasee marked this conversation as resolved.
}
Err(e) => {
error!("Fail to invoke abort_job for job {job_id} due to {e:?}");
}
}

self.state.clean_up_failed_job(job_id);
}
QueryStageSchedulerEvent::JobUpdated(job_id) => {
Expand All @@ -297,21 +322,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

info!("Job {job_id} Cancelled");

// Broadcast job cancelled state
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));

// Persist terminal status before broadcasting so subscribers
// can immediately read the terminal status on receipt of the event.
// Note: cancel_job routes to abort_job, persisting a Failed status.
match self.state.task_manager.cancel_job(&job_id).await {
Comment thread
peasee marked this conversation as resolved.
Ok((running_tasks, _pending_tasks)) => {
event_sender
// Broadcast cancelled state immediately after status is persisted
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
if let Err(e) = event_sender
Comment thread
peasee marked this conversation as resolved.
.post_event(QueryStageSchedulerEvent::CancelTasks(
running_tasks,
))
.await?;
.await
{
error!(
"Fail to post CancelTasks for job {job_id} due to {e:?}"
);
}
}
Comment thread
peasee marked this conversation as resolved.
Err(e) => {
error!("Fail to invoke cancel_job for job {job_id} due to {e:?}");
}
}

self.state.clean_up_failed_job(job_id);
}
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/aqe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,8 +920,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
/// Return all currently running tasks along with the executor ID on which they are assigned
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
self.stages
.iter()
.flat_map(|(_, stage)| {
.values()
.flat_map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage
.running_tasks()
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1320,8 +1320,8 @@ impl ExecutionGraph for StaticExecutionGraph {
/// Return all currently running tasks along with the executor ID on which they are assigned
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
self.stages
.iter()
.flat_map(|(_, stage)| {
.values()
.flat_map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage
.running_tasks()
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/executor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ impl ExecutorManager {

self.cluster_state
.executor_heartbeats()
.iter()
.filter_map(|(_exec, heartbeat)| {
.values()
.filter_map(|heartbeat| {
let terminating = matches!(
heartbeat
.status
Expand Down
Loading