Skip to content

Commit cd18612

Browse files
Add remaining scheduler metrics instrumentation (stage_started, task_scheduled, queue sizes)
1 parent 98f7964 commit cd18612

5 files changed

Lines changed: 86 additions & 14 deletions

File tree

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
140140
let mut tasks = vec![];
141141
for (_, task) in schedulable_tasks {
142142
let job_id = task.partition.job_id.clone();
143+
let stage_id = task.partition.stage_id;
144+
145+
// Record task scheduling metric
146+
// Note: latency_ms is 0 since we don't currently track when tasks became schedulable.
147+
self.state.metrics_collector.record_task_scheduled(
148+
&job_id,
149+
stage_id,
150+
&executor_id,
151+
0,
152+
);
153+
143154
match self.state.task_manager.prepare_task_definition(task) {
144155
Ok(task_definition) => tasks.push(task_definition),
145156
Err(e) => {

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
323323
);
324324
}
325325
}
326+
327+
// Update queue size metrics after processing each event
328+
let pending_jobs = self.state.task_manager.pending_job_number();
329+
#[expect(clippy::cast_possible_truncation)]
330+
self.metrics_collector
331+
.set_pending_jobs_queue_size(pending_jobs as u64);
332+
333+
let pending_tasks = self.state.task_manager.total_pending_tasks().await;
334+
#[expect(clippy::cast_possible_truncation)]
335+
self.metrics_collector
336+
.set_pending_tasks_queue_size(pending_tasks as u64);
337+
326338
Ok(())
327339
}
328340

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,15 @@ impl ExecutionGraph {
305305
/// Revive the execution graph by converting the resolved stages to running stages
306306
/// If any stages are converted, return true; else false.
307307
pub fn revive(&mut self) -> bool {
308+
self.revive_with_metrics().0
309+
}
310+
311+
/// Revive the execution graph by converting the resolved stages to running stages.
312+
///
313+
/// Returns a tuple of (stages_converted, stages_started_info) where:
314+
/// - `stages_converted`: true if any stages were converted
315+
/// - `stages_started_info`: list of (stage_id, task_count, started_at_ms) for each started stage
316+
pub fn revive_with_metrics(&mut self) -> (bool, Vec<(usize, usize, u64)>) {
308317
let running_stages = self
309318
.stages
310319
.values()
@@ -318,15 +327,23 @@ impl ExecutionGraph {
318327
.collect::<Vec<_>>();
319328

320329
if running_stages.is_empty() {
321-
false
330+
(false, vec![])
322331
} else {
332+
let mut stages_started = Vec::with_capacity(running_stages.len());
323333
for running_stage in running_stages {
334+
let stage_id = running_stage.stage_id;
335+
let task_count = running_stage.partitions;
336+
#[expect(clippy::cast_possible_truncation)]
337+
let started_at_ms = running_stage.stage_running_time as u64;
338+
339+
stages_started.push((stage_id, task_count, started_at_ms));
340+
324341
self.stages.insert(
325342
running_stage.stage_id,
326343
ExecutionStage::Running(running_stage),
327344
);
328345
}
329-
true
346+
(true, stages_started)
330347
}
331348
}
332349

@@ -354,7 +371,15 @@ impl ExecutionGraph {
354371

355372
// Revive before updating due to some updates not saved
356373
// It will be refined later
357-
self.revive();
374+
let (_, stages_started) = self.revive_with_metrics();
375+
for (stage_id, task_count, started_at_ms) in stages_started {
376+
metrics_info.stages_started.push((
377+
job_id.clone(),
378+
stage_id,
379+
task_count,
380+
started_at_ms,
381+
));
382+
}
358383

359384
let current_running_stages: HashSet<usize> =
360385
HashSet::from_iter(self.running_stages());
@@ -775,17 +800,16 @@ impl ExecutionGraph {
775800
}
776801
}
777802

778-
let (events, mut stage_metrics) =
779-
self.processing_stages_update(UpdatedStages {
780-
resolved_stages,
781-
successful_stages,
782-
failed_stages,
783-
rollback_running_stages,
784-
resubmit_successful_stages: resubmit_successful_stages
785-
.keys()
786-
.cloned()
787-
.collect(),
788-
})?;
803+
let (events, stage_metrics) = self.processing_stages_update(UpdatedStages {
804+
resolved_stages,
805+
successful_stages,
806+
failed_stages,
807+
rollback_running_stages,
808+
resubmit_successful_stages: resubmit_successful_stages
809+
.keys()
810+
.cloned()
811+
.collect(),
812+
})?;
789813

790814
// Combine task metrics collected during processing with stage metrics
791815
metrics_info

ballista/scheduler/src/state/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,18 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
284284
&self,
285285
bound_tasks: Vec<BoundTask>,
286286
) -> Result<Vec<ExecutorSlot>> {
287+
// Record task scheduling metrics for each task
288+
for (executor_id, task) in &bound_tasks {
289+
// Note: latency_ms is 0 since we don't currently track when tasks became schedulable.
290+
// This could be enhanced by adding a schedulable_time field to TaskDescription.
291+
self.metrics_collector.record_task_scheduled(
292+
&task.partition.job_id,
293+
task.partition.stage_id,
294+
executor_id,
295+
0, // latency_ms placeholder
296+
);
297+
}
298+
287299
// Put tasks to the same executor together
288300
// And put tasks belonging to the same stage together for creating MultiTaskDefinition
289301
let mut executor_stage_assignments: HashMap<

ballista/scheduler/src/state/task_manager.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,19 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
263263
self.active_job_cache.len()
264264
}
265265

266+
/// Get the total number of pending tasks across all active jobs.
267+
///
268+
/// A pending task is a task that is available to schedule on an executor
269+
/// but cannot be scheduled because no resources are available.
270+
pub async fn total_pending_tasks(&self) -> usize {
271+
let mut total = 0;
272+
for entry in self.active_job_cache.iter() {
273+
let graph = entry.value().execution_graph.read().await;
274+
total += graph.available_tasks();
275+
}
276+
total
277+
}
278+
266279
/// Generate an ExecutionGraph for the job and save it to the persistent state.
267280
/// By default, this job will be curated by the scheduler which receives it.
268281
/// Then we will also save it to the active execution graph

0 commit comments

Comments
 (0)