Skip to content

Commit ce4cc6d

Browse files
Phase 2: expose ExecutionGraph for task_history; persist executor_id on terminal TaskInfo
1 parent 8bc4d75 commit ce4cc6d

4 files changed

Lines changed: 21 additions & 2 deletions

File tree

ballista/scheduler/src/state/aqe/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,6 +1267,7 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
12671267
let task_attempt = stage.task_failure_numbers[partition_id];
12681268
let task_info = crate::state::execution_graph::TaskInfo {
12691269
task_id,
1270+
executor_id: executor_id.to_owned(),
12701271
scheduled_time: SystemTime::now()
12711272
.duration_since(UNIX_EPOCH)
12721273
.unwrap()

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1658,6 +1658,7 @@ impl ExecutionGraph for StaticExecutionGraph {
16581658
let task_attempt = stage.task_failure_numbers[partition_id];
16591659
let task_info = TaskInfo {
16601660
task_id,
1661+
executor_id: executor_id.to_owned(),
16611662
scheduled_time: SystemTime::now()
16621663
.duration_since(UNIX_EPOCH)
16631664
.unwrap()
@@ -1742,6 +1743,7 @@ impl Debug for StaticExecutionGraph {
17421743
pub fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
17431744
TaskInfo {
17441745
task_id,
1746+
executor_id: executor_id.clone(),
17451747
scheduled_time: SystemTime::now()
17461748
.duration_since(UNIX_EPOCH)
17471749
.unwrap()

ballista/scheduler/src/state/execution_stage.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,14 @@ pub struct FailedStage {
247247
pub struct TaskInfo {
248248
/// Unique task identifier within the execution graph.
249249
pub task_id: usize,
250+
/// ID of the executor that ran (or is running) this task.
251+
///
252+
/// Carried at the top level so it survives terminal status transitions
253+
/// to `Failed` — `FailedTask` does not embed an `executor_id`, so without
254+
/// this field the scheduler would lose the executor mapping for any
255+
/// failed partition. Populated at task launch and preserved through
256+
/// `update_task_info`.
257+
pub executor_id: String,
250258
/// Timestamp when the task was scheduled (in milliseconds since epoch).
251259
pub scheduled_time: u128,
252260
/// Timestamp when the task was launched on an executor (in milliseconds since epoch).
@@ -661,9 +669,11 @@ impl RunningStage {
661669
return false;
662670
}
663671
let scheduled_time = task_info.scheduled_time;
672+
let executor_id = task_info.executor_id.clone();
664673
let task_status = status.status.unwrap();
665674
let updated_task_info = TaskInfo {
666675
task_id,
676+
executor_id,
667677
scheduled_time,
668678
launch_time: status.launch_time as u128,
669679
start_exec_time: status.start_exec_time as u128,
@@ -898,6 +908,7 @@ impl SuccessfulStage {
898908
} if *executor == *executor_id => {
899909
*task = TaskInfo {
900910
task_id: *task_id,
911+
executor_id: executor_id.clone(),
901912
scheduled_time: *scheduled_time,
902913
launch_time: 0,
903914
start_exec_time: 0,
@@ -1119,6 +1130,7 @@ mod tests {
11191130
// Simulate scheduling the task: populate the task slot.
11201131
stage.task_infos[0] = Some(TaskInfo {
11211132
task_id: 0,
1133+
executor_id: "executor-1".to_string(),
11221134
scheduled_time: 50,
11231135
launch_time: 0,
11241136
start_exec_time: 0,
@@ -1148,6 +1160,7 @@ mod tests {
11481160
for i in 0..2 {
11491161
stage.task_infos[i] = Some(TaskInfo {
11501162
task_id: i,
1163+
executor_id: "executor-1".to_string(),
11511164
scheduled_time: 50,
11521165
launch_time: 100,
11531166
start_exec_time: 200,

ballista/scheduler/src/state/task_manager.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,8 +408,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
408408

409409
/// Get the execution graph of a job. First look in the active cache.
410410
/// If none is found, then look in the Active/Completed jobs.
411-
#[cfg(feature = "rest-api")]
412-
pub(crate) async fn get_job_execution_graph(
411+
///
412+
/// Exposed as `pub` so embedded callers (e.g., Spice's distributed
413+
/// task_history writer) can walk per-stage and per-task state directly
414+
/// without going through a gRPC method.
415+
pub async fn get_job_execution_graph(
413416
&self,
414417
job_id: &str,
415418
) -> Result<Option<ExecutionGraphBox>> {

0 commit comments

Comments
 (0)