Skip to content

Commit a34f5a4

Browse files
committed
review: Address comments
1 parent b908630 commit a34f5a4

1 file changed

Lines changed: 37 additions & 22 deletions

File tree

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -219,18 +219,25 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
219219

220220
error!("Job {job_id} failed: {fail_message}");
221221

222-
// Broadcast job failed state
223-
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));
224-
225-
if let Err(e) = self
222+
// Persist terminal status before broadcasting so subscribers
223+
// can immediately read the Failed status on receipt of the event.
224+
match self
226225
.state
227226
.task_manager
228-
.fail_unscheduled_job(&job_id, fail_message)
227+
.fail_unscheduled_job(&job_id, fail_message.clone())
229228
.await
230229
{
231-
error!(
232-
"Fail to invoke fail_unscheduled_job for job {job_id} due to {e:?}"
233-
);
230+
Ok(()) => {
231+
self.broadcast_job_state(JobStateEvent::failed(
232+
&job_id,
233+
&fail_message,
234+
));
235+
}
236+
Err(e) => {
237+
error!(
238+
"Fail to invoke fail_unscheduled_job for job {job_id} due to {e:?}"
239+
);
240+
}
234241
}
235242
}
236243
QueryStageSchedulerEvent::JobFinished {
@@ -251,15 +258,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
251258
Ok(()) => {
252259
// Broadcast job completed state after status is persisted
253260
self.broadcast_job_state(JobStateEvent::completed(&job_id));
261+
self.state.clean_up_successful_job(job_id);
254262
}
255263
Err(e) => {
256264
error!(
257265
"Fail to invoke succeed_job for job {job_id} due to {e:?}"
258266
);
259267
}
260268
}
261-
262-
self.state.clean_up_successful_job(job_id);
263269
}
264270
QueryStageSchedulerEvent::JobRunningFailed {
265271
job_id,
@@ -281,18 +287,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
281287
.await
282288
{
283289
Ok((running_tasks, _pending_tasks)) => {
284-
if !running_tasks.is_empty() {
285-
event_sender
286-
.post_event(QueryStageSchedulerEvent::CancelTasks(
287-
running_tasks,
288-
))
289-
.await?;
290-
}
291-
// Broadcast job failed state after status is persisted
290+
// Broadcast job failed state immediately after status is persisted
292291
self.broadcast_job_state(JobStateEvent::failed(
293292
&job_id,
294293
&fail_message,
295294
));
295+
if !running_tasks.is_empty()
296+
&& let Err(e) = event_sender
297+
.post_event(QueryStageSchedulerEvent::CancelTasks(
298+
running_tasks,
299+
))
300+
.await
301+
{
302+
error!(
303+
"Fail to post CancelTasks for job {job_id} due to {e:?}"
304+
);
305+
}
296306
}
297307
Err(e) => {
298308
error!("Fail to invoke abort_job for job {job_id} due to {e:?}");
@@ -317,13 +327,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
317327
// Note: cancel_job routes to abort_job, persisting a Failed status.
318328
match self.state.task_manager.cancel_job(&job_id).await {
319329
Ok((running_tasks, _pending_tasks)) => {
320-
event_sender
330+
// Broadcast cancelled state immediately after status is persisted
331+
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
332+
if let Err(e) = event_sender
321333
.post_event(QueryStageSchedulerEvent::CancelTasks(
322334
running_tasks,
323335
))
324-
.await?;
325-
// Broadcast cancelled state after status is persisted
326-
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
336+
.await
337+
{
338+
error!(
339+
"Fail to post CancelTasks for job {job_id} due to {e:?}"
340+
);
341+
}
327342
}
328343
Err(e) => {
329344
error!("Fail to invoke cancel_job for job {job_id} due to {e:?}");

0 commit comments

Comments
 (0)