Skip to content

Commit f161028

Browse files
committed
feat: add Vortex columnar format support for shuffle operations (#7)
* feat: add Vortex columnar format support for shuffle operations - Introduced Vortex dependencies in Cargo.toml for columnar format handling. - Updated Ballista configuration to support shuffle format selection between Arrow IPC and Vortex. - Implemented Vortex shuffle reader and writer in execution plans. - Enhanced shuffle operations to detect and handle Vortex files. - Added utility functions for writing streams to disk in both Arrow IPC and Vortex formats. - Created a new module for Vortex shuffle operations, including reading and writing logic. - Added tests for Vortex write and read roundtrip functionality. * Fix Clippy and lint * Fix reading of Vortex files
1 parent 1a98c29 commit f161028

16 files changed

Lines changed: 1288 additions & 68 deletions

File tree

Cargo.lock

Lines changed: 422 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ tokio-stream = { version = "0.1" }
7272
backoff = { version = "0.4" }
7373
url = { version = "2.5" }
7474

75+
# Vortex columnar format dependencies from spiceai fork
76+
vortex-array = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
77+
vortex-buffer = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
78+
vortex-dtype = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
79+
vortex-error = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
80+
vortex-ipc = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false }
81+
7582
# cargo build --profile release-lto
7683
[profile.release-lto]
7784
codegen-units = 1

ballista/core/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ build-binary = ["aws-config", "aws-credential-types", "clap", "object_store"]
3939
docsrs = []
4040
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
4141
force_hash_collisions = ["datafusion/force_hash_collisions"]
42+
# Enable Vortex columnar format support for shuffles
43+
vortex = ["vortex-array", "vortex-buffer", "vortex-dtype", "vortex-error", "vortex-ipc"]
4244

4345
[dependencies]
4446
arrow-flight = { workspace = true }
@@ -68,6 +70,13 @@ tonic-prost = { workspace = true }
6870
url = { workspace = true }
6971
uuid = { workspace = true }
7072

73+
# Vortex columnar format dependencies (optional)
74+
vortex-array = { workspace = true, optional = true }
75+
vortex-buffer = { workspace = true, optional = true }
76+
vortex-dtype = { workspace = true, optional = true }
77+
vortex-error = { workspace = true, optional = true }
78+
vortex-ipc = { workspace = true, optional = true }
79+
7180
[dev-dependencies]
7281
tempfile = { workspace = true }
7382

ballista/core/src/config.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//! Ballista configuration
2020
2121
use std::result;
22+
use std::str::FromStr;
2223
use std::{collections::HashMap, fmt::Display};
2324

2425
use crate::error::{BallistaError, Result};
@@ -47,6 +48,8 @@ pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
4748
pub const BALLISTA_SHUFFLE_STORAGE_TYPE: &str = "ballista.shuffle.storage_type";
4849
/// Configuration key for shuffle storage base URL/path.
4950
pub const BALLISTA_SHUFFLE_STORAGE_URL: &str = "ballista.shuffle.storage_url";
51+
/// Configuration key for the shuffle data format: "arrow_ipc" (default) or "vortex".
52+
pub const BALLISTA_SHUFFLE_FORMAT: &str = "ballista.shuffle.format";
5053

5154
/// Configuration key for gRPC client connection timeout in seconds.
5255
pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
@@ -112,14 +115,54 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
112115
ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(),
113116
"HTTP/2 keep-alive interval for gRPC client in seconds".to_string(),
114117
DataType::UInt64,
115-
Some((300).to_string()))
118+
Some((300).to_string())),
119+
ConfigEntry::new(BALLISTA_SHUFFLE_FORMAT.to_string(),
120+
"Shuffle data format: 'arrow_ipc' (default) or 'vortex'. Vortex requires the 'vortex' feature to be enabled.".to_string(),
121+
DataType::Utf8,
122+
Some(ShuffleFormat::default().to_string()))
116123
];
117124
entries
118125
.into_iter()
119126
.map(|e| (e.name.clone(), e))
120127
.collect::<HashMap<_, _>>()
121128
});
122129

130+
/// Shuffle data format for intermediate shuffle files
131+
#[derive(
132+
Clone, Copy, Debug, PartialEq, Eq, Default, serde::Deserialize, serde::Serialize,
133+
)]
134+
pub enum ShuffleFormat {
135+
/// Arrow IPC format (default, always available)
136+
#[default]
137+
ArrowIpc,
138+
/// Vortex columnar format (requires 'vortex' feature)
139+
Vortex,
140+
}
141+
142+
impl Display for ShuffleFormat {
143+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144+
match self {
145+
ShuffleFormat::ArrowIpc => f.write_str("arrow_ipc"),
146+
ShuffleFormat::Vortex => f.write_str("vortex"),
147+
}
148+
}
149+
}
150+
151+
impl FromStr for ShuffleFormat {
152+
type Err = String;
153+
154+
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
155+
match s.to_lowercase().as_str() {
156+
"arrow_ipc" | "arrow-ipc" | "arrowips" | "ipc" => Ok(ShuffleFormat::ArrowIpc),
157+
"vortex" => Ok(ShuffleFormat::Vortex),
158+
_ => Err(format!(
159+
"Invalid shuffle format '{}'. Valid options are: 'arrow_ipc', 'vortex'",
160+
s
161+
)),
162+
}
163+
}
164+
}
165+
123166
/// Configuration option meta-data
124167
#[derive(Debug, Clone)]
125168
pub struct ConfigEntry {
@@ -286,6 +329,17 @@ impl BallistaConfig {
286329
self.settings.get(BALLISTA_SHUFFLE_STORAGE_URL).cloned()
287330
}
288331

332+
/// Returns the configured shuffle format (ArrowIpc or Vortex)
333+
///
334+
/// Note: Vortex format requires the 'vortex' feature to be enabled.
335+
/// If Vortex is configured but the feature is not enabled, this will
336+
/// still return Vortex, but the shuffle operations will fail at runtime.
337+
pub fn shuffle_format(&self) -> ShuffleFormat {
338+
self.get_string_setting(BALLISTA_SHUFFLE_FORMAT)
339+
.parse()
340+
.unwrap_or_default()
341+
}
342+
289343
fn get_usize_setting(&self, key: &str) -> usize {
290344
if let Some(v) = self.settings.get(key) {
291345
// infallible because we validate all configs in the constructor

ballista/core/src/execution_plans/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,16 @@ mod shuffle_reader;
2323
mod shuffle_writer;
2424
mod unresolved_shuffle;
2525

26+
#[cfg(feature = "vortex")]
27+
pub mod vortex_shuffle;
28+
2629
pub use distributed_query::DistributedQueryExec;
2730
pub use shuffle_reader::ShuffleReaderExec;
2831
pub use shuffle_writer::ShuffleWriterExec;
2932
pub use unresolved_shuffle::UnresolvedShuffleExec;
33+
34+
#[cfg(feature = "vortex")]
35+
pub use vortex_shuffle::{
36+
LocalVortexShuffleStream, VortexWriteTracker, vortex_file_extension,
37+
write_stream_to_disk_vortex,
38+
};

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -666,31 +666,79 @@ async fn fetch_partition_local(
666666
let metadata = &location.executor_meta;
667667
let partition_id = &location.partition_id;
668668

669-
let reader = fetch_partition_local_inner(path).map_err(|e| {
670-
// return BallistaError::FetchFailed may let scheduler retry this task.
671-
BallistaError::FetchFailed(
672-
metadata.id.clone(),
673-
partition_id.stage_id,
674-
partition_id.partition_id,
675-
e.to_string(),
676-
)
677-
})?;
678-
Ok(Box::pin(LocalShuffleStream::new(reader)))
669+
// Detect format from file extension
670+
let is_vortex = path.ends_with(".vortex");
671+
672+
if is_vortex {
673+
#[cfg(feature = "vortex")]
674+
{
675+
// For Vortex files, we need the schema. Get it from partition stats or infer.
676+
// For now, we'll create a stream that reads the vortex file.
677+
// Note: Vortex IPC format is self-describing, so we can read the schema from the file.
678+
let stream = fetch_partition_local_vortex(path).map_err(|e| {
679+
BallistaError::FetchFailed(
680+
metadata.id.clone(),
681+
partition_id.stage_id,
682+
partition_id.partition_id,
683+
e.to_string(),
684+
)
685+
})?;
686+
Ok(stream)
687+
}
688+
#[cfg(not(feature = "vortex"))]
689+
{
690+
Err(BallistaError::General(
691+
"Vortex format files found but 'vortex' feature is not enabled"
692+
.to_string(),
693+
))
694+
}
695+
} else {
696+
// Arrow IPC format
697+
let reader = fetch_partition_local_arrow(path).map_err(|e| {
698+
BallistaError::FetchFailed(
699+
metadata.id.clone(),
700+
partition_id.stage_id,
701+
partition_id.partition_id,
702+
e.to_string(),
703+
)
704+
})?;
705+
Ok(Box::pin(LocalShuffleStream::new(reader)))
706+
}
679707
}
680708

681-
fn fetch_partition_local_inner(
709+
/// Fetch partition from local Arrow IPC file
710+
fn fetch_partition_local_arrow(
682711
path: &str,
683712
) -> result::Result<StreamReader<BufReader<File>>, BallistaError> {
684713
let file = File::open(path).map_err(|e| {
685714
BallistaError::General(format!("Failed to open partition file at {path}: {e:?}"))
686715
})?;
687716
let file = BufReader::new(file);
688717
let reader = StreamReader::try_new(file, None).map_err(|e| {
689-
BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}"))
718+
BallistaError::General(format!(
719+
"Failed to create Arrow IPC reader at {path}: {e:?}"
720+
))
690721
})?;
691722
Ok(reader)
692723
}
693724

725+
/// Opens a local Vortex shuffle partition file as a record batch stream.
///
/// The stream is built by `LocalVortexShuffleStream::try_new`, which is
/// given `path` and an *empty* Arrow schema as a placeholder.
///
/// NOTE(review): because the schema passed here is empty, the schema this
/// stream reports may not match the batches it yields - confirm against
/// `LocalVortexShuffleStream`.
/// TODO: Consider storing schema metadata in the Vortex file or a sidecar file.
#[cfg(feature = "vortex")]
fn fetch_partition_local_vortex(
    path: &str,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
    use super::vortex_shuffle::LocalVortexShuffleStream;
    use datafusion::arrow::datatypes::Schema;
    use std::sync::Arc;

    // Placeholder only: no schema information is available at this call site.
    let placeholder_schema = Arc::new(Schema::empty());

    let vortex_stream = LocalVortexShuffleStream::try_new(path, placeholder_schema)?;
    Ok(Box::pin(vortex_stream))
}
741+
694742
#[cfg(feature = "build-binary")]
695743
async fn fetch_partition_object_store(
696744
location: &PartitionLocation,
@@ -1146,7 +1194,7 @@ mod tests {
11461194

11471195
// from to input partitions test the first one with two batches
11481196
let file_path = path.value(0);
1149-
let reader = fetch_partition_local_inner(file_path).unwrap();
1197+
let reader = fetch_partition_local_arrow(file_path).unwrap();
11501198

11511199
let mut stream: Pin<Box<dyn RecordBatchStream + Send>> =
11521200
async { Box::pin(LocalShuffleStream::new(reader)) }.await;

0 commit comments

Comments
 (0)