Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
422 changes: 422 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ tokio-stream = { version = "0.1" }
backoff = { version = "0.4" }
url = { version = "2.5" }

# Vortex columnar format dependencies from spiceai fork
vortex-array = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
vortex-buffer = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
vortex-dtype = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
vortex-error = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
vortex-ipc = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }

# cargo build --profile release-lto
[profile.release-lto]
codegen-units = 1
Expand Down
9 changes: 9 additions & 0 deletions ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ build-binary = ["aws-config", "aws-credential-types", "clap", "object_store"]
docsrs = []
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = ["datafusion/force_hash_collisions"]
# Enable Vortex columnar format support for shuffles
vortex = ["vortex-array", "vortex-buffer", "vortex-dtype", "vortex-error", "vortex-ipc"]

[dependencies]
arrow-flight = { workspace = true }
Expand Down Expand Up @@ -67,6 +69,13 @@ tonic-prost = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }

# Vortex columnar format dependencies (optional)
vortex-array = { workspace = true, optional = true }
vortex-buffer = { workspace = true, optional = true }
vortex-dtype = { workspace = true, optional = true }
vortex-error = { workspace = true, optional = true }
vortex-ipc = { workspace = true, optional = true }

[dev-dependencies]
tempfile = { workspace = true }

Expand Down
56 changes: 55 additions & 1 deletion ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! Ballista configuration

use std::result;
use std::str::FromStr;
use std::{collections::HashMap, fmt::Display};

use crate::error::{BallistaError, Result};
Expand All @@ -43,6 +44,8 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
/// Configuration key to prefer Flight protocol for remote shuffle reads.
pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
"ballista.shuffle.remote_read_prefer_flight";
/// Configuration key for the shuffle data format: "arrow_ipc" (default) or "vortex".
pub const BALLISTA_SHUFFLE_FORMAT: &str = "ballista.shuffle.format";

/// Configuration key for gRPC client connection timeout in seconds.
pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
Expand Down Expand Up @@ -100,14 +103,54 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(),
"HTTP/2 keep-alive interval for gRPC client in seconds".to_string(),
DataType::UInt64,
Some((300).to_string()))
Some((300).to_string())),
ConfigEntry::new(BALLISTA_SHUFFLE_FORMAT.to_string(),
"Shuffle data format: 'arrow_ipc' (default) or 'vortex'. Vortex requires the 'vortex' feature to be enabled.".to_string(),
DataType::Utf8,
Some(ShuffleFormat::default().to_string()))
];
entries
.into_iter()
.map(|e| (e.name.clone(), e))
.collect::<HashMap<_, _>>()
});

/// On-disk encoding used for intermediate shuffle files.
///
/// The textual names used in configuration are provided by the `Display`
/// and `FromStr` implementations of this type.
#[derive(
    Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
pub enum ShuffleFormat {
    /// Arrow IPC encoding. This is the default and is always available.
    #[default]
    ArrowIpc,
    /// Vortex columnar encoding. Requires the 'vortex' feature.
    Vortex,
}

impl Display for ShuffleFormat {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ShuffleFormat::ArrowIpc => f.write_str("arrow_ipc"),
ShuffleFormat::Vortex => f.write_str("vortex"),
}
}
}

impl FromStr for ShuffleFormat {
    type Err = String;

    /// Parses a shuffle format name, ignoring letter case.
    ///
    /// Accepted spellings:
    /// * `arrow_ipc`, `arrow-ipc`, `arrowipc`, `ipc` -> [`ShuffleFormat::ArrowIpc`]
    /// * `vortex` -> [`ShuffleFormat::Vortex`]
    ///
    /// # Errors
    /// Returns a message that quotes the rejected input and lists the valid
    /// options when `s` is not one of the accepted spellings.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            // NOTE(review): "arrowips" looks like a misspelling of "arrowipc".
            // It is still accepted so that any configuration already using it
            // keeps working; the intended "arrowipc" alias is now accepted too.
            "arrow_ipc" | "arrow-ipc" | "arrowipc" | "arrowips" | "ipc" => {
                Ok(ShuffleFormat::ArrowIpc)
            }
            "vortex" => Ok(ShuffleFormat::Vortex),
            _ => Err(format!(
                "Invalid shuffle format '{s}'. Valid options are: 'arrow_ipc', 'vortex'"
            )),
        }
    }
}

/// Configuration option meta-data
#[derive(Debug, Clone)]
pub struct ConfigEntry {
Expand Down Expand Up @@ -264,6 +307,17 @@ impl BallistaConfig {
self.get_bool_setting(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT)
}

/// Returns the configured shuffle format (ArrowIpc or Vortex)
///
/// Note: Vortex format requires the 'vortex' feature to be enabled.
/// If Vortex is configured but the feature is not enabled, this will
/// still return Vortex, but the shuffle operations will fail at runtime.
pub fn shuffle_format(&self) -> ShuffleFormat {
self.get_string_setting(BALLISTA_SHUFFLE_FORMAT)
.parse()
.unwrap_or_default()
}

fn get_usize_setting(&self, key: &str) -> usize {
if let Some(v) = self.settings.get(key) {
// infallible because we validate all configs in the constructor
Expand Down
9 changes: 9 additions & 0 deletions ballista/core/src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ mod shuffle_reader;
mod shuffle_writer;
mod unresolved_shuffle;

#[cfg(feature = "vortex")]
pub mod vortex_shuffle;

pub use distributed_query::DistributedQueryExec;
pub use shuffle_reader::ShuffleReaderExec;
pub use shuffle_writer::ShuffleWriterExec;
pub use unresolved_shuffle::UnresolvedShuffleExec;

#[cfg(feature = "vortex")]
pub use vortex_shuffle::{
LocalVortexShuffleStream, VortexWriteTracker, vortex_file_extension,
write_stream_to_disk_vortex,
};
74 changes: 61 additions & 13 deletions ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,31 +613,79 @@ async fn fetch_partition_local(
let metadata = &location.executor_meta;
let partition_id = &location.partition_id;

let reader = fetch_partition_local_inner(path).map_err(|e| {
// return BallistaError::FetchFailed may let scheduler retry this task.
BallistaError::FetchFailed(
metadata.id.clone(),
partition_id.stage_id,
partition_id.partition_id,
e.to_string(),
)
})?;
Ok(Box::pin(LocalShuffleStream::new(reader)))
// Detect format from file extension
let is_vortex = path.ends_with(".vortex");

if is_vortex {
#[cfg(feature = "vortex")]
{
// For Vortex files, we need the schema. Get it from partition stats or infer.
// For now, we'll create a stream that reads the vortex file.
// Note: Vortex IPC format is self-describing, so we can read the schema from the file.
let stream = fetch_partition_local_vortex(path).map_err(|e| {
BallistaError::FetchFailed(
metadata.id.clone(),
partition_id.stage_id,
partition_id.partition_id,
e.to_string(),
)
})?;
Ok(stream)
}
#[cfg(not(feature = "vortex"))]
{
Err(BallistaError::General(
"Vortex format files found but 'vortex' feature is not enabled"
.to_string(),
))
}
} else {
// Arrow IPC format
let reader = fetch_partition_local_arrow(path).map_err(|e| {
BallistaError::FetchFailed(
metadata.id.clone(),
partition_id.stage_id,
partition_id.partition_id,
e.to_string(),
)
})?;
Ok(Box::pin(LocalShuffleStream::new(reader)))
}
}

fn fetch_partition_local_inner(
/// Fetch partition from local Arrow IPC file
///
/// Opens the file at `path`, wraps it in a `BufReader`, and builds an Arrow
/// IPC `StreamReader` over it.
///
/// # Errors
/// Returns `BallistaError::General` when the file cannot be opened or when
/// the stream reader cannot be created; both messages include `path` and
/// the underlying error.
fn fetch_partition_local_arrow(
path: &str,
) -> result::Result<StreamReader<BufReader<File>>, BallistaError> {
let file = File::open(path).map_err(|e| {
BallistaError::General(format!("Failed to open partition file at {path}: {e:?}"))
})?;
let file = BufReader::new(file);
// `None` is the projection argument: no column projection is requested.
let reader = StreamReader::try_new(file, None).map_err(|e| {
BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}"))
BallistaError::General(format!(
"Failed to create Arrow IPC reader at {path}: {e:?}"
))
})?;
Ok(reader)
}

/// Opens a local Vortex shuffle file as a record batch stream.
///
/// The stream is created by `LocalVortexShuffleStream::try_new`; an error from
/// that constructor is propagated to the caller as a `BallistaError`.
#[cfg(feature = "vortex")]
fn fetch_partition_local_vortex(
    path: &str,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
    use super::vortex_shuffle::LocalVortexShuffleStream;

    // NOTE(review): the stream is handed an empty Arrow schema as a
    // placeholder; nothing in this function reads the real schema from the
    // file. Confirm that consumers of the returned stream do not rely on
    // `schema()` matching the batches.
    // TODO: Consider storing schema metadata in the Vortex file or a sidecar file.
    let vortex_stream = LocalVortexShuffleStream::try_new(
        path,
        std::sync::Arc::new(datafusion::arrow::datatypes::Schema::empty()),
    )?;

    Ok(Box::pin(vortex_stream))
}

async fn fetch_partition_object_store(
_location: &PartitionLocation,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
Expand Down Expand Up @@ -968,7 +1016,7 @@ mod tests {

// from to input partitions test the first one with two batches
let file_path = path.value(0);
let reader = fetch_partition_local_inner(file_path).unwrap();
let reader = fetch_partition_local_arrow(file_path).unwrap();

let mut stream: Pin<Box<dyn RecordBatchStream + Send>> =
async { Box::pin(LocalShuffleStream::new(reader)) }.await;
Expand Down
Loading