Skip to content

Commit 6a95a52

Browse files
committed
feat: Allow external semaphore for executor busy state tracking
This change allows the semaphore controlling task concurrency to be passed into the poll_loop instead of being created internally. This enables: 1. Sharing the semaphore across multiple poll loops connected to different scheduler nodes 2. External tracking of executor busy state by querying available_permits() 3. Reporting busy state in scheduler shared state location metadata The semaphore parameter is optional - if None is passed, the function creates one internally to maintain backwards compatibility.
1 parent 2abc199 commit 6a95a52

1 file changed

Lines changed: 5 additions & 2 deletions

File tree

ballista/executor/src/execution_loop.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,14 @@ const QUIET_AFTER_FAILURES: u32 = 5;
7878
/// * `codec` - Codec for serializing/deserializing plans
7979
/// * `readiness` - Optional channel to signal when the executor is ready
8080
/// * `poll_now_notify` - Optional notify to wake the poll loop immediately when new work is available
81+
/// * `available_task_slots` - Optional semaphore for controlling task concurrency. If None, creates one internally.
8182
pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan, C>(
8283
mut scheduler: SchedulerGrpcClient<C>,
8384
executor: Arc<Executor>,
8485
codec: BallistaCodec<T, U>,
8586
readiness: Option<OneShotSender<String>>,
8687
poll_now_notify: Option<Arc<Notify>>,
88+
available_task_slots: Option<Arc<Semaphore>>,
8789
) -> Result<(), BallistaError>
8890
where
8991
C: tonic::client::GrpcService<tonic::body::Body>,
@@ -98,8 +100,9 @@ where
98100
.unwrap()
99101
.clone()
100102
.into();
101-
let available_task_slots =
102-
Arc::new(Semaphore::new(executor_specification.task_slots as usize));
103+
let available_task_slots = available_task_slots.unwrap_or_else(|| {
104+
Arc::new(Semaphore::new(executor_specification.task_slots as usize))
105+
});
103106

104107
let (task_status_sender, mut task_status_receiver) =
105108
std::sync::mpsc::channel::<TaskStatus>();

0 commit comments

Comments
 (0)