Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ballista/executor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ impl TryFrom<Config> for ExecutorProcessConfig {
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
disable_scheduler_heartbeats: false,
disable_task_status_push: false,
override_execution_engine: None,
override_function_registry: None,
override_config_producer: None,
Expand Down
10 changes: 10 additions & 0 deletions ballista/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::execution_engine::ExecutionEngine;
use crate::execution_engine::QueryStageExecutor;
use crate::metrics::ExecutorMetricsCollector;
use crate::metrics::LoggingMetricsCollector;
use crate::status_store::ExecutorStatusStore;
use ballista_core::error::BallistaError;
use ballista_core::registry::BallistaFunctionRegistry;
use ballista_core::serde::protobuf;
Expand Down Expand Up @@ -85,6 +86,9 @@ pub struct Executor {
/// Execution engine that the executor will delegate to
/// for executing query stages
pub(crate) execution_engine: Arc<dyn ExecutionEngine>,

/// Stores task status updates for scheduler polling.
status_store: Arc<ExecutorStatusStore>,
}

impl Executor {
Expand Down Expand Up @@ -133,6 +137,7 @@ impl Executor {
abort_handles: Default::default(),
execution_engine: execution_engine
.unwrap_or_else(|| Arc::new(DefaultExecutionEngine {})),
status_store: Arc::new(ExecutorStatusStore::new()),
}
}
}
Expand All @@ -149,6 +154,11 @@ impl Executor {
(self.config_producer)()
}

#[must_use]
pub fn status_store(&self) -> Arc<ExecutorStatusStore> {
Arc::clone(&self.status_store)
}

/// Execute one partition of a query stage and persist the result to disk in IPC format. On
/// success, return a RecordBatch containing metadata about the results, including path
/// and statistics.
Expand Down
6 changes: 6 additions & 0 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ pub struct ExecutorProcessConfig {
/// The maximum size of an encoded message
pub grpc_max_encoding_message_size: u32,
pub executor_heartbeat_interval_seconds: u64,
/// Disable outbound scheduler heartbeats for polling-based health.
pub disable_scheduler_heartbeats: bool,
/// Disable outbound task status updates for polling-based status.
pub disable_task_status_push: bool,
/// Optional execution engine to use to execute physical plans, will default to
/// DataFusion if none is provided.
pub override_execution_engine: Option<Arc<dyn ExecutionEngine>>,
Expand Down Expand Up @@ -147,6 +151,8 @@ impl Default for ExecutorProcessConfig {
grpc_max_decoding_message_size: 16777216,
grpc_max_encoding_message_size: 16777216,
executor_heartbeat_interval_seconds: 60,
disable_scheduler_heartbeats: false,
disable_task_status_push: false,
override_execution_engine: None,
override_function_registry: None,
override_runtime_producer: None,
Expand Down
Loading
Loading