From 6a95a5217940a1a5ce707ac7287243d9b793fa6e Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Wed, 28 Jan 2026 18:36:59 -0800 Subject: [PATCH 1/2] feat: Allow external semaphore for executor busy state tracking This change allows the semaphore controlling task concurrency to be passed into the poll_loop instead of being created internally. This enables: 1. Sharing the semaphore across multiple poll loops connected to different scheduler nodes 2. External tracking of executor busy state by querying available_permits() 3. Reporting busy state in scheduler shared state location metadata The semaphore parameter is optional - if None is passed, the function creates one internally to maintain backwards compatibility. --- ballista/executor/src/execution_loop.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 99f4c37c4b..f896b97b10 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -78,12 +78,14 @@ const QUIET_AFTER_FAILURES: u32 = 5; /// * `codec` - Codec for serializing/deserializing plans /// * `readiness` - Optional channel to signal when the executor is ready /// * `poll_now_notify` - Optional notify to wake the poll loop immediately when new work is available +/// * `available_task_slots` - Optional semaphore for controlling task concurrency. If None, creates one internally. pub async fn poll_loop( mut scheduler: SchedulerGrpcClient, executor: Arc, codec: BallistaCodec, readiness: Option>, poll_now_notify: Option>, + available_task_slots: Option>, ) -> Result<(), BallistaError> where C: tonic::client::GrpcService, @@ -98,8 +100,9 @@ where .unwrap() .clone() .into(); - let available_task_slots = - Arc::new(Semaphore::new(executor_specification.task_slots as usize)); + let available_task_slots = available_task_slots.unwrap_or_else(|| { + Arc::new(Semaphore::new(executor_specification.task_slots as usize)) + }); let (task_status_sender, mut task_status_receiver) = std::sync::mpsc::channel::(); From b38fc72b84e915744026c9b438b9f52f50546ded Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Wed, 28 Jan 2026 19:15:00 -0800 Subject: [PATCH 2/2] feat: Add internal semaphore for available task slots in executor process --- ballista/executor/src/executor_process.rs | 1 + ballista/executor/src/standalone.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index be3ee3d95c..6a864ddcae 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -434,6 +434,7 @@ pub async fn start_executor_process( default_codec, None, None, // poll_now_notify: not used in standalone executor + None, // available_task_slots: use internal semaphore ))); } }; diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs index 3402ad175f..9e6cdf4f24 100644 --- a/ballista/executor/src/standalone.rs +++ b/ballista/executor/src/standalone.rs @@ -144,7 +144,7 @@ pub async fn new_standalone_executor_from_builder( ); tokio::spawn(execution_loop::poll_loop( - scheduler, executor, codec, None, None, + scheduler, executor, codec, None, None, None, )); Ok(()) }