diff --git a/Cargo.lock b/Cargo.lock index cc2a39129f..3c7478b079 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,6 +155,12 @@ dependencies = [ "object", ] +[[package]] +name = "arcref" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28f6098a1e8ab66ff91324cce8fea6643101882cf7d09c85acdb1485ecf61e29" + [[package]] name = "arrayref" version = "0.3.9" @@ -383,6 +389,7 @@ version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bb63203e8e0e54b288d0d8043ca8fa1013820822a27692ef1b78a977d879f2c" dependencies = [ + "bitflags 2.10.0", "serde_core", "serde_json", ] @@ -1017,6 +1024,11 @@ dependencies = [ "tonic-prost-build", "url", "uuid", + "vortex-array", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-ipc", ] [[package]] @@ -1065,6 +1077,8 @@ dependencies = [ "tracing-appender", "tracing-subscriber", "uuid", + "vortex-array", + "vortex-ipc", ] [[package]] @@ -1096,6 +1110,8 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tonic-prost", + "tonic-prost-build", "tracing", "tracing-appender", "tracing-subscriber", @@ -1150,6 +1166,18 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "blake2" version = "0.10.6" @@ -1620,6 +1648,16 @@ version = "0.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2931af7e13dc045d8e9d26afccc6fa115d64e115c9c84b1166288b46f6782c2" +[[package]] +name = "cudarc" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3aa12038120eb13347a6ae2ffab1d34efe78150125108627fd85044dd4d6ff1e" +dependencies = [ + "half", + "libloading", +] + [[package]] name = "darling" version = "0.21.3" @@ -2580,6 +2618,26 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" +[[package]] +name = "enum-iterator" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4549325971814bda7a44061bf3fe7e487d447cba01e4220a4b454d630d7a016" +dependencies = [ + "enum-iterator-derive", +] + +[[package]] +name = "enum-iterator-derive" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "685adfa4d6f3d765a26bc5dbc936577de9abf756c1feeb3089b01dd395034842" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "env_filter" version = "0.1.4" @@ -2731,6 +2789,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.31" @@ -2913,6 +2977,8 @@ dependencies = [ "cfg-if", "crunchy", "num-traits", + "rand 0.9.2", + "rand_distr", "zerocopy", ] @@ -3058,6 +3124,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + [[package]] name = "humantime" version = "2.3.0" @@ -3363,6 +3438,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "inventory" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc61209c082fbeb19919bee74b176221b27223e27b65d781eb91af24eb1fb46e" +dependencies = [ + "rustversion", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -3407,10 +3491,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e67e8da4c49d6d9909fe03361f9b620f58898859f5c7aded68351e85e71ecf50" dependencies = [ "jiff-static", + "jiff-tzdb-platform", "log", "portable-atomic", "portable-atomic-util", "serde_core", + "windows-sys 0.52.0", ] [[package]] @@ -3424,6 +3510,21 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "jiff-tzdb" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68971ebff725b9e2ca27a601c5eb38a4c5d64422c4cbab0c535f248087eda5c2" + +[[package]] +name = "jiff-tzdb-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8" +dependencies = [ + "jiff-tzdb", +] + [[package]] name = "jobserver" version = "0.1.34" @@ -3519,6 +3620,16 @@ version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link", +] + [[package]] name = "libm" version = "0.2.15" @@ -3678,6 +3789,28 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "multiversion" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edb7f0ff51249dfda9ab96b5823695e15a052dc15074c9dbf3d118afaf2c201" +dependencies = [ + "multiversion-macros", + "target-features", +] + +[[package]] +name = "multiversion-macros" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b093064383341eb3271f42e381cb8f10a01459478446953953c75d24bd339fc0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", + "target-features", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -3789,6 +3922,27 @@ dependencies = [ "libm", ] +[[package]] +name = "num_enum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1207a7e20ad57b847bbddc6776b968420d38292bbfe2089accff5e19e82454c" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "object" version = "0.32.2" @@ -4439,6 +4593,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "radix_trie" version = "0.2.1" @@ -4508,6 +4668,16 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_distr" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8615d50dcf34fa31f7ab52692afec947c4dd0ab803cc87cb3b0b4570ff7463" +dependencies = [ + "num-traits", + "rand 0.9.2", +] + [[package]] name = "recursive" version = "0.1.1" @@ -5253,6 +5423,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" @@ -5397,6 +5573,18 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + +[[package]] +name = "target-features" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1bbb9f3c5c463a01705937a24fdabc5047929ac764b2d5b9cf681c1f5041ed5" + [[package]] name = "tempfile" version = "3.24.0" @@ -5410,6 +5598,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "termtree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" + [[package]] name = "testcontainers" version = "0.25.2" @@ -6077,6 +6271,225 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vortex-array" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "arcref", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ord", + "arrow-schema", + "arrow-select", + "arrow-string", + "cfg-if", + "enum-iterator", + "flatbuffers", + "futures", + "getrandom 0.3.4", + "humansize", + "inventory", + "itertools", + "multiversion", + "num-traits", + "num_enum", + "parking_lot", + "paste", + "pin-project-lite", + "prost", + "rand 0.9.2", + "rustc-hash", + "simdutf8", + "termtree", + "tracing", + "vortex-buffer", + "vortex-compute", + "vortex-dtype", + "vortex-error", + "vortex-flatbuffers", + "vortex-mask", + "vortex-proto", + "vortex-scalar", + "vortex-session", + "vortex-utils", + "vortex-vector", +] + +[[package]] +name = "vortex-buffer" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "arrow-buffer", + "bitvec", + "bytes", + "cudarc", + "itertools", + "simdutf8", + "vortex-error", +] + +[[package]] +name = "vortex-compute" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-schema", + "half", + "itertools", + "multiversion", + "num-traits", + "paste", + "tracing", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-mask", + "vortex-vector", +] + +[[package]] +name = "vortex-dtype" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "flatbuffers", + "half", + "itertools", + "jiff", + "num-traits", + "num_enum", + "paste", + "prost", + "serde", + "static_assertions", + "vortex-buffer", + "vortex-error", + "vortex-flatbuffers", + "vortex-proto", + "vortex-utils", +] + +[[package]] +name = "vortex-error" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "arrow-schema", + "flatbuffers", + "jiff", + "prost", + "url", +] + +[[package]] +name = "vortex-flatbuffers" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "flatbuffers", + "vortex-buffer", +] + +[[package]] +name = "vortex-ipc" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "bytes", + "flatbuffers", + "futures", + "itertools", + "pin-project-lite", + "vortex-array", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-flatbuffers", +] + +[[package]] +name = "vortex-mask" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "arrow-buffer", + "itertools", + "vortex-buffer", + "vortex-error", +] + +[[package]] +name = "vortex-proto" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "prost", + "prost-types", +] + +[[package]] +name = "vortex-scalar" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "arrow-array", + "bytes", + "itertools", + "num-traits", + "paste", + "prost", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-mask", + "vortex-proto", + "vortex-utils", + "vortex-vector", +] + +[[package]] +name = "vortex-session" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "dashmap", + "vortex-error", + "vortex-utils", +] + +[[package]] +name = "vortex-utils" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "dashmap", + "hashbrown 0.16.1", + "vortex-error", +] + +[[package]] +name = "vortex-vector" +version = "0.1.0" +source = "git+https://github.com/spiceai/vortex?branch=spiceai-51#623936e22bdc1daad586af4a54053240f2009d5c" +dependencies = [ + "num-traits", + "paste", + "static_assertions", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-mask", +] + [[package]] name = "vsimd" version = "0.8.0" @@ -6494,6 +6907,15 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "xattr" version = "1.6.1" diff --git a/Cargo.toml b/Cargo.toml index bd73e3601a..a0aade4dc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,13 @@ tokio-stream = { version = "0.1" } backoff = { version = "0.4" } url = { version = "2.5" } +# Vortex columnar format dependencies from spiceai fork +vortex-array = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false } +vortex-buffer = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false } +vortex-dtype = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false } +vortex-error = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false } +vortex-ipc = { git = "https://github.com/spiceai/vortex", branch = "spiceai-51", default-features = false } + # cargo build --profile release-lto [profile.release-lto] codegen-units = 1 diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml index 4eebdc7007..8402c3613b 100644 --- a/ballista/core/Cargo.toml +++ b/ballista/core/Cargo.toml @@ -39,6 +39,8 @@ build-binary = ["aws-config", "aws-credential-types", "clap", "object_store"] docsrs = [] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion/force_hash_collisions"] +# Enable Vortex columnar format support for shuffles +vortex = ["vortex-array", "vortex-buffer", "vortex-dtype", "vortex-error", "vortex-ipc"] [dependencies] arrow-flight = { workspace = true } @@ -67,6 +69,13 @@ tonic-prost = { workspace = true } url = { workspace = true } uuid = { workspace = true } +# Vortex columnar format dependencies (optional) +vortex-array = { workspace = true, optional = true } +vortex-buffer = { workspace = true, optional = true } +vortex-dtype = { workspace = true, optional = true } +vortex-error = { workspace = true, optional = true } +vortex-ipc = { workspace = true, optional = true } + [dev-dependencies] tempfile = { workspace = true } diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs index 1e6a593fd2..7510724a41 100644 --- a/ballista/core/src/config.rs +++ b/ballista/core/src/config.rs @@ -19,6 +19,7 @@ //! Ballista configuration use std::result; +use std::str::FromStr; use std::{collections::HashMap, fmt::Display}; use crate::error::{BallistaError, Result}; @@ -43,6 +44,8 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str = /// Configuration key to prefer Flight protocol for remote shuffle reads. pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str = "ballista.shuffle.remote_read_prefer_flight"; +/// Shuffle format configuration: "arrow_ipc" or "vortex" +pub const BALLISTA_SHUFFLE_FORMAT: &str = "ballista.shuffle.format"; /// Configuration key for gRPC client connection timeout in seconds. pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str = @@ -100,7 +103,11 @@ static CONFIG_ENTRIES: LazyLock> = LazyLock::new(|| ConfigEntry::new(BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS.to_string(), "HTTP/2 keep-alive interval for gRPC client in seconds".to_string(), DataType::UInt64, - Some((300).to_string())) + Some((300).to_string())), + ConfigEntry::new(BALLISTA_SHUFFLE_FORMAT.to_string(), + "Shuffle data format: 'arrow_ipc' (default) or 'vortex'. Vortex requires the 'vortex' feature to be enabled.".to_string(), + DataType::Utf8, + Some(ShuffleFormat::default().to_string())) ]; entries .into_iter() @@ -108,6 +115,42 @@ static CONFIG_ENTRIES: LazyLock> = LazyLock::new(|| .collect::>() }); +/// Shuffle data format for intermediate shuffle files +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Default, serde::Deserialize, serde::Serialize, +)] +pub enum ShuffleFormat { + /// Arrow IPC format (default, always available) + #[default] + ArrowIpc, + /// Vortex columnar format (requires 'vortex' feature) + Vortex, +} + +impl Display for ShuffleFormat { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ShuffleFormat::ArrowIpc => f.write_str("arrow_ipc"), + ShuffleFormat::Vortex => f.write_str("vortex"), + } + } +} + +impl FromStr for ShuffleFormat { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + match s.to_lowercase().as_str() { + "arrow_ipc" | "arrow-ipc" | "arrowips" | "ipc" => Ok(ShuffleFormat::ArrowIpc), + "vortex" => Ok(ShuffleFormat::Vortex), + _ => Err(format!( + "Invalid shuffle format '{}'. Valid options are: 'arrow_ipc', 'vortex'", + s + )), + } + } +} + /// Configuration option meta-data #[derive(Debug, Clone)] pub struct ConfigEntry { @@ -264,6 +307,17 @@ impl BallistaConfig { self.get_bool_setting(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT) } + /// Returns the configured shuffle format (ArrowIpc or Vortex) + /// + /// Note: Vortex format requires the 'vortex' feature to be enabled. + /// If Vortex is configured but the feature is not enabled, this will + /// still return Vortex, but the shuffle operations will fail at runtime. + pub fn shuffle_format(&self) -> ShuffleFormat { + self.get_string_setting(BALLISTA_SHUFFLE_FORMAT) + .parse() + .unwrap_or_default() + } + fn get_usize_setting(&self, key: &str) -> usize { if let Some(v) = self.settings.get(key) { // infallible because we validate all configs in the constructor diff --git a/ballista/core/src/execution_plans/mod.rs b/ballista/core/src/execution_plans/mod.rs index 7a5e105c6c..b431c0a1f6 100644 --- a/ballista/core/src/execution_plans/mod.rs +++ b/ballista/core/src/execution_plans/mod.rs @@ -23,7 +23,16 @@ mod shuffle_reader; mod shuffle_writer; mod unresolved_shuffle; +#[cfg(feature = "vortex")] +pub mod vortex_shuffle; + pub use distributed_query::DistributedQueryExec; pub use shuffle_reader::ShuffleReaderExec; pub use shuffle_writer::ShuffleWriterExec; pub use unresolved_shuffle::UnresolvedShuffleExec; + +#[cfg(feature = "vortex")] +pub use vortex_shuffle::{ + LocalVortexShuffleStream, VortexWriteTracker, vortex_file_extension, + write_stream_to_disk_vortex, +}; diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index b7d734d364..aac8e17c46 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -613,19 +613,48 @@ async fn fetch_partition_local( let metadata = &location.executor_meta; let partition_id = &location.partition_id; - let reader = fetch_partition_local_inner(path).map_err(|e| { - // return BallistaError::FetchFailed may let scheduler retry this task. - BallistaError::FetchFailed( - metadata.id.clone(), - partition_id.stage_id, - partition_id.partition_id, - e.to_string(), - ) - })?; - Ok(Box::pin(LocalShuffleStream::new(reader))) + // Detect format from file extension + let is_vortex = path.ends_with(".vortex"); + + if is_vortex { + #[cfg(feature = "vortex")] + { + // For Vortex files, we need the schema. Get it from partition stats or infer. + // For now, we'll create a stream that reads the vortex file. + // Note: Vortex IPC format is self-describing, so we can read the schema from the file. + let stream = fetch_partition_local_vortex(path).map_err(|e| { + BallistaError::FetchFailed( + metadata.id.clone(), + partition_id.stage_id, + partition_id.partition_id, + e.to_string(), + ) + })?; + Ok(stream) + } + #[cfg(not(feature = "vortex"))] + { + Err(BallistaError::General( + "Vortex format files found but 'vortex' feature is not enabled" + .to_string(), + )) + } + } else { + // Arrow IPC format + let reader = fetch_partition_local_arrow(path).map_err(|e| { + BallistaError::FetchFailed( + metadata.id.clone(), + partition_id.stage_id, + partition_id.partition_id, + e.to_string(), + ) + })?; + Ok(Box::pin(LocalShuffleStream::new(reader))) + } } -fn fetch_partition_local_inner( +/// Fetch partition from local Arrow IPC file +fn fetch_partition_local_arrow( path: &str, ) -> result::Result>, BallistaError> { let file = File::open(path).map_err(|e| { @@ -633,11 +662,30 @@ fn fetch_partition_local_inner( })?; let file = BufReader::new(file); let reader = StreamReader::try_new(file, None).map_err(|e| { - BallistaError::General(format!("Failed to new arrow FileReader at {path}: {e:?}")) + BallistaError::General(format!( + "Failed to create Arrow IPC reader at {path}: {e:?}" + )) })?; Ok(reader) } +/// Fetch partition from local Vortex file +#[cfg(feature = "vortex")] +fn fetch_partition_local_vortex( + path: &str, +) -> result::Result { + use super::vortex_shuffle::LocalVortexShuffleStream; + + // Vortex IPC format is self-describing, but we need a schema for the stream interface. + // For now, use an empty schema - the actual data schema will come from the Vortex arrays. + // TODO: Consider storing schema metadata in the Vortex file or a sidecar file. + let schema = std::sync::Arc::new(datafusion::arrow::datatypes::Schema::empty()); + + // Create the stream - it handles reading and converting Vortex arrays to Arrow + let stream = LocalVortexShuffleStream::try_new(path, schema)?; + Ok(Box::pin(stream)) +} + async fn fetch_partition_object_store( _location: &PartitionLocation, ) -> result::Result { @@ -968,7 +1016,7 @@ mod tests { // from to input partitions test the first one with two batches let file_path = path.value(0); - let reader = fetch_partition_local_inner(file_path).unwrap(); + let reader = fetch_partition_local_arrow(file_path).unwrap(); let mut stream: Pin> = async { Box::pin(LocalShuffleStream::new(reader)) }.await; diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 114fc66437..ec281da65e 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -17,8 +17,9 @@ //! ShuffleWriterExec represents a section of a query plan that has consistent partitioning and //! can be executed as one unit with each partition being executed in parallel. The output of each -//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query -//! will use the ShuffleReaderExec to read these results. +//! partition is re-partitioned and streamed to disk in Arrow IPC format (default) or Vortex format. +//! The shuffle format is configurable. Future stages of the query will use the ShuffleReaderExec +//! to read these results. use datafusion::arrow::ipc::CompressionType; use datafusion::arrow::ipc::writer::IpcWriteOptions; @@ -34,6 +35,8 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; +use crate::config::ShuffleFormat; +use crate::extension::SessionConfigExt; use crate::utils; use crate::serde::protobuf::ShuffleWritePartition; @@ -102,10 +105,96 @@ impl std::fmt::Display for ShuffleWriterExec { } } +/// Writer for Arrow IPC format +pub struct ArrowIpcWriter { + writer: StreamWriter, +} + +impl ArrowIpcWriter { + pub fn try_new( + file: File, + schema: &datafusion::arrow::datatypes::Schema, + ) -> Result { + let options = IpcWriteOptions::default() + .try_with_compression(Some(CompressionType::LZ4_FRAME))?; + let writer = StreamWriter::try_new_with_options(file, schema, options)?; + Ok(Self { writer }) + } + + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + self.writer.write(batch)?; + Ok(()) + } + + pub fn finish(&mut self) -> Result<()> { + self.writer.finish()?; + Ok(()) + } +} + +/// Format-agnostic shuffle writer enum +pub enum ShuffleFileWriter { + ArrowIpc(ArrowIpcWriter), + #[cfg(feature = "vortex")] + Vortex(super::vortex_shuffle::VortexWriteTracker), +} + +impl ShuffleFileWriter { + pub fn try_new_arrow_ipc( + path: PathBuf, + schema: &datafusion::arrow::datatypes::Schema, + ) -> Result { + let file = File::create(&path)?; + Ok(Self::ArrowIpc(ArrowIpcWriter::try_new(file, schema)?)) + } + + #[cfg(feature = "vortex")] + pub fn try_new_vortex( + path: PathBuf, + schema: datafusion::arrow::datatypes::SchemaRef, + ) -> Result { + let tracker = super::vortex_shuffle::VortexWriteTracker::try_new(path, schema)?; + Ok(Self::Vortex(tracker)) + } + + pub fn try_new( + path: PathBuf, + schema: datafusion::arrow::datatypes::SchemaRef, + format: ShuffleFormat, + ) -> Result { + match format { + ShuffleFormat::ArrowIpc => Self::try_new_arrow_ipc(path, schema.as_ref()), + #[cfg(feature = "vortex")] + ShuffleFormat::Vortex => Self::try_new_vortex(path, schema), + #[cfg(not(feature = "vortex"))] + ShuffleFormat::Vortex => Err(DataFusionError::NotImplemented( + "Vortex format requires the 'vortex' feature to be enabled".to_string(), + )), + } + } + + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + match self { + Self::ArrowIpc(w) => w.write(batch), + #[cfg(feature = "vortex")] + Self::Vortex(w) => w.write(batch), + } + } + + pub fn finish(self) -> Result<()> { + match self { + Self::ArrowIpc(mut w) => w.finish(), + #[cfg(feature = "vortex")] + Self::Vortex(w) => w.finish(), + } + } +} + +/// Tracks write progress for a partition pub struct WriteTracker { pub num_batches: usize, pub num_rows: usize, - pub writer: StreamWriter, + pub writer: ShuffleFileWriter, pub path: PathBuf, } @@ -205,6 +294,10 @@ impl ShuffleWriterExec { let output_partitioning = self.shuffle_output_partitioning.clone(); let plan = self.plan.clone(); + // Get shuffle format from session config + let shuffle_format = context.session_config().ballista_shuffle_format(); + let file_ext = utils::shuffle_file_extension(shuffle_format); + async move { let now = Instant::now(); let mut stream = plan.execute(input_partition, context)?; @@ -214,15 +307,16 @@ impl ShuffleWriterExec { let timer = write_metrics.write_time.timer(); path.push(format!("{input_partition}")); std::fs::create_dir_all(&path)?; - path.push("data.arrow"); + path.push(format!("data.{file_ext}")); let path = path.to_str().unwrap(); - debug!("Writing results to {path}"); + debug!("Writing results to {path} (format: {shuffle_format})"); - // stream results to disk - let stats = utils::write_stream_to_disk( + // stream results to disk using configured format + let stats = utils::write_stream_to_disk_with_format( &mut stream, path, &write_metrics.write_time, + shuffle_format, ) .await .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; @@ -264,6 +358,8 @@ impl ShuffleWriterExec { write_metrics.repart_time.clone(), )?; + let schema = stream.schema(); + while let Some(result) = stream.next().await { let input_batch = result?; @@ -281,34 +377,27 @@ impl ShuffleWriterExec { w.writer.write(&output_batch)?; } None => { - let mut path = path.clone(); - path.push(format!("{output_partition}")); - std::fs::create_dir_all(&path)?; + let mut file_path = path.clone(); + file_path.push(format!("{output_partition}")); + std::fs::create_dir_all(&file_path)?; - path.push(format!( - "data-{input_partition}.arrow" + file_path.push(format!( + "data-{input_partition}.{file_ext}" )); - debug!("Writing results to {path:?}"); - - let options = IpcWriteOptions::default() - .try_with_compression(Some( - CompressionType::LZ4_FRAME, - ))?; + debug!("Writing results to {file_path:?} (format: {shuffle_format})"); - let file = File::create(path.clone())?; - let mut writer = - StreamWriter::try_new_with_options( - file, - stream.schema().as_ref(), - options, - )?; + let mut writer = ShuffleFileWriter::try_new( + file_path.clone(), + schema.clone(), + shuffle_format, + )?; writer.write(&output_batch)?; writers[output_partition] = Some(WriteTracker { num_batches: 1, num_rows: output_batch.num_rows(), writer, - path, + path: file_path, }); } } @@ -321,7 +410,7 @@ impl ShuffleWriterExec { let mut part_locs = vec![]; - for (i, w) in writers.iter_mut().enumerate() { + for (i, w) in writers.into_iter().enumerate() { if let Some(w) = w { let num_bytes = fs::metadata(&w.path)?.len(); w.writer.finish()?; diff --git a/ballista/core/src/execution_plans/vortex_shuffle.rs b/ballista/core/src/execution_plans/vortex_shuffle.rs new file mode 100644 index 0000000000..32ae0872d6 --- /dev/null +++ b/ballista/core/src/execution_plans/vortex_shuffle.rs @@ -0,0 +1,350 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Vortex format support for shuffle operations. +//! +//! This module provides Vortex-based serialization for shuffle data, +//! offering an alternative to Arrow IPC format with potentially better +//! compression and performance characteristics. +//! +//! Vortex IPC format is used for streaming data between processes. + +use std::fs::File; +use std::io::{BufReader, BufWriter, Cursor, Read, Write}; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::Result; +use datafusion::physical_plan::RecordBatchStream; +use futures::Stream; +use log::debug; + +use vortex_array::ArrayRef; +use vortex_array::arrow::FromArrowArray; +use vortex_array::arrow::IntoArrowArray; +use vortex_array::iter::ArrayIteratorAdapter; +use vortex_array::session::ArraySession; +use vortex_error::VortexResult; +use vortex_ipc::iterator::{ArrayIteratorIPC, SyncIPCReader}; + +use crate::error::BallistaError; +use crate::serde::scheduler::PartitionStats; + +/// Writer for Vortex format shuffle data +pub struct VortexWriteTracker { + /// Number of record batches written + pub num_batches: usize, + /// Total number of rows written + pub num_rows: usize, + /// Path to the output file + pub path: PathBuf, + file: BufWriter, + #[allow(dead_code)] // May be needed for schema validation in the future + schema: SchemaRef, + buffer: Vec, +} + +impl VortexWriteTracker { + /// Create a new Vortex writer for the given path + pub fn try_new(path: PathBuf, schema: SchemaRef) -> Result { + let file = File::create(&path)?; + let writer = BufWriter::new(file); + + Ok(Self { + num_batches: 0, + num_rows: 0, + path, + file: writer, + schema, + buffer: Vec::new(), + }) + } + + /// Write a record batch to the buffer + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + // Convert Arrow RecordBatch to Vortex Array + let vortex_array = ArrayRef::from_arrow(batch, false); + + self.buffer.push(vortex_array); + self.num_batches += 1; + self.num_rows += batch.num_rows(); + Ok(()) + } + + /// Finish writing and close the file + pub fn finish(mut self) -> Result<()> { + // Write all buffered arrays using IPC format + if !self.buffer.is_empty() { + // Get the dtype from the first array + let dtype = self.buffer[0].dtype().clone(); + + // Create an ArrayIterator from the buffer + let iter = self + .buffer + .into_iter() + .map(|a| Ok(a) as VortexResult); + let array_iter = ArrayIteratorAdapter::new(dtype, iter); + + // Convert to IPC bytes + let ipc_data = array_iter + .into_ipc() + .collect_to_buffer() + .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?; + + self.file.write_all(ipc_data.as_ref()).map_err(|e| { + datafusion::error::DataFusionError::Execution(format!( + "Failed to write Vortex IPC data: {e}" + )) + })?; + } + + self.file.flush().map_err(|e| { + datafusion::error::DataFusionError::Execution(format!( + "Failed to flush Vortex file: {e}" + )) + })?; + + Ok(()) + } +} + +/// Stream for reading Vortex shuffle files locally +pub struct LocalVortexShuffleStream { + arrays: std::vec::IntoIter, + schema: SchemaRef, +} + +impl LocalVortexShuffleStream { + /// Create a new stream from a Vortex file path + pub fn try_new( + path: &str, + schema: SchemaRef, + ) -> std::result::Result { + let file = File::open(path).map_err(|e| { + BallistaError::General(format!( + "Failed to open Vortex partition file at {path}: {e:?}" + )) + })?; + + let mut buf_reader = BufReader::new(file); + let mut data = Vec::new(); + buf_reader.read_to_end(&mut data).map_err(|e| { + BallistaError::General(format!("Failed to read Vortex file at {path}: {e:?}")) + })?; + + // Create default registry with all canonical encodings + let session = ArraySession::default(); + let registry = session.registry().clone(); + + // Read IPC data + let cursor = Cursor::new(data); + let reader = SyncIPCReader::try_new(cursor, registry).map_err(|e| { + BallistaError::General(format!( + "Failed to create Vortex IPC reader at {path}: {e:?}" + )) + })?; + + let arrays: Vec = reader + .map(|r| { + r.map_err(|e| { + BallistaError::General(format!("Failed to read array: {e:?}")) + }) + }) + .collect::, _>>()?; + + Ok(Self { + arrays: arrays.into_iter(), + schema, + }) + } +} + +impl Stream for LocalVortexShuffleStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + match self.arrays.next() { + Some(array) => { + // Convert Vortex array back to Arrow + let arrow_array = array.into_arrow_preferred().map_err(|e| { + datafusion::error::DataFusionError::External(Box::new(e)) + })?; + + // The arrow_array should be a StructArray since we converted from RecordBatch + let struct_array = arrow_array + .as_any() + .downcast_ref::( + ); + + match struct_array { + Some(sa) => { + let batch = RecordBatch::from(sa); + Poll::Ready(Some(Ok(batch))) + } + None => Poll::Ready(Some(Err( + datafusion::error::DataFusionError::Internal( + "Expected StructArray from Vortex".to_string(), + ), + ))), + } + } + None => Poll::Ready(None), + } + } +} + +impl RecordBatchStream for LocalVortexShuffleStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +/// Write a stream to disk in Vortex IPC format +pub async fn write_stream_to_disk_vortex( + stream: &mut Pin>, + path: &str, + disk_write_metric: &datafusion::physical_plan::metrics::Time, +) -> std::result::Result { + use futures::StreamExt; + + let file = File::create(path).map_err(|e| { + log::error!("Failed to create Vortex partition file at {path}: {e:?}"); + BallistaError::IoError(e) + })?; + + let mut num_rows = 0; + let mut num_batches = 0; + let mut num_bytes = 0; + let mut arrays: Vec = Vec::new(); + + while let Some(result) = stream.next().await { + let batch = result?; + + let batch_size_bytes: usize = batch.get_array_memory_size(); + num_batches += 1; + num_rows += batch.num_rows(); + num_bytes += batch_size_bytes; + + // Convert Arrow RecordBatch to Vortex Array + let vortex_array = ArrayRef::from_arrow(&batch, false); + arrays.push(vortex_array); + } + + // Write all arrays using IPC format + let timer = disk_write_metric.timer(); + let mut writer = BufWriter::new(file); + + if !arrays.is_empty() { + // Get the dtype from the first array + let dtype = arrays[0].dtype().clone(); + + // Create an ArrayIterator from the buffer + let iter = arrays.into_iter().map(|a| Ok(a) as VortexResult); + let array_iter = ArrayIteratorAdapter::new(dtype, iter); + + // Convert to IPC bytes + let ipc_data = array_iter.into_ipc().collect_to_buffer().map_err(|e| { + BallistaError::General(format!("Failed to write Vortex IPC: {e}")) + })?; + + writer.write_all(ipc_data.as_ref()).map_err(|e| { + BallistaError::General(format!("Failed to write to file: {e}")) + })?; + } + + writer.flush().map_err(|e| { + BallistaError::General(format!("Failed to flush Vortex file: {e}")) + })?; + timer.done(); + + debug!( + "Wrote Vortex shuffle file to {}: {} rows, {} batches, {} bytes", + path, num_rows, num_batches, num_bytes + ); + + Ok(PartitionStats::new( + Some(num_rows as u64), + Some(num_batches), + Some(num_bytes as u64), + )) +} + +/// Get the file extension for Vortex files +pub fn vortex_file_extension() -> &'static str { + "vortex" +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::arrow::array::{Int32Array, StringArray}; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use std::sync::Arc; + use tempfile::TempDir; + + fn create_test_batch() -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap() + } + + #[tokio::test] + async fn test_vortex_write_read_roundtrip() { + let work_dir = TempDir::new().unwrap(); + let path = work_dir.path().join("test.vortex"); + + let batch = create_test_batch(); + let schema = batch.schema(); + + // Write + { + let mut writer = + VortexWriteTracker::try_new(path.clone(), schema.clone()).unwrap(); + writer.write(&batch).unwrap(); + writer.finish().unwrap(); + } + + // Read + { + let stream = + LocalVortexShuffleStream::try_new(path.to_str().unwrap(), schema) + .unwrap(); + let mut pinned = Box::pin(stream); + + use futures::StreamExt; + let result = pinned.next().await.unwrap().unwrap(); + assert_eq!(result.num_rows(), 3); + assert_eq!(result.num_columns(), 2); + } + } +} diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index af91dc7962..05b33a7de6 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -16,10 +16,10 @@ // under the License. use crate::config::{ - BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME, + BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE, BALLISTA_JOB_NAME, BALLISTA_SHUFFLE_FORMAT, BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ, BALLISTA_SHUFFLE_READER_MAX_REQUESTS, BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, BALLISTA_STANDALONE_PARALLELISM, - BallistaConfig, + BallistaConfig, ShuffleFormat, }; use crate::planner::BallistaQueryPlanner; use crate::serde::protobuf::KeyValuePair; @@ -178,6 +178,18 @@ pub trait SessionConfigExt { /// Get whether to use TLS for executor connections fn ballista_use_tls(&self) -> bool; + /// Get the shuffle format (ArrowIpc or Vortex) + /// + /// Note: Vortex format requires the 'vortex' feature to be enabled. + fn ballista_shuffle_format(&self) -> ShuffleFormat; + + /// Set the shuffle format for intermediate shuffle data + /// + /// Available formats: + /// - `ShuffleFormat::ArrowIpc` (default) - Standard Arrow IPC format + /// - `ShuffleFormat::Vortex` - Vortex columnar format (requires 'vortex' feature) + fn with_ballista_shuffle_format(self, format: ShuffleFormat) -> Self; + /// Set a callback for recording shuffle read metrics (local vs remote). /// /// This callback will be invoked by the shuffle reader during execution @@ -490,6 +502,23 @@ impl SessionConfigExt for SessionConfig { .unwrap_or(false) } + fn ballista_shuffle_format(&self) -> ShuffleFormat { + self.options() + .extensions + .get::() + .map(|c| c.shuffle_format()) + .unwrap_or_else(|| BallistaConfig::default().shuffle_format()) + } + + fn with_ballista_shuffle_format(self, format: ShuffleFormat) -> Self { + if self.options().extensions.get::().is_some() { + self.set_str(BALLISTA_SHUFFLE_FORMAT, &format.to_string()) + } else { + self.with_option_extension(BallistaConfig::default()) + .set_str(BALLISTA_SHUFFLE_FORMAT, &format.to_string()) + } + } + fn with_ballista_shuffle_read_metrics_callback( self, callback: Arc, diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs index 80d22f5eb0..58e6d80a16 100644 --- a/ballista/core/src/lib.rs +++ b/ballista/core/src/lib.rs @@ -52,12 +52,10 @@ pub mod object_store; pub mod planner; /// Runtime registry for codec and function registration. pub mod registry; +/// Remote catalog for distributed function and table registration. +pub mod remote_catalog; /// Serialization and deserialization for Ballista messages and plans. pub mod serde; - -/// Remote catalog serialization and stub providers for Ballista clients. -pub mod remote_catalog; - /// General utility functions for Ballista operations. pub mod utils; diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 861dd72e7a..d89dc6c5c5 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::config::BallistaConfig; +use crate::config::{BallistaConfig, ShuffleFormat}; use crate::error::{BallistaError, Result}; use crate::extension::SessionConfigExt; use crate::serde::scheduler::PartitionStats; @@ -196,6 +196,39 @@ pub async fn collect_stream( Ok(batches) } +/// Write stream to disk using the specified shuffle format +/// +/// This function dispatches to the appropriate writer based on the format: +/// - ArrowIpc: Uses Arrow IPC streaming format with LZ4 compression +/// - Vortex: Uses Vortex columnar format (requires 'vortex' feature) +pub async fn write_stream_to_disk_with_format( + stream: &mut Pin>, + path: &str, + disk_write_metric: &metrics::Time, + format: ShuffleFormat, +) -> Result { + match format { + ShuffleFormat::ArrowIpc => write_stream_to_disk(stream, path, disk_write_metric).await, + #[cfg(feature = "vortex")] + ShuffleFormat::Vortex => { + crate::execution_plans::write_stream_to_disk_vortex(stream, path, disk_write_metric) + .await + } + #[cfg(not(feature = "vortex"))] + ShuffleFormat::Vortex => Err(BallistaError::General( + "Vortex format is not available. Enable the 'vortex' feature to use Vortex shuffle format.".to_string(), + )), + } +} + +/// Get the file extension for the given shuffle format +pub fn shuffle_file_extension(format: ShuffleFormat) -> &'static str { + match format { + ShuffleFormat::ArrowIpc => "arrow", + ShuffleFormat::Vortex => "vortex", + } +} + /// Creates a gRPC client connection with the specified configuration. pub async fn create_grpc_client_connection( dst: D, diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml index fb692cbf51..41ab99b904 100644 --- a/ballista/executor/Cargo.toml +++ b/ballista/executor/Cargo.toml @@ -35,6 +35,7 @@ required-features = ["build-binary"] [features] build-binary = ["clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"] default = ["build-binary", "mimalloc"] +vortex = ["ballista-core/vortex", "vortex-array", "vortex-ipc"] [dependencies] arrow = { workspace = true } @@ -63,6 +64,10 @@ tracing-appender = { workspace = true, optional = true } tracing-subscriber = { workspace = true, optional = true } uuid = { workspace = true } +# Vortex columnar format dependencies (optional) +vortex-array = { workspace = true, optional = true } +vortex-ipc = { workspace = true, optional = true } + [dev-dependencies] [build-dependencies] diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs index 635ec6997b..73f9128e67 100644 --- a/ballista/executor/src/flight_service.rs +++ b/ballista/executor/src/flight_service.rs @@ -17,9 +17,11 @@ //! Implementation of the Apache Arrow Flight protocol that wraps an executor. +use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::ipc::reader::StreamReader; use std::convert::TryFrom; use std::fs::File; +use std::path::Path; use std::pin::Pin; use tokio_util::io::ReaderStream; @@ -96,25 +98,35 @@ impl FlightService for BallistaFlightService { match &action { BallistaAction::FetchPartition { path, .. } => { - debug!("FetchPartition reading {path}"); - let file = File::open(path) - .map_err(|e| { - BallistaError::General(format!( - "Failed to open partition file at {path}: {e:?}" - )) - }) - .map_err(|e| from_ballista_err(&e))?; - let file = BufReader::new(file); - let reader = - StreamReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?; - - let (tx, rx) = channel(2); - let schema = reader.schema(); - task::spawn_blocking(move || { - if let Err(e) = read_partition(reader, tx) { - log::warn!("error streaming shuffle partition: {e}"); + // Detect shuffle format based on file extension + let is_vortex = Path::new(path) + .extension() + .map(|ext| ext == "vortex") + .unwrap_or(false); + + let format = if is_vortex { "vortex" } else { "arrow-ipc" }; + debug!("FetchPartition reading {path} (format: {format})"); + + // Detect shuffle format based on file extension + let is_vortex = Path::new(path) + .extension() + .map(|ext| ext == "vortex") + .unwrap_or(false); + + let (schema, rx) = if is_vortex { + #[cfg(feature = "vortex")] + { + read_vortex_partition(path)? + } + #[cfg(not(feature = "vortex"))] + { + return Err(Status::unimplemented( + "Vortex format is not available. Enable the 'vortex' feature.", + )); } - }); + } else { + read_arrow_ipc_partition(path)? + }; let write_options: IpcWriteOptions = IpcWriteOptions::default() .try_with_compression(Some(CompressionType::LZ4_FRAME)) @@ -274,7 +286,157 @@ impl FlightService for BallistaFlightService { } } -fn read_partition( +/// Read an Arrow IPC partition file and return the schema and a receiver for record batches +fn read_arrow_ipc_partition( + path: &str, +) -> Result< + ( + SchemaRef, + tokio::sync::mpsc::Receiver>, + ), + Status, +> { + let file = File::open(path) + .map_err(|e| { + BallistaError::General(format!( + "Failed to open partition file at {path}: {e:?}" + )) + }) + .map_err(|e| from_ballista_err(&e))?; + let file = BufReader::new(file); + let reader = StreamReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?; + + let (tx, rx) = channel(2); + let schema = reader.schema(); + task::spawn_blocking(move || { + if let Err(e) = read_arrow_ipc_batches(reader, tx) { + log::warn!("error streaming Arrow IPC shuffle partition: {e}"); + } + }); + + Ok((schema, rx)) +} + +/// Read Vortex partition file and return the schema and a receiver for record batches +#[cfg(feature = "vortex")] +fn read_vortex_partition( + path: &str, +) -> Result< + ( + SchemaRef, + tokio::sync::mpsc::Receiver>, + ), + Status, +> { + use std::io::Cursor; + use std::sync::Arc; + use vortex_array::ArrayRef; + use vortex_array::iter::ArrayIterator; + use vortex_array::session::ArraySession; + use vortex_ipc::iterator::SyncIPCReader; + + let file = File::open(path) + .map_err(|e| { + BallistaError::General(format!( + "Failed to open Vortex partition file at {path}: {e:?}" + )) + }) + .map_err(|e| from_ballista_err(&e))?; + + let mut buf_reader = BufReader::new(file); + let mut data = Vec::new(); + std::io::Read::read_to_end(&mut buf_reader, &mut data).map_err(|e| { + from_ballista_err(&BallistaError::General(format!( + "Failed to read Vortex file at {path}: {e:?}" + ))) + })?; + + // Create default registry with all canonical encodings + let session = ArraySession::default(); + let registry = session.registry().clone(); + + // Read IPC data + let cursor = Cursor::new(data); + let reader = SyncIPCReader::try_new(cursor, registry).map_err(|e| { + from_ballista_err(&BallistaError::General(format!( + "Failed to create Vortex IPC reader at {path}: {e:?}" + ))) + })?; + + // Get schema from IPC header via ArrayIterator::dtype() method + // This is stored in the Vortex IPC format header, not inferred from data + let dtype = reader.dtype().clone(); + let arrow_schema = dtype.to_arrow_schema().map_err(|e| { + from_ballista_err(&BallistaError::General(format!( + "Failed to convert Vortex DType to Arrow schema: {e:?}" + ))) + })?; + let schema = Arc::new(arrow_schema); + + let arrays: Vec = reader + .map(|r| { + r.map_err(|e| { + from_ballista_err(&BallistaError::General(format!( + "Failed to read Vortex array: {e:?}" + ))) + }) + }) + .collect::, _>>()?; + + let (tx, rx) = channel(2); + task::spawn_blocking(move || { + if let Err(e) = read_vortex_batches(arrays, tx) { + log::warn!("error streaming Vortex shuffle partition: {e}"); + } + }); + + Ok((schema, rx)) +} + +/// Read Vortex arrays and send them as record batches +#[cfg(feature = "vortex")] +fn read_vortex_batches( + arrays: Vec, + tx: Sender>, +) -> Result<(), FlightError> { + use vortex_array::arrow::IntoArrowArray; + + if tx.is_closed() { + return Err(FlightError::Tonic(Box::new(Status::internal( + "Can't send a batch, channel is closed", + )))); + } + + for array in arrays { + let arrow_array = array + .into_arrow_preferred() + .map_err(|e| FlightError::Arrow(ArrowError::ExternalError(Box::new(e))))?; + + let struct_array = arrow_array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + FlightError::Arrow(ArrowError::InvalidArgumentError( + "Expected StructArray from Vortex".to_string(), + )) + })?; + + let batch = RecordBatch::from(struct_array); + + tx.blocking_send(Ok(batch)).map_err(|err| { + if let SendError(Err(err)) = err { + err + } else { + FlightError::Tonic(Box::new(Status::internal(format!( + "Can't send a batch, something went wrong: {err:?}" + )))) + } + })?; + } + Ok(()) +} + +fn read_arrow_ipc_batches( reader: StreamReader>, tx: Sender>, ) -> Result<(), FlightError> diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index d29c7c5cc1..883b211d13 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -69,11 +69,15 @@ serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true, features = ["net"] } tonic = { workspace = true, features = ["router"] } +tonic-prost = { workspace = true } tracing = { workspace = true, optional = true } tracing-appender = { workspace = true, optional = true } tracing-subscriber = { workspace = true, optional = true } uuid = { workspace = true } +[build-dependencies] +tonic-prost-build = { workspace = true } + [dev-dependencies] rstest = { workspace = true } diff --git a/ballista/scheduler/build.rs b/ballista/scheduler/build.rs index ae0369dd53..5a90821250 100644 --- a/ballista/scheduler/build.rs +++ b/ballista/scheduler/build.rs @@ -20,7 +20,7 @@ fn main() -> Result<(), String> { println!("cargo:rerun-if-changed=proto/keda.proto"); #[cfg(feature = "keda-scaler")] - tonic_build::configure() + tonic_prost_build::configure() .compile_protos(&["proto/keda.proto"], &["proto"]) .map_err(|e| format!("protobuf compilation failed: {e}"))?; diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index d1401a6589..53dea05701 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -62,7 +62,7 @@ use std::{ use structopt::StructOpt; use tokio::task::JoinHandle; -#[cfg(feature = "snmalloc")] +#[cfg(all(feature = "snmalloc", not(feature = "mimalloc")))] #[global_allocator] static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;