Skip to content

Commit 20ef1eb

Browse files
authored
feat: Allow external semaphore for executor busy state tracking (#14)
* feat: Allow external semaphore for executor busy state tracking This change allows the semaphore controlling task concurrency to be passed into the poll_loop instead of being created internally. This enables: 1. Sharing the semaphore across multiple poll loops connected to different scheduler nodes 2. External tracking of executor busy state by querying available_permits() 3. Reporting busy state in scheduler shared state location metadata The semaphore parameter is optional - if None is passed, the function creates one internally to maintain backwards compatibility. * feat: Add internal semaphore for available task slots in executor process
1 parent 2abc199 commit 20ef1eb

3 files changed

Lines changed: 7 additions & 3 deletions

File tree

ballista/executor/src/execution_loop.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,14 @@ const QUIET_AFTER_FAILURES: u32 = 5;
7878
/// * `codec` - Codec for serializing/deserializing plans
7979
/// * `readiness` - Optional channel to signal when the executor is ready
8080
/// * `poll_now_notify` - Optional notify to wake the poll loop immediately when new work is available
81+
/// * `available_task_slots` - Optional semaphore for controlling task concurrency. If None, creates one internally.
8182
pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan, C>(
8283
mut scheduler: SchedulerGrpcClient<C>,
8384
executor: Arc<Executor>,
8485
codec: BallistaCodec<T, U>,
8586
readiness: Option<OneShotSender<String>>,
8687
poll_now_notify: Option<Arc<Notify>>,
88+
available_task_slots: Option<Arc<Semaphore>>,
8789
) -> Result<(), BallistaError>
8890
where
8991
C: tonic::client::GrpcService<tonic::body::Body>,
@@ -98,8 +100,9 @@ where
98100
.unwrap()
99101
.clone()
100102
.into();
101-
let available_task_slots =
102-
Arc::new(Semaphore::new(executor_specification.task_slots as usize));
103+
let available_task_slots = available_task_slots.unwrap_or_else(|| {
104+
Arc::new(Semaphore::new(executor_specification.task_slots as usize))
105+
});
103106

104107
let (task_status_sender, mut task_status_receiver) =
105108
std::sync::mpsc::channel::<TaskStatus>();

ballista/executor/src/executor_process.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ pub async fn start_executor_process(
434434
default_codec,
435435
None,
436436
None, // poll_now_notify: not used in standalone executor
437+
None, // available_task_slots: use internal semaphore
437438
)));
438439
}
439440
};

ballista/executor/src/standalone.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ pub async fn new_standalone_executor_from_builder(
144144
);
145145

146146
tokio::spawn(execution_loop::poll_loop(
147-
scheduler, executor, codec, None, None,
147+
scheduler, executor, codec, None, None, None,
148148
));
149149
Ok(())
150150
}

0 commit comments

Comments
 (0)