Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ aws-config = { version = "1.6.0", optional = true }
aws-credential-types = { version = "1.2.0", optional = true }
chrono = { version = "0.4", default-features = false }
clap = { workspace = true, optional = true }
dashmap = "6"
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-proto-common = { workspace = true }
Expand Down
70 changes: 70 additions & 0 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
/// Configuration key to prefer Flight protocol for remote shuffle reads.
pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
"ballista.shuffle.remote_read_prefer_flight";
/// Configuration key for shuffle storage mode (disk or memory).
pub const BALLISTA_SHUFFLE_MEMORY_MODE: &str = "ballista.shuffle.memory_mode";
/// Configuration key indicating if this is the final output stage.
/// When true, shuffle data is always written to disk regardless of memory_mode setting.
pub const BALLISTA_IS_FINAL_STAGE: &str = "ballista.shuffle.is_final_stage";
/// Shuffle format configuration: "arrow_ipc" or "vortex"
pub const BALLISTA_SHUFFLE_FORMAT: &str = "ballista.shuffle.format";

Expand Down Expand Up @@ -88,6 +93,14 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
"Forces the shuffle reader to use flight reader instead of block reader for remote read. Block reader usually has better performance and resource utilization".to_string(),
DataType::Boolean,
Some((false).to_string())),
ConfigEntry::new(BALLISTA_SHUFFLE_MEMORY_MODE.to_string(),
"When enabled, shuffle data is kept in memory on executors instead of being written to disk. This can improve performance for workloads with sufficient memory.".to_string(),
DataType::Boolean,
Some((false).to_string())),
ConfigEntry::new(BALLISTA_IS_FINAL_STAGE.to_string(),
"When true, indicates this is the final output stage. Final stages always write to disk regardless of memory_mode setting to ensure proper cleanup.".to_string(),
DataType::Boolean,
Some((false).to_string())),
ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
"Connection timeout for gRPC client in seconds".to_string(),
DataType::UInt64,
Expand Down Expand Up @@ -307,6 +320,21 @@ impl BallistaConfig {
self.get_bool_setting(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT)
}

/// Returns whether in-memory shuffle mode is enabled.
///
/// When enabled, shuffle data is kept in memory on executors instead of
/// being written to disk. This can improve performance for workloads
/// with sufficient memory.
pub fn shuffle_memory_mode(&self) -> bool {
self.get_bool_setting(BALLISTA_SHUFFLE_MEMORY_MODE)
}

/// Returns whether this is the final output stage.
/// Final stages always write to disk regardless of memory_mode setting.
pub fn is_final_stage(&self) -> bool {
self.get_bool_setting(BALLISTA_IS_FINAL_STAGE)
}

/// Returns the configured shuffle format (ArrowIpc or Vortex)
///
/// Note: Vortex format requires the 'vortex' feature to be enabled.
Expand Down Expand Up @@ -477,4 +505,46 @@ mod tests {
assert_eq!(16777216, config.default_grpc_client_max_message_size());
Ok(())
}

#[test]
fn test_is_final_stage_default() {
let config = BallistaConfig::default();
// Default should be false
assert!(!config.is_final_stage());
}

#[test]
fn test_shuffle_memory_mode_default() {
let config = BallistaConfig::default();
// Default should be false (disk-based shuffles)
assert!(!config.shuffle_memory_mode());
}

#[test]
fn test_shuffle_format_default() {
let config = BallistaConfig::default();
// Default should be ArrowIpc
assert_eq!(config.shuffle_format(), ShuffleFormat::ArrowIpc);
}

#[test]
fn test_shuffle_format_parsing() {
assert_eq!(
"arrow_ipc".parse::<ShuffleFormat>().unwrap(),
ShuffleFormat::ArrowIpc
);
assert_eq!(
"arrow-ipc".parse::<ShuffleFormat>().unwrap(),
ShuffleFormat::ArrowIpc
);
assert_eq!(
"ipc".parse::<ShuffleFormat>().unwrap(),
ShuffleFormat::ArrowIpc
);
assert_eq!(
"vortex".parse::<ShuffleFormat>().unwrap(),
ShuffleFormat::Vortex
);
assert!("invalid".parse::<ShuffleFormat>().is_err());
}
}
5 changes: 5 additions & 0 deletions ballista/core/src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! several Ballista executors.

mod distributed_query;
mod shuffle_manager;
mod shuffle_reader;
mod shuffle_writer;
mod unresolved_shuffle;
Expand All @@ -27,6 +28,10 @@ mod unresolved_shuffle;
pub mod vortex_shuffle;

pub use distributed_query::DistributedQueryExec;
pub use shuffle_manager::{
InMemoryShuffleManager, ShufflePartitionData, ShufflePartitionKey,
global_shuffle_manager,
};
pub use shuffle_reader::ShuffleReaderExec;
pub use shuffle_writer::ShuffleWriterExec;
pub use unresolved_shuffle::UnresolvedShuffleExec;
Expand Down
Loading