Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions ballista/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use std::sync::Arc;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::oneshot::Sender as OneShotSender;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
use tonic::codegen::{Body, Bytes, StdError};

/// Main execution loop that polls the scheduler for available tasks.
Expand All @@ -70,11 +70,20 @@ const QUIET_AFTER_FAILURES: u32 = 5;
///
/// This function polls the scheduler for new tasks to execute and runs them,
/// ensuring no more than the configured number of tasks run simultaneously.
///
/// # Arguments
///
/// * `scheduler` - gRPC client for communicating with the scheduler
/// * `executor` - The executor instance that runs tasks
/// * `codec` - Codec for serializing/deserializing plans
/// * `readiness` - Optional channel to signal when the executor is ready
/// * `poll_now_notify` - Optional [`Notify`] that cuts short the idle wait between polls (when no task was received), so the loop polls the scheduler again immediately when new work is available
pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan, C>(
mut scheduler: SchedulerGrpcClient<C>,
executor: Arc<Executor>,
codec: BallistaCodec<T, U>,
readiness: Option<OneShotSender<String>>,
poll_now_notify: Option<Arc<Notify>>,
) -> Result<(), BallistaError>
where
C: tonic::client::GrpcService<tonic::body::Body>,
Expand Down Expand Up @@ -264,7 +273,20 @@ where
}

if !active_job {
tokio::time::sleep(Duration::from_millis(100)).await;
// Wait for either the poll interval or a poll_now notification
match &poll_now_notify {
Some(notify) => {
tokio::select! {
() = tokio::time::sleep(Duration::from_millis(100)) => {}
() = notify.notified() => {
debug!("Received poll_now notification, polling immediately");
}
}
}
None => {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ pub async fn start_executor_process(
executor.clone(),
default_codec,
None,
None, // poll_now_notify: not used in standalone executor
)));
}
};
Expand Down
4 changes: 3 additions & 1 deletion ballista/executor/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ pub async fn new_standalone_executor_from_builder(
)),
);

tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec, None));
tokio::spawn(execution_loop::poll_loop(
scheduler, executor, codec, None, None,
));
Ok(())
}

Expand Down
15 changes: 15 additions & 0 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ use tonic::transport::{Endpoint, Error as TonicTransportError};
pub type EndpointOverrideFn =
Arc<dyn Fn(Endpoint) -> Result<Endpoint, TonicTransportError> + Send + Sync>;

/// Callback invoked by the scheduler when new work becomes available for executors.
///
/// This is called after:
/// - A job is submitted and tasks are ready to be scheduled
///   (the argument then has the form `job_submitted:<job_id>`)
/// - Tasks complete and new stages become runnable
///   (the argument is then `tasks_completed:new_stages_runnable`)
///
/// The `&str` argument is a short reason/description intended for debugging;
/// the callback returns nothing.
///
/// This allows external systems to notify executors to poll immediately
/// rather than waiting for their next poll interval.
///
/// The callback is shared behind an [`Arc`] and must be `Send + Sync`.
/// NOTE(review): it is a plain synchronous `Fn` and appears to be called
/// inline from the scheduler's event handling, so implementations should
/// presumably return quickly and not block — confirm against the call sites.
pub type OnWorkAvailableFn = Arc<dyn Fn(&str) + Send + Sync>;

/// Command-line configuration for the scheduler binary.
#[cfg(feature = "build-binary")]
#[derive(clap::Parser, Debug)]
Expand Down Expand Up @@ -248,6 +258,9 @@ pub struct SchedulerConfig {
pub override_create_grpc_client_endpoint: Option<EndpointOverrideFn>,
/// [SchedulerMetricsCollector] override option
pub override_metrics_collector: Option<Arc<dyn SchedulerMetricsCollector>>,
/// Callback invoked when new work becomes available for executors.
/// The string argument is a reason/description for debugging purposes.
/// `None` (the default) means no callback is invoked.
pub on_work_available: Option<OnWorkAvailableFn>,
}

impl Default for SchedulerConfig {
Expand Down Expand Up @@ -276,6 +289,7 @@ impl Default for SchedulerConfig {
override_physical_codec: None,
override_create_grpc_client_endpoint: None,
override_metrics_collector: None,
on_work_available: None,
}
}
}
Expand Down Expand Up @@ -527,6 +541,7 @@ impl TryFrom<Config> for SchedulerConfig {
override_session_builder: None,
override_create_grpc_client_endpoint: None,
override_metrics_collector: None,
on_work_available: None,
};

Ok(config)
Expand Down
12 changes: 12 additions & 0 deletions ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.post_event(QueryStageSchedulerEvent::ReviveOffers)
.await?;
}

// Notify external systems that new work is available
if let Some(ref callback) = self.config.on_work_available {
callback(&format!("job_submitted:{job_id}"));
}
}
QueryStageSchedulerEvent::JobPlanningFailed {
job_id,
Expand Down Expand Up @@ -261,6 +266,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
.await?;
}

// Notify external systems when new stages become runnable
if !stage_events.is_empty()
&& let Some(ref callback) = self.config.on_work_available
{
callback("tasks_completed:new_stages_runnable");
}

for stage_event in stage_events {
event_sender.post_event(stage_event).await?;
}
Expand Down