Skip to content

Commit c45c17b

Browse files
fix: resolve executor timeout due to lock contention in scheduler event loop
The metrics instrumentation PR introduced a hot-path call to total_pending_tasks() after every scheduler event. This method acquires read locks on all execution graphs, which combined with concurrent poll_work calls (acquiring write locks), caused deadlock-like behavior due to tokio RwLock fairness semantics. Changes: - Remove total_pending_tasks() from event loop hot path - Add periodic background task (5s interval) for pending_tasks metric - Add 100ms timeout to lock acquisition in total_pending_tasks() as safety net - Remove diagnostic logging and unnecessary timeouts - Clean up unused imports
1 parent d827769 commit c45c17b

5 files changed

Lines changed: 54 additions & 6 deletions

File tree

ballista/scheduler/src/cluster/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ pub(crate) async fn bind_task_bias(
447447
continue;
448448
}
449449
let mut graph = job_info.execution_graph.write().await;
450+
450451
let session_id = graph.session_id().to_string();
451452
let mut black_list = vec![];
452453
while let Some((running_stage, task_id_gen)) =
@@ -548,6 +549,7 @@ pub(crate) async fn bind_task_round_robin(
548549
continue;
549550
}
550551
let mut graph = job_info.execution_graph.write().await;
552+
551553
let session_id = graph.session_id().to_string();
552554
let mut black_list = vec![];
553555
while let Some((running_stage, task_id_gen)) =

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
118118
}];
119119
let available_slots = available_slots.iter_mut().collect();
120120
let running_jobs = self.state.task_manager.get_running_job_cache();
121+
121122
let binding_result = match self.state.config.task_distribution {
122123
TaskDistributionPolicy::Bias => {
123124
bind_task_bias(available_slots, running_jobs, |_| false).await

ballista/scheduler/src/scheduler_server/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
169169
self.state.init().await?;
170170
self.query_stage_event_loop.start()?;
171171
self.expire_dead_executors()?;
172+
self.start_pending_tasks_metrics_loop();
172173

173174
Ok(())
174175
}
@@ -339,6 +340,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
339340
Ok(())
340341
}
341342

343+
/// Spawns a background task that periodically updates the pending tasks metric.
344+
///
345+
/// This metric requires iterating over all active jobs and acquiring read locks,
346+
/// which can cause lock contention if done in the main event loop. Running it
347+
/// periodically in a background task provides observability without impacting
348+
/// scheduler performance.
349+
fn start_pending_tasks_metrics_loop(&self) {
350+
let state = self.state.clone();
351+
tokio::task::spawn(async move {
352+
// Update every 5 seconds - frequent enough for observability,
353+
// infrequent enough to avoid lock contention
354+
const UPDATE_INTERVAL: Duration = Duration::from_secs(5);
355+
356+
loop {
357+
let pending_tasks = state.task_manager.total_pending_tasks().await;
358+
state
359+
.metrics_collector
360+
.set_pending_tasks_queue_size(pending_tasks as u64);
361+
362+
tokio::time::sleep(UPDATE_INTERVAL).await;
363+
}
364+
});
365+
}
366+
342367
pub(crate) fn remove_executor(
343368
executor_manager: ExecutorManager,
344369
event_sender: EventSender<QueryStageSchedulerEvent>,

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,14 +336,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
336336
}
337337
}
338338

339-
// Update queue size metrics after processing each event
339+
// Update pending jobs queue size metric (this is a cheap O(1) operation)
340340
let pending_jobs = self.state.task_manager.pending_job_number();
341341
self.metrics_collector
342342
.set_pending_jobs_queue_size(pending_jobs as u64);
343343

344-
let pending_tasks = self.state.task_manager.total_pending_tasks().await;
345-
self.metrics_collector
346-
.set_pending_tasks_queue_size(pending_tasks as u64);
347344

348345
Ok(())
349346
}

ballista/scheduler/src/state/task_manager.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,34 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
266266
///
267267
/// A pending task is a task that is available to schedule on an executor
268268
/// but cannot be scheduled because no resources are available.
269+
///
270+
/// NOTE: This method iterates over all active jobs and acquires read locks
271+
/// on each execution graph. It should NOT be called frequently (e.g., in a
272+
/// hot loop or after every event) as it can cause lock contention with
273+
/// concurrent task binding operations.
269274
pub async fn total_pending_tasks(&self) -> usize {
270275
let mut total = 0;
271276
for entry in self.active_job_cache.iter() {
272-
let graph = entry.value().execution_graph.read().await;
273-
total += graph.available_tasks();
277+
// Use a timeout to avoid blocking indefinitely if there's lock contention.
278+
// If we can't acquire the lock within the timeout, skip this job's count
279+
// rather than blocking the metrics collection.
280+
match tokio::time::timeout(
281+
Duration::from_millis(100),
282+
entry.value().execution_graph.read(),
283+
)
284+
.await
285+
{
286+
Ok(graph) => {
287+
total += graph.available_tasks();
288+
}
289+
Err(_) => {
290+
// Lock acquisition timed out, skip this job
291+
trace!(
292+
"Skipping pending task count for job {} due to lock contention",
293+
entry.key()
294+
);
295+
}
296+
}
274297
}
275298
total
276299
}

0 commit comments

Comments
 (0)