Skip to content

Commit 4d37a2b

Browse files
committed
feat: Add a broadcast channel to send job state event notifications
1 parent 20ef1eb commit 4d37a2b

4 files changed

Lines changed: 237 additions & 0 deletions

File tree

ballista/scheduler/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,4 @@ pub mod state;
4646
pub mod test_utils;
4747

4848
pub use scheduler_server::SessionBuilder;
49+
pub use scheduler_server::job_state_event::{JobState, JobStateEvent};
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Job state event notifications for subscribers.
19+
//!
20+
//! This module provides a broadcast-based notification system for job state changes.
21+
//! Consumers can subscribe to receive notifications when jobs change state, avoiding
22+
//! the need to poll for status updates.
23+
24+
use std::fmt;
25+
26+
/// Represents the current state of a job in the scheduler.
27+
///
28+
/// This enum mirrors the possible states from the protobuf `job_status::Status`
29+
/// but is designed to be lightweight for broadcasting.
30+
#[derive(Debug, Clone, PartialEq, Eq)]
31+
pub enum JobState {
32+
/// Job is queued and waiting to be scheduled.
33+
Queued,
34+
/// Job is currently running.
35+
Running,
36+
/// Job has completed successfully.
37+
Completed,
38+
/// Job has failed with an error message.
39+
Failed(String),
40+
/// Job was cancelled.
41+
Cancelled,
42+
}
43+
44+
impl fmt::Display for JobState {
45+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46+
match self {
47+
JobState::Queued => write!(f, "Queued"),
48+
JobState::Running => write!(f, "Running"),
49+
JobState::Completed => write!(f, "Completed"),
50+
JobState::Failed(msg) => write!(f, "Failed: {}", msg),
51+
JobState::Cancelled => write!(f, "Cancelled"),
52+
}
53+
}
54+
}
55+
56+
/// Event emitted when a job's state changes.
57+
///
58+
/// This struct is designed to be cloned and sent through a broadcast channel
59+
/// to notify subscribers about job state changes.
60+
#[derive(Debug, Clone)]
61+
pub struct JobStateEvent {
62+
/// The unique identifier of the job.
63+
pub job_id: String,
64+
/// The new state of the job.
65+
pub state: JobState,
66+
}
67+
68+
impl JobStateEvent {
69+
/// Creates a new job state event.
70+
pub fn new(job_id: impl Into<String>, state: JobState) -> Self {
71+
Self {
72+
job_id: job_id.into(),
73+
state,
74+
}
75+
}
76+
77+
/// Creates a queued event for the given job.
78+
pub fn queued(job_id: impl Into<String>) -> Self {
79+
Self::new(job_id, JobState::Queued)
80+
}
81+
82+
/// Creates a running event for the given job.
83+
pub fn running(job_id: impl Into<String>) -> Self {
84+
Self::new(job_id, JobState::Running)
85+
}
86+
87+
/// Creates a completed event for the given job.
88+
pub fn completed(job_id: impl Into<String>) -> Self {
89+
Self::new(job_id, JobState::Completed)
90+
}
91+
92+
/// Creates a failed event for the given job.
93+
pub fn failed(job_id: impl Into<String>, error: impl Into<String>) -> Self {
94+
Self::new(job_id, JobState::Failed(error.into()))
95+
}
96+
97+
/// Creates a cancelled event for the given job.
98+
pub fn cancelled(job_id: impl Into<String>) -> Self {
99+
Self::new(job_id, JobState::Cancelled)
100+
}
101+
}
102+
103+
impl fmt::Display for JobStateEvent {
104+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105+
write!(
106+
f,
107+
"JobStateEvent[job_id={}, state={}]",
108+
self.job_id, self.state
109+
)
110+
}
111+
}
112+
113+
#[cfg(test)]
114+
mod tests {
115+
use super::*;
116+
117+
#[test]
118+
fn test_job_state_event_creation() {
119+
let event = JobStateEvent::queued("job-123");
120+
assert_eq!(event.job_id, "job-123");
121+
assert_eq!(event.state, JobState::Queued);
122+
123+
let event = JobStateEvent::running("job-123");
124+
assert_eq!(event.state, JobState::Running);
125+
126+
let event = JobStateEvent::completed("job-123");
127+
assert_eq!(event.state, JobState::Completed);
128+
129+
let event = JobStateEvent::failed("job-123", "Something went wrong");
130+
assert_eq!(
131+
event.state,
132+
JobState::Failed("Something went wrong".to_string())
133+
);
134+
135+
let event = JobStateEvent::cancelled("job-123");
136+
assert_eq!(event.state, JobState::Cancelled);
137+
}
138+
139+
#[test]
140+
fn test_job_state_display() {
141+
assert_eq!(JobState::Queued.to_string(), "Queued");
142+
assert_eq!(JobState::Running.to_string(), "Running");
143+
assert_eq!(JobState::Completed.to_string(), "Completed");
144+
assert_eq!(
145+
JobState::Failed("error".to_string()).to_string(),
146+
"Failed: error"
147+
);
148+
assert_eq!(JobState::Cancelled.to_string(), "Cancelled");
149+
}
150+
}

ballista/scheduler/src/scheduler_server/mod.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use ballista_core::error::Result;
2222
use ballista_core::event_loop::{EventLoop, EventSender};
2323
use ballista_core::serde::BallistaCodec;
2424
use ballista_core::serde::protobuf::TaskStatus;
25+
use tokio::sync::broadcast;
2526

2627
use datafusion::execution::context::SessionState;
2728
use datafusion::logical_expr::LogicalPlan;
@@ -56,6 +57,8 @@ pub mod event;
5657
#[cfg(feature = "keda-scaler")]
5758
mod external_scaler;
5859
mod grpc;
60+
/// Job state event notifications for subscribers.
61+
pub mod job_state_event;
5962
pub(crate) mod query_stage_scheduler;
6063

6164
/// Function type for building DataFusion session states from configuration.
@@ -84,9 +87,20 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
8487
query_stage_scheduler: Arc<QueryStageScheduler<T, U>>,
8588
/// Scheduler configuration.
8689
config: Arc<SchedulerConfig>,
90+
/// Broadcast sender for job state change notifications.
91+
///
92+
/// Subscribers can receive notifications when jobs change state by calling
93+
/// `subscribe_job_updates()`.
94+
job_state_sender: broadcast::Sender<job_state_event::JobStateEvent>,
8795
}
8896

8997
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T, U> {
98+
/// Default capacity for the job state broadcast channel.
99+
///
100+
/// This determines how many job state events can be buffered before
101+
/// slow receivers start lagging behind.
102+
const JOB_STATE_CHANNEL_CAPACITY: usize = 256;
103+
90104
/// Creates a new `SchedulerServer` with the given configuration.
91105
pub fn new(
92106
scheduler_name: String,
@@ -102,10 +116,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
102116
config.clone(),
103117
metrics_collector.clone(),
104118
));
119+
let (job_state_sender, _) = broadcast::channel(Self::JOB_STATE_CHANNEL_CAPACITY);
105120
let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
106121
state.clone(),
107122
metrics_collector,
108123
config.clone(),
124+
job_state_sender.clone(),
109125
));
110126
let query_stage_event_loop = EventLoop::new(
111127
"query_stage".to_owned(),
@@ -121,6 +137,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
121137
#[cfg(feature = "rest-api")]
122138
query_stage_scheduler,
123139
config,
140+
job_state_sender,
124141
}
125142
}
126143

@@ -142,10 +159,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
142159
metrics_collector.clone(),
143160
task_launcher,
144161
));
162+
let (job_state_sender, _) = broadcast::channel(Self::JOB_STATE_CHANNEL_CAPACITY);
145163
let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
146164
state.clone(),
147165
metrics_collector,
148166
config.clone(),
167+
job_state_sender.clone(),
149168
));
150169
let query_stage_event_loop = EventLoop::new(
151170
"query_stage".to_owned(),
@@ -161,6 +180,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
161180
#[cfg(feature = "rest-api")]
162181
query_stage_scheduler,
163182
config,
183+
job_state_sender,
164184
}
165185
}
166186

@@ -183,6 +203,37 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
183203
pub fn running_job_number(&self) -> usize {
184204
self.state.task_manager.running_job_number()
185205
}
206+
207+
/// Subscribes to job state change notifications.
208+
///
209+
/// Returns a receiver that will receive [`JobStateEvent`] notifications
210+
/// whenever a job changes state. This allows consumers to be notified
211+
/// of job state changes without polling.
212+
///
213+
/// # Example
214+
///
215+
/// ```ignore
216+
/// let mut receiver = scheduler.subscribe_job_updates();
217+
/// tokio::spawn(async move {
218+
/// while let Ok(event) = receiver.recv().await {
219+
/// println!("Job {} changed to state: {}", event.job_id, event.state);
220+
/// }
221+
/// });
222+
/// ```
223+
///
224+
/// # Note
225+
///
226+
/// If the receiver falls behind and the channel buffer fills up,
227+
/// older messages will be dropped and the receiver will receive
228+
/// a `RecvError::Lagged` error on the next `recv()` call.
229+
///
230+
/// [`JobStateEvent`]: job_state_event::JobStateEvent
231+
pub fn subscribe_job_updates(
232+
&self,
233+
) -> broadcast::Receiver<job_state_event::JobStateEvent> {
234+
self.job_state_sender.subscribe()
235+
}
236+
186237
#[cfg(feature = "rest-api")]
187238
pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
188239
self.query_stage_scheduler.metrics_collector()

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ use std::time::Duration;
2020

2121
use async_trait::async_trait;
2222
use log::{error, info, trace, warn};
23+
use tokio::sync::broadcast;
2324

2425
use ballista_core::error::{BallistaError, Result};
2526
use ballista_core::event_loop::{EventAction, EventSender};
2627

2728
use crate::config::SchedulerConfig;
2829
use crate::metrics::SchedulerMetricsCollector;
30+
use crate::scheduler_server::job_state_event::JobStateEvent;
2931
use crate::scheduler_server::timestamp_millis;
3032
use datafusion_proto::logical_plan::AsLogicalPlan;
3133
use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -43,20 +45,31 @@ pub(crate) struct QueryStageScheduler<
4345
state: Arc<SchedulerState<T, U>>,
4446
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
4547
config: Arc<SchedulerConfig>,
48+
/// Broadcast sender for job state change notifications.
49+
job_state_sender: broadcast::Sender<JobStateEvent>,
4650
}
4751

4852
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageScheduler<T, U> {
4953
pub(crate) fn new(
5054
state: Arc<SchedulerState<T, U>>,
5155
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
5256
config: Arc<SchedulerConfig>,
57+
job_state_sender: broadcast::Sender<JobStateEvent>,
5358
) -> Self {
5459
Self {
5560
state,
5661
metrics_collector,
5762
config,
63+
job_state_sender,
5864
}
5965
}
66+
67+
/// Broadcasts a job state event to all subscribers.
68+
fn broadcast_job_state(&self, event: JobStateEvent) {
69+
// Ignore send errors - no receivers is a valid state
70+
let _ = self.job_state_sender.send(event);
71+
}
72+
6073
#[cfg(feature = "rest-api")]
6174
pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
6275
self.metrics_collector.as_ref()
@@ -96,6 +109,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
96109
} => {
97110
info!("Job {job_id} queued with name {job_name:?}");
98111

112+
// Broadcast job queued state
113+
self.broadcast_job_state(JobStateEvent::queued(&job_id));
114+
99115
if let Err(e) = self
100116
.state
101117
.task_manager
@@ -106,6 +122,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
106122
}
107123

108124
let state = self.state.clone();
125+
let job_state_sender = self.job_state_sender.clone();
109126
tokio::spawn(async move {
110127
let event = if let Err(e) = state
111128
.submit_job(&job_id, &job_name, session_ctx, &plan, queued_at)
@@ -120,6 +137,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
120137
failed_at: timestamp_millis(),
121138
}
122139
} else {
140+
// Broadcast job running state when successfully submitted
141+
let _ = job_state_sender.send(JobStateEvent::running(&job_id));
123142
QueryStageSchedulerEvent::JobSubmitted {
124143
job_id,
125144
queued_at,
@@ -162,6 +181,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
162181
.record_failed(&job_id, queued_at, failed_at);
163182

164183
error!("Job {job_id} failed: {fail_message}");
184+
185+
// Broadcast job failed state
186+
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));
187+
165188
if let Err(e) = self
166189
.state
167190
.task_manager
@@ -182,6 +205,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
182205
.record_completed(&job_id, queued_at, completed_at);
183206

184207
info!("Job {job_id} success");
208+
209+
// Broadcast job completed state
210+
self.broadcast_job_state(JobStateEvent::completed(&job_id));
211+
185212
if let Err(e) = self.state.task_manager.succeed_job(&job_id).await {
186213
error!("Fail to invoke succeed_job for job {job_id} due to {e:?}");
187214
}
@@ -197,6 +224,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
197224
.record_failed(&job_id, queued_at, failed_at);
198225

199226
error!("Job {job_id} running failed");
227+
228+
// Broadcast job failed state
229+
self.broadcast_job_state(JobStateEvent::failed(&job_id, &fail_message));
230+
200231
match self
201232
.state
202233
.task_manager
@@ -228,6 +259,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
228259
self.metrics_collector.record_cancelled(&job_id);
229260

230261
info!("Job {job_id} Cancelled");
262+
263+
// Broadcast job cancelled state
264+
self.broadcast_job_state(JobStateEvent::cancelled(&job_id));
265+
231266
match self.state.task_manager.cancel_job(&job_id).await {
232267
Ok((running_tasks, _pending_tasks)) => {
233268
event_sender

0 commit comments

Comments
 (0)