diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 99f4c37c4b..f896b97b10 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -78,12 +78,14 @@ const QUIET_AFTER_FAILURES: u32 = 5; /// * `codec` - Codec for serializing/deserializing plans /// * `readiness` - Optional channel to signal when the executor is ready /// * `poll_now_notify` - Optional notify to wake the poll loop immediately when new work is available +/// * `available_task_slots` - Optional semaphore for controlling task concurrency. If None, creates one internally. pub async fn poll_loop( mut scheduler: SchedulerGrpcClient, executor: Arc, codec: BallistaCodec, readiness: Option>, poll_now_notify: Option>, + available_task_slots: Option>, ) -> Result<(), BallistaError> where C: tonic::client::GrpcService, @@ -98,8 +100,9 @@ where .unwrap() .clone() .into(); - let available_task_slots = - Arc::new(Semaphore::new(executor_specification.task_slots as usize)); + let available_task_slots = available_task_slots.unwrap_or_else(|| { + Arc::new(Semaphore::new(executor_specification.task_slots as usize)) + }); let (task_status_sender, mut task_status_receiver) = std::sync::mpsc::channel::(); diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index be3ee3d95c..6a864ddcae 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -434,6 +434,7 @@ pub async fn start_executor_process( default_codec, None, None, // poll_now_notify: not used in standalone executor + None, // available_task_slots: use internal semaphore ))); } }; diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs index 3402ad175f..9e6cdf4f24 100644 --- a/ballista/executor/src/standalone.rs +++ b/ballista/executor/src/standalone.rs @@ -144,7 +144,7 @@ pub async fn new_standalone_executor_from_builder( ); tokio::spawn(execution_loop::poll_loop( - scheduler, executor, codec, None, None, + scheduler, executor, codec, None, None, None, )); Ok(()) }