Skip to content

Commit 336bb6e

Browse files
authored
executor poll_loop to support oneshot channel for readiness reporting (#2)
1 parent 5d71a6d commit 336bb6e

3 files changed

Lines changed: 14 additions & 1 deletion

File tree

ballista/executor/src/execution_loop.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,22 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
3636
use futures::FutureExt;
3737
use log::{debug, error, info, warn};
3838
use std::any::Any;
39+
use std::cell::LazyCell;
3940
use std::convert::TryInto;
4041
use std::error::Error;
4142
use std::ops::Deref;
4243
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
4344
use std::time::{SystemTime, UNIX_EPOCH};
4445
use std::{sync::Arc, time::Duration};
46+
use tokio::sync::oneshot::Sender as OneShotSender;
4547
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
4648
use tonic::transport::Channel;
4749

4850
pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
4951
mut scheduler: SchedulerGrpcClient<Channel>,
5052
executor: Arc<Executor>,
5153
codec: BallistaCodec<T, U>,
54+
readiness: Option<OneShotSender<String>>,
5255
) -> Result<(), BallistaError> {
5356
let executor_specification: ExecutorSpecification = executor
5457
.metadata
@@ -67,6 +70,13 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
6770
let dedicated_executor =
6871
DedicatedExecutor::new("task_runner", executor_specification.task_slots as usize);
6972

73+
let report_ready = LazyCell::new(|| {
74+
if let Some(chan) = readiness {
75+
chan.send(executor.metadata.id.clone())
76+
.expect("Must send readiness")
77+
}
78+
});
79+
7080
loop {
7181
// Wait for task slots to be available before asking for new work
7282
let permit = available_task_slots.acquire().await.unwrap();
@@ -89,6 +99,8 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
8999
})
90100
.await;
91101

102+
*report_ready;
103+
92104
match poll_work_result {
93105
Ok(result) => {
94106
let PollWorkResult {

ballista/executor/src/executor_process.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ pub async fn start_executor_process(
357357
scheduler.clone(),
358358
executor.clone(),
359359
default_codec,
360+
None,
360361
)));
361362
}
362363
};

ballista/executor/src/standalone.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub async fn new_standalone_executor_from_builder(
131131
)),
132132
);
133133

134-
tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
134+
tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec, None));
135135
Ok(())
136136
}
137137

0 commit comments

Comments
 (0)