Skip to content

Commit 3b46cb9

Browse files
fix: resolve executor timeout due to lock contention in scheduler event loop
The metrics instrumentation PR introduced a hot-path call to total_pending_tasks() after every scheduler event. This method acquires read locks on all execution graphs, which, combined with concurrent poll_work calls (which acquire write locks), caused deadlock-like behavior due to tokio RwLock fairness semantics.

Changes:
- Remove total_pending_tasks() from event loop hot path
- Add periodic background task (5s interval) for pending_tasks metric
- Remove diagnostic logging and unnecessary timeouts
- Clean up unused imports
1 parent 62c381a commit 3b46cb9

5 files changed

Lines changed: 43 additions & 4 deletions

File tree

ballista/scheduler/src/cluster/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ pub(crate) async fn bind_task_bias(
447447
continue;
448448
}
449449
let mut graph = job_info.execution_graph.write().await;
450+
450451
let session_id = graph.session_id().to_string();
451452
let mut black_list = vec![];
452453
while let Some((running_stage, task_id_gen)) =
@@ -548,6 +549,7 @@ pub(crate) async fn bind_task_round_robin(
548549
continue;
549550
}
550551
let mut graph = job_info.execution_graph.write().await;
552+
551553
let session_id = graph.session_id().to_string();
552554
let mut black_list = vec![];
553555
while let Some((running_stage, task_id_gen)) =

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
118118
}];
119119
let available_slots = available_slots.iter_mut().collect();
120120
let running_jobs = self.state.task_manager.get_running_job_cache();
121+
121122
let binding_result = match self.state.config.task_distribution {
122123
TaskDistributionPolicy::Bias => {
123124
bind_task_bias(available_slots, running_jobs, |_| false).await

ballista/scheduler/src/scheduler_server/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
169169
self.state.init().await?;
170170
self.query_stage_event_loop.start()?;
171171
self.expire_dead_executors()?;
172+
self.start_pending_tasks_metrics_loop();
172173

173174
Ok(())
174175
}
@@ -339,6 +340,30 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
339340
Ok(())
340341
}
341342

343+
/// Spawns a background task that periodically updates the pending tasks metric.
344+
///
345+
/// This metric requires iterating over all active jobs and acquiring read locks,
346+
/// which can cause lock contention if done in the main event loop. Running it
347+
/// periodically in a background task provides observability without impacting
348+
/// scheduler performance.
349+
fn start_pending_tasks_metrics_loop(&self) {
350+
let state = self.state.clone();
351+
tokio::task::spawn(async move {
352+
// Update every 5 seconds - frequent enough for observability,
353+
// infrequent enough to avoid lock contention
354+
const UPDATE_INTERVAL: Duration = Duration::from_secs(5);
355+
356+
loop {
357+
let pending_tasks = state.task_manager.total_pending_tasks().await;
358+
state
359+
.metrics_collector
360+
.set_pending_tasks_queue_size(pending_tasks as u64);
361+
362+
tokio::time::sleep(UPDATE_INTERVAL).await;
363+
}
364+
});
365+
}
366+
342367
pub(crate) fn remove_executor(
343368
executor_manager: ExecutorManager,
344369
event_sender: EventSender<QueryStageSchedulerEvent>,

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,14 +324,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
324324
}
325325
}
326326

327-
// Update queue size metrics after processing each event
327+
// Update pending jobs queue size metric (this is a cheap O(1) operation)
328328
let pending_jobs = self.state.task_manager.pending_job_number();
329329
self.metrics_collector
330330
.set_pending_jobs_queue_size(pending_jobs as u64);
331331

332-
let pending_tasks = self.state.task_manager.total_pending_tasks().await;
333-
self.metrics_collector
334-
.set_pending_tasks_queue_size(pending_tasks as u64);
332+
// NOTE: We intentionally do NOT call total_pending_tasks() here.
333+
// That method iterates over all active jobs and acquires read locks on each
334+
// execution graph, which causes lock contention with concurrent poll_work
335+
// calls that acquire write locks during task binding. This was causing
336+
// deadlock-like stalls and executor timeouts under load.
337+
//
338+
// The pending_jobs metric above provides sufficient visibility into queue depth here.
339+
// Detailed pending task counts are collected periodically by the background task
340+
// spawned in SchedulerServer::start_pending_tasks_metrics_loop, not on every event.
335341

336342
Ok(())
337343
}

ballista/scheduler/src/state/task_manager.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
266266
///
267267
/// A pending task is a task that is available to schedule on an executor
268268
/// but cannot be scheduled because no resources are available.
269+
///
270+
/// NOTE: This method iterates over all active jobs and acquires read locks
271+
/// on each execution graph. It should NOT be called frequently (e.g., in a
272+
/// hot loop or after every event) as it can cause lock contention with
273+
/// concurrent task binding operations.
269274
pub async fn total_pending_tasks(&self) -> usize {
270275
let mut total = 0;
271276
for entry in self.active_job_cache.iter() {

0 commit comments

Comments
 (0)