Skip to content

Commit 6088cdb

Browse files
Add poll_now_notify parameter to poll_loop and on_work_available callback
- Add poll_now_notify: Option<Arc<Notify>> parameter to poll_loop in execution_loop.rs - Update idle sleep to use tokio::select! - wakes on 100ms timeout OR notify signal - Add OnWorkAvailableFn callback type and field to SchedulerConfig - Invoke callback after JobSubmitted and TaskUpdating events when new work available - Update standalone executor and executor_process to pass None for poll_now_notify
1 parent 62c381a commit 6088cdb

5 files changed

Lines changed: 55 additions & 3 deletions

File tree

ballista/executor/src/execution_loop.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use std::sync::Arc;
5252
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
5353
use std::time::{Duration, SystemTime, UNIX_EPOCH};
5454
use tokio::sync::oneshot::Sender as OneShotSender;
55-
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
55+
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
5656
use tonic::codegen::{Body, Bytes, StdError};
5757

5858
/// Main execution loop that polls the scheduler for available tasks.
@@ -70,11 +70,20 @@ const QUIET_AFTER_FAILURES: u32 = 5;
7070
///
7171
/// This function polls the scheduler for new tasks to execute and runs them,
7272
/// ensuring no more than the configured number of tasks run simultaneously.
73+
///
74+
/// # Arguments
75+
///
76+
/// * `scheduler` - gRPC client for communicating with the scheduler
77+
/// * `executor` - The executor instance that runs tasks
78+
/// * `codec` - Codec for serializing/deserializing plans
79+
/// * `readiness` - Optional channel to signal when the executor is ready
80+
/// * `poll_now_notify` - Optional notify to wake the poll loop immediately when new work is available
7381
pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan, C>(
7482
mut scheduler: SchedulerGrpcClient<C>,
7583
executor: Arc<Executor>,
7684
codec: BallistaCodec<T, U>,
7785
readiness: Option<OneShotSender<String>>,
86+
poll_now_notify: Option<Arc<Notify>>,
7887
) -> Result<(), BallistaError>
7988
where
8089
C: tonic::client::GrpcService<tonic::body::Body>,
@@ -264,7 +273,20 @@ where
264273
}
265274

266275
if !active_job {
267-
tokio::time::sleep(Duration::from_millis(100)).await;
276+
// Wait for either the poll interval or a poll_now notification
277+
match &poll_now_notify {
278+
Some(notify) => {
279+
tokio::select! {
280+
() = tokio::time::sleep(Duration::from_millis(100)) => {}
281+
() = notify.notified() => {
282+
debug!("Received poll_now notification, polling immediately");
283+
}
284+
}
285+
}
286+
None => {
287+
tokio::time::sleep(Duration::from_millis(100)).await;
288+
}
289+
}
268290
}
269291
}
270292
}

ballista/executor/src/executor_process.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ pub async fn start_executor_process(
433433
executor.clone(),
434434
default_codec,
435435
None,
436+
None, // poll_now_notify: not used in standalone executor
436437
)));
437438
}
438439
};

ballista/executor/src/standalone.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ pub async fn new_standalone_executor_from_builder(
143143
)),
144144
);
145145

146-
tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec, None));
146+
tokio::spawn(execution_loop::poll_loop(
147+
scheduler, executor, codec, None, None,
148+
));
147149
Ok(())
148150
}
149151

ballista/scheduler/src/config.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ use tonic::transport::{Endpoint, Error as TonicTransportError};
3939
pub type EndpointOverrideFn =
4040
Arc<dyn Fn(Endpoint) -> Result<Endpoint, TonicTransportError> + Send + Sync>;
4141

42+
/// Callback invoked when new work becomes available for executors.
43+
///
44+
/// This is called after:
45+
/// - A job is submitted and tasks are ready to be scheduled
46+
/// - Tasks complete and new stages become runnable
47+
///
48+
/// This allows external systems to notify executors to poll immediately
49+
/// rather than waiting for their next poll interval.
50+
pub type OnWorkAvailableFn = Arc<dyn Fn(&str) + Send + Sync>;
51+
4252
/// Command-line configuration for the scheduler binary.
4353
#[cfg(feature = "build-binary")]
4454
#[derive(clap::Parser, Debug)]
@@ -248,6 +258,9 @@ pub struct SchedulerConfig {
248258
pub override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
249259
/// [SchedulerMetricsCollector] override option
250260
pub override_metrics_collector: Option<Arc<dyn SchedulerMetricsCollector>>,
261+
/// Callback invoked when new work becomes available for executors.
262+
/// The string argument is a reason/description for debugging purposes.
263+
pub on_work_available: Option<OnWorkAvailableFn>,
251264
}
252265

253266
impl Default for SchedulerConfig {
@@ -276,6 +289,7 @@ impl Default for SchedulerConfig {
276289
override_physical_codec: None,
277290
override_create_grpc_client_endpoint: None,
278291
override_metrics_collector: None,
292+
on_work_available: None,
279293
}
280294
}
281295
}
@@ -527,6 +541,7 @@ impl TryFrom<Config> for SchedulerConfig {
527541
override_session_builder: None,
528542
override_create_grpc_client_endpoint: None,
529543
override_metrics_collector: None,
544+
on_work_available: None,
530545
};
531546

532547
Ok(config)

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
146146
.post_event(QueryStageSchedulerEvent::ReviveOffers)
147147
.await?;
148148
}
149+
150+
// Notify external systems that new work is available
151+
if let Some(ref callback) = self.config.on_work_available {
152+
callback(&format!("job_submitted:{job_id}"));
153+
}
149154
}
150155
QueryStageSchedulerEvent::JobPlanningFailed {
151156
job_id,
@@ -261,6 +266,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
261266
.await?;
262267
}
263268

269+
// Notify external systems when new stages become runnable
270+
if !stage_events.is_empty()
271+
&& let Some(ref callback) = self.config.on_work_available
272+
{
273+
callback("tasks_completed:new_stages_runnable");
274+
}
275+
264276
for stage_event in stage_events {
265277
event_sender.post_event(stage_event).await?;
266278
}

0 commit comments

Comments
 (0)