Skip to content

Commit f13a56a

Browse files
feat(executor): add polling-based task status and optional heartbeat/push modes
Add ExecutorStatusStore for local task status storage that schedulers can poll. Add config flags to disable scheduler heartbeats and task status push. Refactor executor_server startup into composable functions for flexible initialization. Add TLS support for scheduler-to-executor gRPC connections.
1 parent 9c17685 commit f13a56a

9 files changed

Lines changed: 307 additions & 134 deletions

File tree

ballista/executor/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -140,6 +140,8 @@ impl TryFrom<Config> for ExecutorProcessConfig {
140140
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
141141
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
142142
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
143+
disable_scheduler_heartbeats: false,
144+
disable_task_status_push: false,
143145
override_execution_engine: None,
144146
override_function_registry: None,
145147
override_config_producer: None,

ballista/executor/src/executor.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ use crate::execution_engine::ExecutionEngine;
2222
use crate::execution_engine::QueryStageExecutor;
2323
use crate::metrics::ExecutorMetricsCollector;
2424
use crate::metrics::LoggingMetricsCollector;
25+
use crate::status_store::ExecutorStatusStore;
2526
use ballista_core::error::BallistaError;
2627
use ballista_core::registry::BallistaFunctionRegistry;
2728
use ballista_core::serde::protobuf;
@@ -85,6 +86,9 @@ pub struct Executor {
8586
/// Execution engine that the executor will delegate to
8687
/// for executing query stages
8788
pub(crate) execution_engine: Arc<dyn ExecutionEngine>,
89+
90+
/// Stores task status updates for scheduler polling.
91+
status_store: Arc<ExecutorStatusStore>,
8892
}
8993

9094
impl Executor {
@@ -133,6 +137,7 @@ impl Executor {
133137
abort_handles: Default::default(),
134138
execution_engine: execution_engine
135139
.unwrap_or_else(|| Arc::new(DefaultExecutionEngine {})),
140+
status_store: Arc::new(ExecutorStatusStore::new()),
136141
}
137142
}
138143
}
@@ -149,6 +154,11 @@ impl Executor {
149154
(self.config_producer)()
150155
}
151156

157+
#[must_use]
158+
pub fn status_store(&self) -> Arc<ExecutorStatusStore> {
159+
Arc::clone(&self.status_store)
160+
}
161+
152162
/// Execute one partition of a query stage and persist the result to disk in IPC format. On
153163
/// success, return a RecordBatch containing metadata about the results, including path
154164
/// and statistics.

ballista/executor/src/executor_process.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,10 @@ pub struct ExecutorProcessConfig {
9494
/// The maximum size of an encoded message
9595
pub grpc_max_encoding_message_size: u32,
9696
pub executor_heartbeat_interval_seconds: u64,
97+
/// Disable outbound scheduler heartbeats for polling-based health.
98+
pub disable_scheduler_heartbeats: bool,
99+
/// Disable outbound task status updates for polling-based status.
100+
pub disable_task_status_push: bool,
97101
/// Optional execution engine to use to execute physical plans, will default to
98102
/// DataFusion if none is provided.
99103
pub override_execution_engine: Option<Arc<dyn ExecutionEngine>>,
@@ -147,6 +151,8 @@ impl Default for ExecutorProcessConfig {
147151
grpc_max_decoding_message_size: 16777216,
148152
grpc_max_encoding_message_size: 16777216,
149153
executor_heartbeat_interval_seconds: 60,
154+
disable_scheduler_heartbeats: false,
155+
disable_task_status_push: false,
150156
override_execution_engine: None,
151157
override_function_registry: None,
152158
override_runtime_producer: None,

0 commit comments

Comments
 (0)