Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ballista/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ const QUIET_AFTER_FAILURES: u32 = 5;
/// * `codec` - Codec for serializing/deserializing plans
/// * `readiness` - Optional channel to signal when the executor is ready
/// * `poll_now_notify` - Optional notify to wake the poll loop immediately when new work is available
/// * `available_task_slots` - Optional semaphore for controlling task concurrency. If None, creates one internally.
pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan, C>(
mut scheduler: SchedulerGrpcClient<C>,
executor: Arc<Executor>,
codec: BallistaCodec<T, U>,
readiness: Option<OneShotSender<String>>,
poll_now_notify: Option<Arc<Notify>>,
available_task_slots: Option<Arc<Semaphore>>,
) -> Result<(), BallistaError>
where
C: tonic::client::GrpcService<tonic::body::Body>,
Expand All @@ -98,8 +100,9 @@ where
.unwrap()
.clone()
.into();
let available_task_slots =
Arc::new(Semaphore::new(executor_specification.task_slots as usize));
let available_task_slots = available_task_slots.unwrap_or_else(|| {
Arc::new(Semaphore::new(executor_specification.task_slots as usize))
});

let (task_status_sender, mut task_status_receiver) =
std::sync::mpsc::channel::<TaskStatus>();
Expand Down
Loading