Skip to content

Commit 201f2ee

Browse files
Add actual task scheduling latency tracking
- Add schedulable_time_millis field to TaskDescription to track when a task became schedulable (when its stage transitioned to running state) - Update all TaskDescription creation sites to pass RunningStage.stage_running_time - Calculate actual scheduling latency in record_task_scheduled calls by computing the difference between current time and schedulable_time_millis - This enables accurate scheduler_task_scheduling_latency_ms metrics instead of the previous placeholder value of 0
1 parent 683e793 commit 201f2ee

4 files changed

Lines changed: 26 additions & 8 deletions

File tree

ballista/scheduler/src/cluster/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ pub(crate) async fn bind_task_bias(
512512
task_attempt: running_stage.task_failure_numbers[partition_id],
513513
plan: running_stage.plan.clone(),
514514
session_config: running_stage.session_config.clone(),
515+
schedulable_time_millis: running_stage.stage_running_time,
515516
};
516517
result.bound_tasks.push((executor_id, task_desc));
517518

@@ -614,6 +615,7 @@ pub(crate) async fn bind_task_round_robin(
614615
task_attempt: running_stage.task_failure_numbers[partition_id],
615616
plan: running_stage.plan.clone(),
616617
session_config: running_stage.session_config.clone(),
618+
schedulable_time_millis: running_stage.stage_running_time,
617619
};
618620
result.bound_tasks.push((executor_id, task_desc));
619621

@@ -762,6 +764,7 @@ pub(crate) async fn bind_task_consistent_hash(
762764
[partition_id],
763765
plan: running_stage.plan.clone(),
764766
session_config: running_stage.session_config.clone(),
767+
schedulable_time_millis: running_stage.stage_running_time,
765768
};
766769
result.bound_tasks.push((executor_id, task_desc));
767770

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,17 +161,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
161161
}
162162

163163
let mut tasks = vec![];
164+
let now_millis = SystemTime::now()
165+
.duration_since(UNIX_EPOCH)
166+
.unwrap_or_default()
167+
.as_millis();
168+
164169
for (_, task) in binding_result.bound_tasks {
165170
let job_id = task.partition.job_id.clone();
166171
let stage_id = task.partition.stage_id;
167172

168-
// Record task scheduling metric
169-
// Note: latency_ms is 0 since we don't currently track when tasks became schedulable.
173+
// Record task scheduling metric with actual latency
174+
let latency_ms = now_millis.saturating_sub(task.schedulable_time_millis);
170175
self.state.metrics_collector.record_task_scheduled(
171176
&job_id,
172177
stage_id,
173178
&executor_id,
174-
0,
179+
latency_ms as u64,
175180
);
176181

177182
match self.state.task_manager.prepare_task_definition(task) {

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1135,7 +1135,8 @@ impl ExecutionGraph {
11351135
task_id,
11361136
task_attempt,
11371137
plan: stage.plan.clone(),
1138-
session_config: self.session_config.clone()
1138+
session_config: self.session_config.clone(),
1139+
schedulable_time_millis: stage.stage_running_time,
11391140
})
11401141
} else {
11411142
Err(BallistaError::General(format!("Stage {stage_id} is not a running stage")))
@@ -1728,6 +1729,9 @@ pub struct TaskDescription {
17281729
pub plan: Arc<dyn ExecutionPlan>,
17291730
/// Session configuration for this task's execution context.
17301731
pub session_config: Arc<SessionConfig>,
1732+
/// Timestamp (millis since epoch) when this task became schedulable.
1733+
/// This is when the stage transitioned to running state.
1734+
pub schedulable_time_millis: u128,
17311735
}
17321736

17331737
impl Debug for TaskDescription {

ballista/scheduler/src/state/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
2323
use std::any::type_name;
2424
use std::collections::HashMap;
2525
use std::sync::Arc;
26-
use std::time::Instant;
26+
use std::time::{Instant, SystemTime, UNIX_EPOCH};
2727

2828
use crate::scheduler_server::event::QueryStageSchedulerEvent;
2929

@@ -302,15 +302,21 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
302302
&self,
303303
bound_tasks: Vec<BoundTask>,
304304
) -> Result<Vec<ExecutorSlot>> {
305+
// Get current time once for all latency calculations
306+
let now_millis = SystemTime::now()
307+
.duration_since(UNIX_EPOCH)
308+
.unwrap_or_default()
309+
.as_millis();
310+
305311
// Record task scheduling metrics for each task
306312
for (executor_id, task) in &bound_tasks {
307-
// Note: latency_ms is 0 since we don't currently track when tasks became schedulable.
308-
// This could be enhanced by adding a schedulable_time field to TaskDescription.
313+
// Calculate scheduling latency: time from when task became schedulable to now
314+
let latency_ms = now_millis.saturating_sub(task.schedulable_time_millis);
309315
self.metrics_collector.record_task_scheduled(
310316
&task.partition.job_id,
311317
task.partition.stage_id,
312318
executor_id,
313-
0, // latency_ms placeholder
319+
latency_ms as u64,
314320
);
315321
}
316322

0 commit comments

Comments
 (0)