Skip to content

Commit 0559efa

Browse files
push
1 parent 9c17685 commit 0559efa

8 files changed

Lines changed: 268 additions & 118 deletions

File tree

ballista/executor/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ impl TryFrom<Config> for ExecutorProcessConfig {
140140
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
141141
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
142142
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
143+
disable_scheduler_heartbeats: false,
144+
disable_task_status_push: false,
143145
override_execution_engine: None,
144146
override_function_registry: None,
145147
override_config_producer: None,

ballista/executor/src/executor.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use ballista_core::serde::protobuf::ExecutorRegistration;
2929
use ballista_core::serde::scheduler::PartitionId;
3030
use ballista_core::ConfigProducer;
3131
use ballista_core::RuntimeProducer;
32+
use crate::status_store::ExecutorStatusStore;
3233
use dashmap::DashMap;
3334
use datafusion::execution::context::TaskContext;
3435
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -85,6 +86,9 @@ pub struct Executor {
8586
/// Execution engine that the executor will delegate to
8687
/// for executing query stages
8788
pub(crate) execution_engine: Arc<dyn ExecutionEngine>,
89+
90+
/// Stores task status updates for scheduler polling.
91+
status_store: Arc<ExecutorStatusStore>,
8892
}
8993

9094
impl Executor {
@@ -133,6 +137,7 @@ impl Executor {
133137
abort_handles: Default::default(),
134138
execution_engine: execution_engine
135139
.unwrap_or_else(|| Arc::new(DefaultExecutionEngine {})),
140+
status_store: Arc::new(ExecutorStatusStore::new()),
136141
}
137142
}
138143
}
@@ -149,6 +154,11 @@ impl Executor {
149154
(self.config_producer)()
150155
}
151156

157+
#[must_use]
158+
pub fn status_store(&self) -> Arc<ExecutorStatusStore> {
159+
Arc::clone(&self.status_store)
160+
}
161+
152162
/// Execute one partition of a query stage and persist the result to disk in IPC format. On
153163
/// success, return a RecordBatch containing metadata about the results, including path
154164
/// and statistics.

ballista/executor/src/executor_process.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ pub struct ExecutorProcessConfig {
9494
/// The maximum size of an encoded message
9595
pub grpc_max_encoding_message_size: u32,
9696
pub executor_heartbeat_interval_seconds: u64,
97+
/// Disable outbound scheduler heartbeats for polling-based health.
98+
pub disable_scheduler_heartbeats: bool,
99+
/// Disable outbound task status updates for polling-based status.
100+
pub disable_task_status_push: bool,
97101
/// Optional execution engine to use to execute physical plans, will default to
98102
/// DataFusion if none is provided.
99103
pub override_execution_engine: Option<Arc<dyn ExecutionEngine>>,
@@ -147,6 +151,8 @@ impl Default for ExecutorProcessConfig {
147151
grpc_max_decoding_message_size: 16777216,
148152
grpc_max_encoding_message_size: 16777216,
149153
executor_heartbeat_interval_seconds: 60,
154+
disable_scheduler_heartbeats: false,
155+
disable_task_status_push: false,
150156
override_execution_engine: None,
151157
override_function_registry: None,
152158
override_runtime_producer: None,

0 commit comments

Comments
 (0)