Skip to content

Commit ceee000

Browse files
committed
fix: broadcast job state correctly
1 parent 729428c commit ceee000

1 file changed

Lines changed: 21 additions & 10 deletions

File tree

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 21 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -243,12 +243,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
243243

244244
info!("Job {job_id} success");
245245

246-
// Broadcast job completed state
247-
self.broadcast_job_state(JobStateEvent::completed(&job_id));
248-
246+
// Persist terminal status BEFORE broadcasting completion so that
247+
// subscribers (e.g. SpiceAI's QueryHandle) who receive the
248+
// Completed event can immediately read the Successful status
249+
// without hitting a retry/polling loop.
249250
if let Err(e) = self.state.task_manager.succeed_job(&job_id).await {
250251
error!("Fail to invoke succeed_job for job {job_id} due to {e:?}");
251252
}
253+
254+
// Broadcast job completed state after the persist attempt (sent even if succeed_job returned an error)
255+
self.broadcast_job_state(JobStateEvent::completed(&job_id));
256+
252257
self.state.clean_up_successful_job(job_id);
253258
}
254259
QueryStageSchedulerEvent::JobRunningFailed {
@@ -262,13 +267,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
262267

263268
error!("Job {job_id} running failed");
264269

265-
// Broadcast job failed state
266-
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));
267-
270+
// Persist terminal status before broadcasting so subscribers
271+
// can immediately read the Failed status on receipt of the event.
268272
match self
269273
.state
270274
.task_manager
271-
.abort_job(&job_id, fail_message)
275+
.abort_job(&job_id, fail_message.clone())
272276
.await
273277
{
274278
Ok((running_tasks, _pending_tasks)) => {
@@ -284,6 +288,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
284288
error!("Fail to invoke abort_job for job {job_id} due to {e:?}");
285289
}
286290
}
291+
292+
// Broadcast job failed state after the persist attempt (sent even if abort_job returned an error)
293+
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));
294+
287295
self.state.clean_up_failed_job(job_id);
288296
}
289297
QueryStageSchedulerEvent::JobUpdated(job_id) => {
@@ -297,9 +305,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
297305

298306
info!("Job {job_id} Cancelled");
299307

300-
// Broadcast job cancelled state
301-
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
302-
308+
// Persist terminal status before broadcasting so subscribers
309+
// can immediately read the Cancelled status on receipt of the event.
303310
match self.state.task_manager.cancel_job(&job_id).await {
304311
Ok((running_tasks, _pending_tasks)) => {
305312
event_sender
@@ -312,6 +319,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
312319
error!("Fail to invoke cancel_job for job {job_id} due to {e:?}");
313320
}
314321
}
322+
323+
// Broadcast job cancelled state after the persist attempt (sent even if cancel_job returned an error)
324+
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
325+
315326
self.state.clean_up_failed_job(job_id);
316327
}
317328
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {

0 commit comments

Comments
 (0)