Skip to content

Commit 2e98399

Browse files
committed
review: Address comments
1 parent ceee000 commit 2e98399

4 files changed

Lines changed: 22 additions & 21 deletions

File tree

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 19 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -247,13 +247,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
247247
// subscribers (e.g. SpiceAI's QueryHandle) who receive the
248248
// Completed event can immediately read the Successful status
249249
// without hitting a retry/polling loop.
250-
if let Err(e) = self.state.task_manager.succeed_job(&job_id).await {
251-
error!("Fail to invoke succeed_job for job {job_id} due to {e:?}");
250+
match self.state.task_manager.succeed_job(&job_id).await {
251+
Ok(()) => {
252+
// Broadcast job completed state after status is persisted
253+
self.broadcast_job_state(JobStateEvent::completed(&job_id));
254+
}
255+
Err(e) => {
256+
error!(
257+
"Fail to invoke succeed_job for job {job_id} due to {e:?}"
258+
);
259+
}
252260
}
253261

254-
// Broadcast job completed state after status is persisted
255-
self.broadcast_job_state(JobStateEvent::completed(&job_id));
256-
257262
self.state.clean_up_successful_job(job_id);
258263
}
259264
QueryStageSchedulerEvent::JobRunningFailed {
@@ -283,15 +288,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
283288
))
284289
.await?;
285290
}
291+
// Broadcast job failed state after status is persisted
292+
self.broadcast_job_state(JobStateEvent::failed(
293+
&job_id,
294+
&fail_message,
295+
));
286296
}
287297
Err(e) => {
288298
error!("Fail to invoke abort_job for job {job_id} due to {e:?}");
289299
}
290300
}
291301

292-
// Broadcast job failed state after status is persisted
293-
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));
294-
295302
self.state.clean_up_failed_job(job_id);
296303
}
297304
QueryStageSchedulerEvent::JobUpdated(job_id) => {
@@ -306,23 +313,23 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
306313
info!("Job {job_id} Cancelled");
307314

308315
// Persist terminal status before broadcasting so subscribers
309-
// can immediately read the Cancelled status on receipt of the event.
316+
// can immediately read the terminal status on receipt of the event.
317+
// Note: cancel_job routes to abort_job, persisting a Failed status.
310318
match self.state.task_manager.cancel_job(&job_id).await {
311319
Ok((running_tasks, _pending_tasks)) => {
312320
event_sender
313321
.post_event(QueryStageSchedulerEvent::CancelTasks(
314322
running_tasks,
315323
))
316324
.await?;
325+
// Broadcast cancelled state after status is persisted
326+
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
317327
}
318328
Err(e) => {
319329
error!("Fail to invoke cancel_job for job {job_id} due to {e:?}");
320330
}
321331
}
322332

323-
// Broadcast cancelled state after status is persisted
324-
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
325-
326333
self.state.clean_up_failed_job(job_id);
327334
}
328335
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {

ballista/scheduler/src/state/aqe/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -919,9 +919,7 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
919919

920920
/// Return all currently running tasks along with the executor ID on which they are assigned
921921
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
922-
self.stages
923-
.iter()
924-
.flat_map(|(_, stage)| {
922+
self.stages.values().flat_map(|stage| {
925923
if let ExecutionStage::Running(stage) = stage {
926924
stage
927925
.running_tasks()

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1319,9 +1319,7 @@ impl ExecutionGraph for StaticExecutionGraph {
13191319

13201320
/// Return all currently running tasks along with the executor ID on which they are assigned
13211321
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
1322-
self.stages
1323-
.iter()
1324-
.flat_map(|(_, stage)| {
1322+
self.stages.values().flat_map(|stage| {
13251323
if let ExecutionStage::Running(stage) = stage {
13261324
stage
13271325
.running_tasks()

ballista/scheduler/src/state/executor_manager.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -438,9 +438,7 @@ impl ExecutorManager {
438438
get_time_before(self.config.executor_termination_grace_period);
439439

440440
self.cluster_state
441-
.executor_heartbeats()
442-
.iter()
443-
.filter_map(|(_exec, heartbeat)| {
441+
.executor_heartbeats().values().filter_map(|heartbeat| {
444442
let terminating = matches!(
445443
heartbeat
446444
.status

0 commit comments

Comments
 (0)