Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ballista/scheduler/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ pub(crate) async fn bind_task_bias(
continue;
}
let mut graph = job_info.execution_graph.write().await;

let session_id = graph.session_id().to_string();
let mut black_list = vec![];
while let Some((running_stage, task_id_gen)) =
Expand Down Expand Up @@ -548,6 +549,7 @@ pub(crate) async fn bind_task_round_robin(
continue;
}
let mut graph = job_info.execution_graph.write().await;

let session_id = graph.session_id().to_string();
let mut black_list = vec![];
while let Some((running_stage, task_id_gen)) =
Expand Down
1 change: 1 addition & 0 deletions ballista/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
}];
let available_slots = available_slots.iter_mut().collect();
let running_jobs = self.state.task_manager.get_running_job_cache();

let binding_result = match self.state.config.task_distribution {
TaskDistributionPolicy::Bias => {
bind_task_bias(available_slots, running_jobs, |_| false).await
Expand Down
25 changes: 25 additions & 0 deletions ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
self.state.init().await?;
self.query_stage_event_loop.start()?;
self.expire_dead_executors()?;
self.start_pending_tasks_metrics_loop();

Ok(())
}
Expand Down Expand Up @@ -339,6 +340,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
Ok(())
}

/// Spawns a background task that periodically updates the pending tasks metric.
///
/// This metric requires iterating over all active jobs and acquiring read locks,
/// which can cause lock contention if done in the main event loop. Running it
/// periodically in a background task provides observability without impacting
/// scheduler performance.
fn start_pending_tasks_metrics_loop(&self) {
let state = self.state.clone();
tokio::task::spawn(async move {
// Update every 5 seconds - frequent enough for observability,
// infrequent enough to avoid lock contention
const UPDATE_INTERVAL: Duration = Duration::from_secs(5);

loop {
let pending_tasks = state.task_manager.total_pending_tasks().await;
state
.metrics_collector
.set_pending_tasks_queue_size(pending_tasks as u64);

tokio::time::sleep(UPDATE_INTERVAL).await;
}
});
}

pub(crate) fn remove_executor(
executor_manager: ExecutorManager,
event_sender: EventSender<QueryStageSchedulerEvent>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,15 +336,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}

// Update queue size metrics after processing each event
// Update pending jobs queue size metric (this is a cheap O(1) operation)
let pending_jobs = self.state.task_manager.pending_job_number();
self.metrics_collector
.set_pending_jobs_queue_size(pending_jobs as u64);

let pending_tasks = self.state.task_manager.total_pending_tasks().await;
self.metrics_collector
.set_pending_tasks_queue_size(pending_tasks as u64);

Ok(())
}

Expand Down
27 changes: 25 additions & 2 deletions ballista/scheduler/src/state/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,34 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
///
/// A pending task is a task that is available to schedule on an executor
/// but cannot be scheduled because no resources are available.
///
/// NOTE: This method iterates over all active jobs and acquires read locks
/// on each execution graph. It should NOT be called frequently (e.g., in a
/// hot loop or after every event) as it can cause lock contention with
/// concurrent task binding operations.
pub async fn total_pending_tasks(&self) -> usize {
let mut total = 0;
for entry in self.active_job_cache.iter() {
let graph = entry.value().execution_graph.read().await;
total += graph.available_tasks();
// Use a timeout to avoid blocking indefinitely if there's lock contention.
// If we can't acquire the lock within the timeout, skip this job's count
// rather than blocking the metrics collection.
match tokio::time::timeout(
Duration::from_millis(100),
entry.value().execution_graph.read(),
)
.await
{
Ok(graph) => {
total += graph.available_tasks();
}
Err(_) => {
// Lock acquisition timed out, skip this job
trace!(
"Skipping pending task count for job {} due to lock contention",
entry.key()
);
}
}
}
total
}
Expand Down