Skip to content

Commit e1153d7

Browse files
fix: handle None task slot in update_task_info after executor lost (#23)
When an executor heartbeat times out, reset_tasks() sets task_infos[partition_id] to None. If the executor later reconnects and sends a late status update, update_task_info() would panic on .unwrap() of the None value. Now gracefully returns false (update rejected) with a warning log when the task slot is None, preventing the scheduler from crashing. Fixes spiceai/spiceai#9636
1 parent b4a797c commit e1153d7

1 file changed

Lines changed: 125 additions & 1 deletion

File tree

ballista/scheduler/src/state/execution_stage.rs

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,12 @@ impl RunningStage {
646646
/// Update the TaskInfo for task partition
647647
pub fn update_task_info(&mut self, partition_id: usize, status: TaskStatus) -> bool {
648648
debug!("Updating TaskInfo for partition {partition_id}");
649-
let task_info = self.task_infos[partition_id].as_ref().unwrap();
649+
let Some(task_info) = self.task_infos[partition_id].as_ref() else {
650+
warn!(
651+
"Ignore TaskStatus update for partition {partition_id} because the task was already reset (executor lost)"
652+
);
653+
return false;
654+
};
650655
let task_id = task_info.task_id;
651656
if (status.task_id as usize) < task_id {
652657
warn!(
@@ -1049,3 +1054,122 @@ impl StageOutput {
10491054
partition_locations
10501055
}
10511056
}
1057+
1058+
#[cfg(test)]
1059+
mod tests {
1060+
use super::*;
1061+
use ballista_core::serde::protobuf::{SuccessfulTask, TaskStatus, task_status};
1062+
use datafusion::physical_plan::empty::EmptyExec;
1063+
use datafusion::prelude::SessionConfig;
1064+
use std::collections::HashMap;
1065+
1066+
fn make_running_stage(partitions: usize) -> RunningStage {
1067+
let schema = Arc::new(datafusion::arrow::datatypes::Schema::empty());
1068+
let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
1069+
RunningStage::new(
1070+
1,
1071+
0,
1072+
plan,
1073+
partitions,
1074+
vec![],
1075+
HashMap::new(),
1076+
Arc::new(SessionConfig::default()),
1077+
)
1078+
}
1079+
1080+
fn make_task_status(task_id: u32, partition_id: u32) -> TaskStatus {
1081+
TaskStatus {
1082+
task_id,
1083+
job_id: "test-job".to_string(),
1084+
stage_id: 1,
1085+
stage_attempt_num: 0,
1086+
partition_id,
1087+
launch_time: 100,
1088+
start_exec_time: 200,
1089+
end_exec_time: 300,
1090+
status: Some(task_status::Status::Successful(SuccessfulTask {
1091+
executor_id: "executor-1".to_string(),
1092+
partitions: vec![],
1093+
})),
1094+
metrics: vec![],
1095+
}
1096+
}
1097+
1098+
/// Regression test: `update_task_info` must not panic when the task slot
1099+
/// is `None` (task was reset after executor heartbeat timeout).
1100+
#[test]
1101+
fn test_update_task_info_after_reset_does_not_panic() {
1102+
let mut stage = make_running_stage(2);
1103+
1104+
// Both task slots start as None (not yet scheduled).
1105+
// Simulates receiving a status update for a task that was already
1106+
// reset (e.g., executor heartbeat timed out).
1107+
let status = make_task_status(0, 0);
1108+
let result = stage.update_task_info(0, status);
1109+
1110+
// Should return false (update rejected), not panic.
1111+
assert!(!result);
1112+
}
1113+
1114+
/// Verify that a normal update succeeds when the task slot is populated.
1115+
#[test]
1116+
fn test_update_task_info_normal_update_succeeds() {
1117+
let mut stage = make_running_stage(2);
1118+
1119+
// Simulate scheduling the task: populate the task slot.
1120+
stage.task_infos[0] = Some(TaskInfo {
1121+
task_id: 0,
1122+
scheduled_time: 50,
1123+
launch_time: 0,
1124+
start_exec_time: 0,
1125+
end_exec_time: 0,
1126+
finish_time: 0,
1127+
task_status: task_status::Status::Running(RunningTask {
1128+
executor_id: "executor-1".to_string(),
1129+
}),
1130+
});
1131+
1132+
let status = make_task_status(0, 0);
1133+
let result = stage.update_task_info(0, status);
1134+
1135+
assert!(result);
1136+
assert!(matches!(
1137+
stage.task_infos[0].as_ref().unwrap().task_status,
1138+
task_status::Status::Successful(_)
1139+
));
1140+
}
1141+
1142+
/// After reset_tasks sets a slot to None, update_task_info must not panic.
1143+
#[test]
1144+
fn test_update_task_info_after_executor_lost() {
1145+
let mut stage = make_running_stage(2);
1146+
1147+
// Populate tasks as running on executor-1.
1148+
for i in 0..2 {
1149+
stage.task_infos[i] = Some(TaskInfo {
1150+
task_id: i,
1151+
scheduled_time: 50,
1152+
launch_time: 100,
1153+
start_exec_time: 200,
1154+
end_exec_time: 0,
1155+
finish_time: 0,
1156+
task_status: task_status::Status::Running(RunningTask {
1157+
executor_id: "executor-1".to_string(),
1158+
}),
1159+
});
1160+
}
1161+
1162+
// Executor heartbeat times out - tasks are reset.
1163+
let reset_count = stage.reset_tasks("executor-1");
1164+
assert_eq!(reset_count, 2);
1165+
assert!(stage.task_infos[0].is_none());
1166+
assert!(stage.task_infos[1].is_none());
1167+
1168+
// Executor sends a late status update for partition 0.
1169+
let status = make_task_status(0, 0);
1170+
let result = stage.update_task_info(0, status);
1171+
1172+
// Should gracefully reject the update, not panic.
1173+
assert!(!result);
1174+
}
1175+
}

0 commit comments

Comments
 (0)