Skip to content

Commit 7e9872a

Browse files
feat(scheduler): detect and report stuck queries (#39)
Adds an operator-facing warning when a distributed query stops making progress, with enough detail in-line to diagnose without rerunning with debug logging. Targets spiceai/spiceai#10832 where the production wedge is rare and high-impact: by the time it is noticed the cluster needs restarting, so any debug-level telemetry would no longer be in scope. Single diagnostic surface, single info-level signal: 1. A 30s background loop on the scheduler samples per-stage progress for every active job. Capture uses a 500ms try_read budget per graph so a held write lock is itself recorded as a separate state ("Locked") rather than blocking the snapshot. 2. After four consecutive samples with no per-stage movement, while the cluster has live executors and the job is not terminal, the loop emits one block of warn lines. The block re-fires every 30s while the condition holds and goes silent the moment progress resumes. 3. The block contains: - Primary line: query id, elapsed stuck time, pending task count - One line per Running stage with unassigned/uncomplete partitions - "Lock could not be read" line if the snapshot timed out - Alive executor count - The event handler currently in flight on the scheduler event loop with its elapsed time, if a handler has been running longer than 30s Lines after the primary are emitted only when their underlying check fires, so the block reads as a coherent diagnosis of which code path is hung. 4. To support (3), EventLoop now publishes an EventInFlight slot populated around each on_receive call and exposed via EventLoop::in_flight_signal(). EventAction gains an event_label method (default empty) that the scheduler implements to give each QueryStageSchedulerEvent variant a stable label for the warn block. Volume: zero lines from this instrumentation at default RUST_LOG=info when the cluster is healthy. When a query is stuck, ~3-6 lines per 30s while the condition holds. Verified: cargo build, cargo test -p ballista-core -p ballista-scheduler (83 passed, 1 ignored, includes 4 new tests covering the warn-block helpers), cargo clippy -D warnings, cargo fmt --check all clean.
1 parent 8afc1b7 commit 7e9872a

4 files changed

Lines changed: 506 additions & 3 deletions

File tree

ballista/core/src/event_loop.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,38 @@
1818
//! Event loop infrastructure for asynchronous message processing.
1919
2020
use std::sync::Arc;
21+
use std::sync::Mutex;
2122
use std::sync::atomic::{AtomicBool, Ordering};
23+
use std::time::Instant;
2224

2325
use async_trait::async_trait;
2426
use log::{error, info};
2527
use tokio::sync::mpsc;
2628

2729
use crate::error::{BallistaError, Result};
2830

31+
/// Snapshot of the event handler currently executing in an [`EventLoop`].
32+
///
33+
/// Populated by the loop on entry to [`EventAction::on_receive`] and cleared
34+
/// on return. External diagnostic code (see the scheduler's stuck-query
35+
/// detector) reads this to attribute a hang to a specific event variant
36+
/// and learn how long it has been in flight.
37+
#[derive(Clone, Debug)]
38+
pub struct EventInFlight {
39+
/// Short, stable identifier for the event variant being processed,
40+
/// supplied by [`EventAction::event_label`].
41+
pub label: &'static str,
42+
/// Instant the handler began.
43+
pub started_at: Instant,
44+
}
45+
46+
/// Shared slot tracking which event is currently being processed by the
47+
/// loop. `Some(_)` while a handler runs, `None` otherwise.
48+
///
49+
/// Uses a synchronous mutex — the lock is held only to swap a small Option
50+
/// at the start and end of each `on_receive`, never across an await.
51+
pub type EventProgressSignal = Arc<Mutex<Option<EventInFlight>>>;
52+
2953
/// Trait defining actions to be performed in response to events in an event loop.
3054
#[async_trait]
3155
pub trait EventAction<E>: Send + Sync {
@@ -45,6 +69,14 @@ pub trait EventAction<E>: Send + Sync {
4569

4670
/// Called when an error occurs during event processing.
4771
fn on_error(&self, error: BallistaError);
72+
73+
/// Short, stable label for the given event variant, used by external
74+
/// diagnostic code to identify which handler is currently running.
75+
/// Default returns an empty string, meaning the event loop will not
76+
/// publish a tracking entry for events from this action.
77+
fn event_label(&self, _event: &E) -> &'static str {
78+
""
79+
}
4880
}
4981

5082
/// An asynchronous event loop that processes events through a channel.
@@ -57,6 +89,7 @@ pub struct EventLoop<E> {
5789
stopped: Arc<AtomicBool>,
5890
action: Arc<dyn EventAction<E>>,
5991
tx_event: Option<mpsc::Sender<E>>,
92+
in_flight: EventProgressSignal,
6093
}
6194

6295
impl<E: Send + 'static> EventLoop<E> {
@@ -72,9 +105,17 @@ impl<E: Send + 'static> EventLoop<E> {
72105
stopped: Arc::new(AtomicBool::new(false)),
73106
action,
74107
tx_event: None,
108+
in_flight: Arc::new(Mutex::new(None)),
75109
}
76110
}
77111

112+
/// Returns a handle to the shared signal that tracks the event
113+
/// currently being processed. Used by diagnostic code to attribute
114+
/// a stuck condition to a specific handler.
115+
pub fn in_flight_signal(&self) -> EventProgressSignal {
116+
self.in_flight.clone()
117+
}
118+
78119
fn run(&self, mut rx_event: mpsc::Receiver<E>) {
79120
assert!(
80121
self.tx_event.is_some(),
@@ -84,11 +125,27 @@ impl<E: Send + 'static> EventLoop<E> {
84125
let name = self.name.clone();
85126
let stopped = self.stopped.clone();
86127
let action = self.action.clone();
128+
let in_flight = self.in_flight.clone();
87129
tokio::spawn(async move {
88130
info!("Starting the event loop {name}");
89131
while !stopped.load(Ordering::SeqCst) {
90132
if let Some(event) = rx_event.recv().await {
91-
if let Err(e) = action.on_receive(event, &tx_event, &rx_event).await {
133+
let label = action.event_label(&event);
134+
if !label.is_empty()
135+
&& let Ok(mut slot) = in_flight.lock()
136+
{
137+
*slot = Some(EventInFlight {
138+
label,
139+
started_at: Instant::now(),
140+
});
141+
}
142+
let result = action.on_receive(event, &tx_event, &rx_event).await;
143+
if !label.is_empty()
144+
&& let Ok(mut slot) = in_flight.lock()
145+
{
146+
*slot = None;
147+
}
148+
if let Err(e) = result {
92149
error!("Fail to process event due to {e}");
93150
action.on_error(e);
94151
}

0 commit comments

Comments
 (0)