Skip to content

Commit 31ebf43

Browse files
mach-kernellukekim
authored andcommitted
executor poll_loop to support oneshot channel for readiness reporting (#2)
1 parent 08544f4 commit 31ebf43

3 files changed

Lines changed: 14 additions & 1 deletion

File tree

ballista/executor/src/execution_loop.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
4040
use futures::FutureExt;
4141
use log::{debug, error, info, warn};
4242
use std::any::Any;
43+
use std::cell::LazyCell;
4344
use std::convert::TryInto;
4445
use std::error::Error;
4546
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
4647
use std::time::{SystemTime, UNIX_EPOCH};
4748
use std::{sync::Arc, time::Duration};
49+
use tokio::sync::oneshot::Sender as OneShotSender;
4850
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
4951
use tonic::transport::Channel;
5052

@@ -60,6 +62,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
6062
mut scheduler: SchedulerGrpcClient<Channel>,
6163
executor: Arc<Executor>,
6264
codec: BallistaCodec<T, U>,
65+
readiness: Option<OneShotSender<String>>,
6366
) -> Result<(), BallistaError> {
6467
let executor_specification: ExecutorSpecification = executor
6568
.metadata
@@ -78,6 +81,13 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
7881
let dedicated_executor =
7982
DedicatedExecutor::new("task_runner", executor_specification.task_slots as usize);
8083

84+
let report_ready = LazyCell::new(|| {
85+
if let Some(chan) = readiness {
86+
chan.send(executor.metadata.id.clone())
87+
.expect("Must send readiness")
88+
}
89+
});
90+
8191
loop {
8292
// Wait for task slots to be available before asking for new work
8393
let permit = available_task_slots.acquire().await.unwrap();
@@ -100,6 +110,8 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
100110
})
101111
.await;
102112

113+
*report_ready;
114+
103115
match poll_work_result {
104116
Ok(result) => {
105117
let PollWorkResult {

ballista/executor/src/executor_process.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ pub async fn start_executor_process(
399399
scheduler.clone(),
400400
executor.clone(),
401401
default_codec,
402+
None,
402403
)));
403404
}
404405
};

ballista/executor/src/standalone.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ pub async fn new_standalone_executor_from_builder(
143143
)),
144144
);
145145

146-
tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
146+
tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec, None));
147147
Ok(())
148148
}
149149

0 commit comments

Comments
 (0)