Skip to content

Commit 6b96f6a

Browse files
feat(scheduler): instrumentation to locate the wedge in spiceai/spiceai#10832
Adds opt-in diagnostic instrumentation aimed at identifying which call path is wedging the QueryStageScheduler event loop in production. Pure logging; no behavior change in the happy path. 1. ballista/core/src/lock_tracing.rs Helpers traced_write / traced_read wrap tokio::sync::RwLock acquisitions to log: - acquire_ms if >= 100ms (real contention) - hold_ms if >= 500ms (long critical sections) Each call site provides a static label so a slow-acquire or slow-hold message in the log points directly at the offending function. 2. Lock instrumentation applied at every execution_graph read/write across task_manager.rs (update_job, update_task_statuses helpers, abort_job, succeed_job, get_job_status, get_job_execution_graph, get_available_task_count, executor_lost, the metrics read in total_pending_tasks) and cluster/mod.rs (bind_task_bias, bind_task_round_robin, bind_task_consistent_hash). 3. ballista/core/src/event_loop.rs EventLoop::run now stopwatches each on_receive call and warns if a handler runs >= 1s. The event label is logged via Debug so a wedged handler is identified by event variant. EventSender::post_event stopwatches the send().await and warns if it parks >= 100ms (mpsc channel full and consumer parked). Requires E: Debug on EventLoop::run and ::start; the only production event type (QueryStageSchedulerEvent) already hand-rolls Debug. 4. ballista/scheduler/src/scheduler_server/mod.rs Adds start_diagnostic_dump_loop that, every 30s, calls a new TaskManager::diagnostic_snapshot which try_reads each active job graph and logs one line per stage: diagnostic_dump: job_id=X stage_id=Y variant=Running \ partitions=96 assigned=64 unassigned=32 job_status=Running(...) The unassigned count is the single most diagnostic number for #10832: in production we see partitions stuck bound to no executor while the cluster has live slots. A graph that fails try_read (write lock held) is logged as graph_locked=true. The four signals together let a single production run identify: - whether the wedge is in an event handler (slow on_receive) - or in the event channel (slow post_event) - or in a specific lock acquire / hold (slow rwlock_*) - and the stage-level shape of the wedged graph (diagnostic_dump) Verified: cargo build, cargo test -p ballista-core -p ballista-scheduler (79 passed, 1 ignored), cargo clippy -D warnings, cargo fmt --check all clean.
1 parent 8afc1b7 commit 6b96f6a

6 files changed

Lines changed: 316 additions & 19 deletions

File tree

ballista/core/src/event_loop.rs

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,27 @@
1717

1818
//! Event loop infrastructure for asynchronous message processing.
1919
20+
use std::fmt::Debug;
2021
use std::sync::Arc;
2122
use std::sync::atomic::{AtomicBool, Ordering};
23+
use std::time::{Duration, Instant};
2224

2325
use async_trait::async_trait;
24-
use log::{error, info};
26+
use log::{error, info, warn};
2527
use tokio::sync::mpsc;
2628

2729
use crate::error::{BallistaError, Result};
2830

31+
/// Hold-time threshold above which `on_receive` durations are logged at warn.
32+
/// Background tasks (queued events) are expected to be sub-second; anything
33+
/// over a second is diagnostic.
34+
const SLOW_ON_RECEIVE_THRESHOLD: Duration = Duration::from_secs(1);
35+
36+
/// Threshold above which `EventSender::post_event` await durations are
37+
/// logged at warn. Indicates the underlying mpsc channel is full and the
38+
/// sender is parked waiting on the consumer.
39+
const SLOW_POST_EVENT_THRESHOLD: Duration = Duration::from_millis(100);
40+
2941
/// Trait defining actions to be performed in response to events in an event loop.
3042
#[async_trait]
3143
pub trait EventAction<E>: Send + Sync {
@@ -75,7 +87,10 @@ impl<E: Send + 'static> EventLoop<E> {
7587
}
7688
}
7789

78-
fn run(&self, mut rx_event: mpsc::Receiver<E>) {
90+
fn run(&self, mut rx_event: mpsc::Receiver<E>)
91+
where
92+
E: Debug,
93+
{
7994
assert!(
8095
self.tx_event.is_some(),
8196
"The event sender should be initialized first!"
@@ -88,7 +103,21 @@ impl<E: Send + 'static> EventLoop<E> {
88103
info!("Starting the event loop {name}");
89104
while !stopped.load(Ordering::SeqCst) {
90105
if let Some(event) = rx_event.recv().await {
91-
if let Err(e) = action.on_receive(event, &tx_event, &rx_event).await {
106+
// Diagnostic: time every on_receive call so a wedged
107+
// handler shows up as an event that never logs its
108+
// matching completion. See spiceai/spiceai#10832.
109+
let started = Instant::now();
110+
let event_label = format!("{event:?}");
111+
let result = action.on_receive(event, &tx_event, &rx_event).await;
112+
let elapsed = started.elapsed();
113+
if elapsed >= SLOW_ON_RECEIVE_THRESHOLD {
114+
warn!(
115+
"slow on_receive in event_loop={name}: \
116+
event={event_label:.200} elapsed_ms={}",
117+
elapsed.as_millis()
118+
);
119+
}
120+
if let Err(e) = result {
92121
error!("Fail to process event due to {e}");
93122
action.on_error(e);
94123
}
@@ -102,7 +131,10 @@ impl<E: Send + 'static> EventLoop<E> {
102131
}
103132

104133
/// Starts the event loop, spawning a background task to process events.
105-
pub fn start(&mut self) -> Result<()> {
134+
pub fn start(&mut self) -> Result<()>
135+
where
136+
E: Debug,
137+
{
106138
if self.stopped.load(Ordering::SeqCst) {
107139
return Err(BallistaError::General(format!(
108140
"{} has already been stopped",
@@ -151,9 +183,20 @@ impl<E> EventSender<E> {
151183

152184
/// Posts an event to the event loop asynchronously.
153185
pub async fn post_event(&self, event: E) -> Result<()> {
154-
self.tx_event
155-
.send(event)
156-
.await
186+
// Diagnostic: a slow `send().await` here means the underlying
187+
// mpsc channel is full and the loop's consumer is parked. See
188+
// spiceai/spiceai#10832.
189+
let started = Instant::now();
190+
let result = self.tx_event.send(event).await;
191+
let elapsed = started.elapsed();
192+
if elapsed >= SLOW_POST_EVENT_THRESHOLD {
193+
warn!(
194+
"slow post_event: elapsed_ms={} channel_capacity={}",
195+
elapsed.as_millis(),
196+
self.tx_event.capacity(),
197+
);
198+
}
199+
result
157200
.map_err(|e| BallistaError::General(format!("Fail to send event due to {e}")))
158201
}
159202
}

ballista/core/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ pub mod event_loop;
4747
pub mod execution_plans;
4848
/// Extension traits and utilities for DataFusion integration.
4949
pub mod extension;
50+
/// Diagnostic timing wrappers for `tokio::sync::RwLock`. See
51+
/// [`lock_tracing`] for the motivating issue.
52+
pub mod lock_tracing;
5053
/// Object store configuration and utilities for distributed file access.
5154
pub mod object_store;
5255
/// Query planning utilities for distributed execution.

ballista/core/src/lock_tracing.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Diagnostic instrumentation for tokio `RwLock` hot paths in the scheduler.
19+
//!
20+
//! Wraps `RwLock::write().await` and `RwLock::read().await` with timing
21+
//! around acquire (time-to-acquire) and hold (time-from-acquire-to-drop).
22+
//! Both thresholds surface as `warn!` logs so a stuck or slow path is
23+
//! visible without overwhelming the log in healthy operation.
24+
//!
25+
//! Added while investigating spiceai/spiceai#10832 — the scheduler's
26+
//! `QueryStageScheduler` event loop wedges mid-query, and three call paths
27+
//! share the per-job `execution_graph` write lock. The pattern matches a
28+
//! leaked or never-released `RwLockWriteGuard`. Instrumenting these
29+
//! acquisitions identifies (a) which call site holds the lock, (b) for how
30+
//! long, and (c) whether the contention is at acquire-time or hold-time.
31+
32+
use std::ops::{Deref, DerefMut};
33+
use std::time::{Duration, Instant};
34+
35+
use log::warn;
36+
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
37+
38+
/// Threshold for slow acquisitions. Healthy paths acquire in microseconds;
39+
/// over 100ms means real contention.
40+
const SLOW_ACQUIRE_THRESHOLD: Duration = Duration::from_millis(100);
41+
42+
/// Threshold for slow holds. A graph write lock held for over 500ms is
43+
/// suspicious — the critical sections under it are in-memory bookkeeping
44+
/// plus DashMap operations.
45+
const SLOW_HOLD_THRESHOLD: Duration = Duration::from_millis(500);
46+
47+
/// Acquires a write lock on `lock`, logging if either acquisition or
48+
/// release takes longer than the slow thresholds.
49+
///
50+
/// `label` should identify the call site, e.g. `"task_manager::update_job"`.
51+
pub async fn traced_write<'a, T>(
52+
lock: &'a RwLock<T>,
53+
label: &'static str,
54+
) -> TracedWriteGuard<'a, T> {
55+
let acquire_start = Instant::now();
56+
let guard = lock.write().await;
57+
let acquire = acquire_start.elapsed();
58+
if acquire >= SLOW_ACQUIRE_THRESHOLD {
59+
warn!(
60+
"slow rwlock_write acquire: label={label} acquire_ms={}",
61+
acquire.as_millis()
62+
);
63+
}
64+
TracedWriteGuard {
65+
guard,
66+
held_since: Instant::now(),
67+
label,
68+
}
69+
}
70+
71+
/// Acquires a read lock on `lock`, logging if either acquisition or release
72+
/// takes longer than the slow thresholds.
73+
pub async fn traced_read<'a, T>(
74+
lock: &'a RwLock<T>,
75+
label: &'static str,
76+
) -> TracedReadGuard<'a, T> {
77+
let acquire_start = Instant::now();
78+
let guard = lock.read().await;
79+
let acquire = acquire_start.elapsed();
80+
if acquire >= SLOW_ACQUIRE_THRESHOLD {
81+
warn!(
82+
"slow rwlock_read acquire: label={label} acquire_ms={}",
83+
acquire.as_millis()
84+
);
85+
}
86+
TracedReadGuard {
87+
guard,
88+
held_since: Instant::now(),
89+
label,
90+
}
91+
}
92+
93+
/// Write guard that logs at warn level if the lock is held longer than
94+
/// `SLOW_HOLD_THRESHOLD`.
95+
pub struct TracedWriteGuard<'a, T> {
96+
guard: RwLockWriteGuard<'a, T>,
97+
held_since: Instant,
98+
label: &'static str,
99+
}
100+
101+
impl<T> Deref for TracedWriteGuard<'_, T> {
102+
type Target = T;
103+
fn deref(&self) -> &T {
104+
&self.guard
105+
}
106+
}
107+
108+
impl<T> DerefMut for TracedWriteGuard<'_, T> {
109+
fn deref_mut(&mut self) -> &mut T {
110+
&mut self.guard
111+
}
112+
}
113+
114+
impl<T> Drop for TracedWriteGuard<'_, T> {
115+
fn drop(&mut self) {
116+
let held = self.held_since.elapsed();
117+
if held >= SLOW_HOLD_THRESHOLD {
118+
warn!(
119+
"slow rwlock_write hold: label={} hold_ms={}",
120+
self.label,
121+
held.as_millis()
122+
);
123+
}
124+
}
125+
}
126+
127+
/// Read guard that logs at warn level if the lock is held longer than
128+
/// `SLOW_HOLD_THRESHOLD`.
129+
pub struct TracedReadGuard<'a, T> {
130+
guard: RwLockReadGuard<'a, T>,
131+
held_since: Instant,
132+
label: &'static str,
133+
}
134+
135+
impl<T> Deref for TracedReadGuard<'_, T> {
136+
type Target = T;
137+
fn deref(&self) -> &T {
138+
&self.guard
139+
}
140+
}
141+
142+
impl<T> Drop for TracedReadGuard<'_, T> {
143+
fn drop(&mut self) {
144+
let held = self.held_since.elapsed();
145+
if held >= SLOW_HOLD_THRESHOLD {
146+
warn!(
147+
"slow rwlock_read hold: label={} hold_ms={}",
148+
self.label,
149+
held.as_millis()
150+
);
151+
}
152+
}
153+
}

ballista/scheduler/src/cluster/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use log::debug;
3232

3333
use ballista_core::consistent_hash::ConsistentHash;
3434
use ballista_core::error::Result;
35+
use ballista_core::lock_tracing::traced_write;
3536
use ballista_core::serde::protobuf::{
3637
AvailableTaskSlots, ExecutorHeartbeat, JobStatus, job_status,
3738
};
@@ -456,7 +457,7 @@ pub(crate) async fn bind_task_bias(
456457
debug!("Job {job_id} is not in running status and will be skipped");
457458
continue;
458459
}
459-
let mut graph = job_info.execution_graph.write().await;
460+
let mut graph = traced_write(&job_info.execution_graph, "bind_task_bias").await;
460461

461462
let session_id = graph.session_id().to_string();
462463
let mut black_list = vec![];
@@ -558,7 +559,8 @@ pub(crate) async fn bind_task_round_robin(
558559
debug!("Job {job_id} is not in running status and will be skipped");
559560
continue;
560561
}
561-
let mut graph = job_info.execution_graph.write().await;
562+
let mut graph =
563+
traced_write(&job_info.execution_graph, "bind_task_round_robin").await;
562564

563565
let session_id = graph.session_id().to_string();
564566
let mut black_list = vec![];
@@ -714,7 +716,8 @@ pub(crate) async fn bind_task_consistent_hash(
714716
debug!("Job {job_id} is not in running status and will be skipped");
715717
continue;
716718
}
717-
let mut graph = job_info.execution_graph.write().await;
719+
let mut graph =
720+
traced_write(&job_info.execution_graph, "bind_task_consistent_hash").await;
718721
let session_id = graph.session_id().to_string();
719722
let mut black_list = vec![];
720723
while let Some((running_stage, task_id_gen)) =

ballista/scheduler/src/scheduler_server/mod.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::cluster::BallistaCluster;
3535
use crate::config::SchedulerConfig;
3636
use crate::metrics::SchedulerMetricsCollector;
3737
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
38-
use log::{debug, error, warn};
38+
use log::{debug, error, info, warn};
3939

4040
use crate::scheduler_server::event::QueryStageSchedulerEvent;
4141
use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
@@ -191,6 +191,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
191191
self.query_stage_event_loop.start()?;
192192
self.expire_dead_executors()?;
193193
self.start_pending_tasks_metrics_loop();
194+
self.start_diagnostic_dump_loop();
194195

195196
Ok(())
196197
}
@@ -393,6 +394,33 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
393394
Ok(())
394395
}
395396

397+
/// Spawns a background task that periodically dumps a snapshot of every
398+
/// active job's per-stage state, plus alive executor count.
399+
///
400+
/// Added for spiceai/spiceai#10832: the production wedge appears as a
401+
/// stage with unassigned partitions that never decrease (32 of 96
402+
/// partitions stuck bound to no executor while the cluster has live
403+
/// slots). This dump prints `unassigned` per Running stage every 30s
404+
/// so the wedge becomes visible in scheduler logs without requiring
405+
/// tokio-console or external tooling.
406+
///
407+
/// Uses `try_read` on each graph so a held write lock is itself
408+
/// diagnostic — the loop logs `graph_locked=true` rather than parking.
409+
fn start_diagnostic_dump_loop(&self) {
410+
let state = self.state.clone();
411+
tokio::task::spawn(async move {
412+
const INTERVAL: Duration = Duration::from_secs(30);
413+
// Sleep first to avoid an empty-state dump at startup.
414+
loop {
415+
tokio::time::sleep(INTERVAL).await;
416+
state.task_manager.diagnostic_snapshot();
417+
let alive = state.executor_manager.get_alive_executors().len();
418+
let running = state.task_manager.running_job_number();
419+
info!("diagnostic_dump: alive_executors={alive} running_jobs={running}");
420+
}
421+
});
422+
}
423+
396424
/// Spawns a background task that periodically updates the pending tasks metric.
397425
///
398426
/// This metric requires iterating over all active jobs and acquiring read locks,

0 commit comments

Comments
 (0)