Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

info!("Job {job_id} success");

// Broadcast job completed state
self.broadcast_job_state(JobStateEvent::completed(&job_id));

if let Err(e) = self.state.task_manager.succeed_job(&job_id).await {
error!("Fail to invoke succeed_job for job {job_id} due to {e:?}");
// Persist terminal status BEFORE broadcasting completion so that
// subscribers (e.g. SpiceAI's QueryHandle) who receive the
// Completed event can immediately read the Successful status
// without hitting a retry/polling loop.
match self.state.task_manager.succeed_job(&job_id).await {
Ok(()) => {
// Broadcast job completed state after status is persisted
self.broadcast_job_state(JobStateEvent::completed(&job_id));
Comment thread
peasee marked this conversation as resolved.
}
Comment thread
peasee marked this conversation as resolved.
Err(e) => {
error!(
"Fail to invoke succeed_job for job {job_id} due to {e:?}"
);
}
}
Comment thread
peasee marked this conversation as resolved.

self.state.clean_up_successful_job(job_id);
}
Comment thread
peasee marked this conversation as resolved.
QueryStageSchedulerEvent::JobRunningFailed {
Expand All @@ -262,13 +272,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

error!("Job {job_id} running failed");

// Broadcast job failed state
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));

// Persist terminal status before broadcasting so subscribers
// can immediately read the Failed status on receipt of the event.
match self
.state
.task_manager
.abort_job(&job_id, fail_message)
.abort_job(&job_id, fail_message.clone())
.await
{
Ok((running_tasks, _pending_tasks)) => {
Expand All @@ -279,11 +288,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
))
.await?;
}
Comment thread
peasee marked this conversation as resolved.
// Broadcast job failed state after status is persisted
self.broadcast_job_state(JobStateEvent::failed(
&job_id,
&fail_message,
));
Comment thread
peasee marked this conversation as resolved.
}
Err(e) => {
error!("Fail to invoke abort_job for job {job_id} due to {e:?}");
}
}

self.state.clean_up_failed_job(job_id);
}
QueryStageSchedulerEvent::JobUpdated(job_id) => {
Expand All @@ -297,21 +312,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

info!("Job {job_id} Cancelled");

// Broadcast job cancelled state
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));

// Persist terminal status before broadcasting so subscribers
// can immediately read the terminal status on receipt of the event.
// Note: cancel_job routes to abort_job, persisting a Failed status.
match self.state.task_manager.cancel_job(&job_id).await {
Comment thread
peasee marked this conversation as resolved.
Ok((running_tasks, _pending_tasks)) => {
event_sender
.post_event(QueryStageSchedulerEvent::CancelTasks(
running_tasks,
))
.await?;
// Broadcast cancelled state after status is persisted
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
}
Comment thread
peasee marked this conversation as resolved.
Err(e) => {
error!("Fail to invoke cancel_job for job {job_id} due to {e:?}");
}
}

self.state.clean_up_failed_job(job_id);
}
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/aqe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,8 +920,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
/// Returns all currently running tasks along with the ID of the executor to which each is assigned
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
self.stages
.iter()
.flat_map(|(_, stage)| {
.values()
.flat_map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage
.running_tasks()
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1320,8 +1320,8 @@ impl ExecutionGraph for StaticExecutionGraph {
/// Returns all currently running tasks along with the ID of the executor to which each is assigned
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
self.stages
.iter()
.flat_map(|(_, stage)| {
.values()
.flat_map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage
.running_tasks()
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/executor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ impl ExecutorManager {

self.cluster_state
.executor_heartbeats()
.iter()
.filter_map(|(_exec, heartbeat)| {
.values()
.filter_map(|heartbeat| {
let terminating = matches!(
heartbeat
.status
Expand Down
Loading