From 3b89e69e067b9bc2b641faff2714b93571153d15 Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Thu, 5 Feb 2026 15:36:24 -0800 Subject: [PATCH 1/4] feat: Implement object store shuffle write support for S3 and Azure --- .../src/execution_plans/shuffle_writer.rs | 179 ++++++++++++++++++ ballista/core/src/shuffle_storage.rs | 143 ++++++++++++++ 2 files changed, 322 insertions(+) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 0fc8ae577d..96b5fc251b 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -40,6 +40,7 @@ use crate::execution_plans::shuffle_manager::{ InMemoryShuffleManager, ShufflePartitionData, global_shuffle_manager, }; use crate::extension::SessionConfigExt; +use crate::shuffle_storage::ShuffleStorageType; use crate::utils; use crate::serde::protobuf::ShuffleWritePartition; @@ -317,6 +318,18 @@ impl ShuffleWriterExec { // Use memory mode only for intermediate stages, not for the final output stage let use_memory = memory_mode && !is_final_stage; + // Check for object store shuffle configuration + let storage_type_str = context.session_config().ballista_shuffle_storage_type(); + let storage_type: ShuffleStorageType = storage_type_str + .parse() + .unwrap_or(ShuffleStorageType::Local); + let storage_url = context.session_config().ballista_shuffle_storage_url(); + let use_object_store = !use_memory + && matches!( + storage_type, + ShuffleStorageType::S3 | ShuffleStorageType::Azure + ); + // Get shuffle format from session config let shuffle_format = context.session_config().ballista_shuffle_format(); let file_ext = utils::shuffle_file_extension(shuffle_format); @@ -338,6 +351,20 @@ impl ShuffleWriterExec { shuffle_format, ) .await + } else if use_object_store { + // Use object store (S3 or Azure) for shuffle data + Self::execute_shuffle_write_object_store( + &job_id, + stage_id, + input_partition, + &mut stream, + output_partitioning, + write_metrics, + now, + storage_type, + storage_url, + ) + .await } else { // Use disk-based shuffle storage with configurable format // This is used for: @@ -507,6 +534,158 @@ impl ShuffleWriterExec { } } + /// Executes shuffle write to an object store (S3 or Azure). + /// + /// Uses Arrow IPC format with LZ4 compression for serialization. Data is serialized + /// to an in-memory buffer and then uploaded to the object store in a single PUT request. + #[allow(clippy::too_many_arguments)] + async fn execute_shuffle_write_object_store( + job_id: &str, + stage_id: usize, + input_partition: usize, + stream: &mut std::pin::Pin< + Box, + >, + output_partitioning: Option, + write_metrics: ShuffleWriteMetrics, + now: Instant, + storage_type: ShuffleStorageType, + storage_url: Option, + ) -> Result> { + use crate::shuffle_storage::{ShuffleStorageConfig, ShuffleStorageFactory}; + + let base_url = storage_url.ok_or_else(|| { + DataFusionError::Configuration(format!( + "Shuffle storage URL must be set when using {storage_type} storage type. Set the 'ballista.shuffle.storage_url' configuration." + )) + })?; + + let config = ShuffleStorageConfig::from_type_and_url(storage_type, &base_url) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let storage = ShuffleStorageFactory::create(&config) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let schema = stream.schema(); + + match output_partitioning { + None => { + // No repartitioning — collect all batches and write them as a single partition + let mut batches = Vec::new(); + while let Some(result) = stream.next().await { + let batch = result?; + write_metrics.input_rows.add(batch.num_rows()); + write_metrics.output_rows.add(batch.num_rows()); + batches.push(batch); + } + + let (path, stats) = storage + .write_shuffle_data( + job_id, + stage_id, + input_partition, + input_partition, + batches, + schema, + &write_metrics.write_time, + ) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + info!( + "Executed partition {} to object store in {} seconds. Statistics: {}", + input_partition, + now.elapsed().as_secs(), + stats + ); + + Ok(vec![ShuffleWritePartition { + partition_id: input_partition as u64, + path, + num_batches: stats.num_batches.unwrap_or(0), + num_rows: stats.num_rows.unwrap_or(0), + num_bytes: stats.num_bytes.unwrap_or(0), + }]) + } + + Some(Partitioning::Hash(exprs, num_output_partitions)) => { + // Hash-repartition: collect batches per output partition, then upload each + let mut partition_batches: Vec, usize, usize)>> = + (0..num_output_partitions).map(|_| None).collect(); + + let mut partitioner = BatchPartitioner::try_new( + Partitioning::Hash(exprs, num_output_partitions), + write_metrics.repart_time.clone(), + )?; + + while let Some(result) = stream.next().await { + let input_batch = result?; + write_metrics.input_rows.add(input_batch.num_rows()); + + partitioner.partition( + input_batch, + |output_partition, output_batch| { + let timer = write_metrics.write_time.timer(); + let batch_rows = output_batch.num_rows(); + match &mut partition_batches[output_partition] { + Some((batches, num_batches, num_rows)) => { + *num_batches += 1; + *num_rows += batch_rows; + batches.push(output_batch); + } + None => { + partition_batches[output_partition] = + Some((vec![output_batch], 1, batch_rows)); + } + } + write_metrics.output_rows.add(batch_rows); + timer.done(); + Ok(()) + }, + )?; + } + + let mut part_locs = Vec::new(); + + for (output_partition, entry) in + partition_batches.into_iter().enumerate() + { + if let Some((batches, _num_batches, _num_rows)) = entry { + let (path, stats) = storage + .write_shuffle_data( + job_id, + stage_id, + output_partition, + input_partition, + batches, + schema.clone(), + &write_metrics.write_time, + ) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + debug!( + "Finished writing shuffle partition {} to object store. Stats: {}.", + output_partition, stats + ); + + part_locs.push(ShuffleWritePartition { + partition_id: output_partition as u64, + path, + num_batches: stats.num_batches.unwrap_or(0), + num_rows: stats.num_rows.unwrap_or(0), + num_bytes: stats.num_bytes.unwrap_or(0), + }); + } + } + Ok(part_locs) + } + + _ => Err(DataFusionError::Execution( + "Invalid shuffle partitioning scheme".to_owned(), + )), + } + } + /// Executes shuffle write to in-memory storage. #[allow(clippy::too_many_arguments)] async fn execute_shuffle_write_memory( diff --git a/ballista/core/src/shuffle_storage.rs b/ballista/core/src/shuffle_storage.rs index dfc917e95d..68b1186796 100644 --- a/ballista/core/src/shuffle_storage.rs +++ b/ballista/core/src/shuffle_storage.rs @@ -177,6 +177,64 @@ impl ShuffleStorageConfig { ..Default::default() } } + + /// Creates a storage configuration from a storage type and URL. + /// + /// Parses the URL to extract backend-specific fields (bucket, account, container, prefix). + /// Credentials are resolved from environment variables by the underlying object store builders. + pub fn from_type_and_url( + storage_type: ShuffleStorageType, + url: &str, + ) -> Result { + match storage_type { + ShuffleStorageType::Local => Ok(Self::new_local(url)), + ShuffleStorageType::S3 => { + let parsed = Url::parse(url).map_err(|e| { + BallistaError::General(format!( + "Failed to parse S3 shuffle URL '{url}': {e}" + )) + })?; + let bucket = parsed.host_str().ok_or_else(|| { + BallistaError::General(format!( + "No bucket found in S3 shuffle URL '{url}'" + )) + })?; + let path = parsed.path().trim_start_matches('/'); + let prefix = if path.is_empty() { None } else { Some(path) }; + Ok(Self::new_s3(bucket, prefix, None)) + } + ShuffleStorageType::Azure => { + let parsed = Url::parse(url).map_err(|e| { + BallistaError::General(format!( + "Failed to parse Azure shuffle URL '{url}': {e}" + )) + })?; + // Azure URL format: abfs://container@account.dfs.core.windows.net/prefix + let host = parsed.host_str().ok_or_else(|| { + BallistaError::General(format!( + "No host found in Azure shuffle URL '{url}'" + )) + })?; + let account = host + .strip_suffix(".dfs.core.windows.net") + .or_else(|| host.strip_suffix(".blob.core.windows.net")) + .ok_or_else(|| { + BallistaError::General(format!( + "Cannot extract Azure account name from host '{host}' in URL '{url}'" + )) + })?; + let container = parsed.username(); + if container.is_empty() { + return Err(BallistaError::General(format!( + "No container found in Azure shuffle URL '{url}'. Expected format: abfs://container@account.dfs.core.windows.net/prefix" + ))); + } + let path = parsed.path().trim_start_matches('/'); + let prefix = if path.is_empty() { None } else { Some(path) }; + Ok(Self::new_azure(account, container, prefix)) + } + } + } } /// Trait for shuffle storage operations. @@ -764,4 +822,89 @@ mod tests { Some("abfs://mycontainer@myaccount.dfs.core.windows.net/shuffle".to_string()) ); } + + #[test] + fn test_from_type_and_url_local() { + let config = ShuffleStorageConfig::from_type_and_url( + ShuffleStorageType::Local, + "/tmp/ballista", + ) + .unwrap(); + assert_eq!(config.storage_type, ShuffleStorageType::Local); + assert_eq!(config.base_url, Some("/tmp/ballista".to_string())); + } + + #[test] + fn test_from_type_and_url_s3() { + let config = ShuffleStorageConfig::from_type_and_url( + ShuffleStorageType::S3, + "s3://my-bucket/shuffle/prefix", + ) + .unwrap(); + assert_eq!(config.storage_type, ShuffleStorageType::S3); + assert_eq!( + config.base_url, + Some("s3://my-bucket/shuffle/prefix".to_string()) + ); + assert_eq!(config.s3_config.bucket, Some("my-bucket".to_string())); + } + + #[test] + fn test_from_type_and_url_s3_no_prefix() { + let config = ShuffleStorageConfig::from_type_and_url( + ShuffleStorageType::S3, + "s3://my-bucket", + ) + .unwrap(); + assert_eq!(config.storage_type, ShuffleStorageType::S3); + assert_eq!(config.base_url, Some("s3://my-bucket".to_string())); + assert_eq!(config.s3_config.bucket, Some("my-bucket".to_string())); + } + + #[test] + fn test_from_type_and_url_azure() { + let config = ShuffleStorageConfig::from_type_and_url( + ShuffleStorageType::Azure, + "abfs://mycontainer@myaccount.dfs.core.windows.net/shuffle", + ) + .unwrap(); + assert_eq!(config.storage_type, ShuffleStorageType::Azure); + assert_eq!( + config.base_url, + Some( + "abfs://mycontainer@myaccount.dfs.core.windows.net/shuffle".to_string() + ) + ); + assert_eq!( + config.azure_config.account, + Some("myaccount".to_string()) + ); + assert_eq!( + config.azure_config.container, + Some("mycontainer".to_string()) + ); + } + + #[test] + fn test_from_type_and_url_azure_no_prefix() { + let config = ShuffleStorageConfig::from_type_and_url( + ShuffleStorageType::Azure, + "abfs://mycontainer@myaccount.dfs.core.windows.net", + ) + .unwrap(); + assert_eq!(config.storage_type, ShuffleStorageType::Azure); + assert_eq!( + config.azure_config.account, + Some("myaccount".to_string()) + ); + } + + #[test] + fn test_from_type_and_url_s3_invalid_url() { + let result = ShuffleStorageConfig::from_type_and_url( + ShuffleStorageType::S3, + "not-a-url", + ); + assert!(result.is_err()); + } } From 973c05890fbe8a275ca839086c61b9c4a9e16fe9 Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Thu, 5 Feb 2026 15:47:11 -0800 Subject: [PATCH 2/4] refactor: Simplify iterator usage and improve test assertions in shuffle storage --- .../src/execution_plans/shuffle_writer.rs | 3 +-- ballista/core/src/shuffle_storage.rs | 20 +++++-------------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index 96b5fc251b..f72b37841b 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -646,8 +646,7 @@ impl ShuffleWriterExec { let mut part_locs = Vec::new(); - for (output_partition, entry) in - partition_batches.into_iter().enumerate() + for (output_partition, entry) in partition_batches.into_iter().enumerate() { if let Some((batches, _num_batches, _num_rows)) = entry { let (path, stats) = storage diff --git a/ballista/core/src/shuffle_storage.rs b/ballista/core/src/shuffle_storage.rs index 68b1186796..743aa00ddc 100644 --- a/ballista/core/src/shuffle_storage.rs +++ b/ballista/core/src/shuffle_storage.rs @@ -871,14 +871,9 @@ mod tests { assert_eq!(config.storage_type, ShuffleStorageType::Azure); assert_eq!( config.base_url, - Some( - "abfs://mycontainer@myaccount.dfs.core.windows.net/shuffle".to_string() - ) - ); - assert_eq!( - config.azure_config.account, - Some("myaccount".to_string()) + Some("abfs://mycontainer@myaccount.dfs.core.windows.net/shuffle".to_string()) ); + assert_eq!(config.azure_config.account, Some("myaccount".to_string())); assert_eq!( config.azure_config.container, Some("mycontainer".to_string()) @@ -893,18 +888,13 @@ mod tests { ) .unwrap(); assert_eq!(config.storage_type, ShuffleStorageType::Azure); - assert_eq!( - config.azure_config.account, - Some("myaccount".to_string()) - ); + assert_eq!(config.azure_config.account, Some("myaccount".to_string())); } #[test] fn test_from_type_and_url_s3_invalid_url() { - let result = ShuffleStorageConfig::from_type_and_url( - ShuffleStorageType::S3, - "not-a-url", - ); + let result = + ShuffleStorageConfig::from_type_and_url(ShuffleStorageType::S3, "not-a-url"); assert!(result.is_err()); } } From 54a61d7b2bf47185c73232a2ed6f803dcdb1027c Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Thu, 5 Feb 2026 18:35:58 -0800 Subject: [PATCH 3/4] feat: Enhance shuffle writer to support multipart uploads for Arrow IPC and Vortex formats --- .../src/execution_plans/shuffle_writer.rs | 506 +++++++++++++++--- ballista/core/src/shuffle_storage.rs | 59 +- 2 files changed, 478 insertions(+), 87 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index f72b37841b..e86e95973b 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -36,6 +36,7 @@ use std::sync::Arc; use std::time::Instant; use crate::config::ShuffleFormat; +use crate::error::BallistaError; use crate::execution_plans::shuffle_manager::{ InMemoryShuffleManager, ShufflePartitionData, global_shuffle_manager, }; @@ -363,6 +364,8 @@ impl ShuffleWriterExec { now, storage_type, storage_url, + shuffle_format, + file_ext, ) .await } else { @@ -536,8 +539,11 @@ impl ShuffleWriterExec { /// Executes shuffle write to an object store (S3 or Azure). /// - /// Uses Arrow IPC format with LZ4 compression for serialization. Data is serialized - /// to an in-memory buffer and then uploaded to the object store in a single PUT request. + /// Supports Arrow IPC and Vortex shuffle formats. Arrow IPC data is streamed + /// to the object store using multipart uploads to minimize memory pressure — each + /// batch is serialized to IPC bytes and written to the upload as it arrives. + /// Vortex data is buffered in memory and serialized at the end, since the Vortex + /// IPC format requires all arrays to be available before serialization. #[allow(clippy::too_many_arguments)] async fn execute_shuffle_write_object_store( job_id: &str, @@ -551,8 +557,18 @@ impl ShuffleWriterExec { now: Instant, storage_type: ShuffleStorageType, storage_url: Option, + shuffle_format: ShuffleFormat, + file_ext: &str, ) -> Result> { - use crate::shuffle_storage::{ShuffleStorageConfig, ShuffleStorageFactory}; + use crate::shuffle_storage::{ObjectStoreShuffleStorage, ShuffleStorageConfig}; + + // Validate Vortex availability at compile time + #[cfg(not(feature = "vortex"))] + if shuffle_format == ShuffleFormat::Vortex { + return Err(DataFusionError::NotImplemented( + "Vortex format requires the 'vortex' feature to be enabled".to_string(), + )); + } let base_url = storage_url.ok_or_else(|| { DataFusionError::Configuration(format!( @@ -562,37 +578,93 @@ impl ShuffleWriterExec { let config = ShuffleStorageConfig::from_type_and_url(storage_type, &base_url) .map_err(|e| DataFusionError::External(Box::new(e)))?; - let storage = ShuffleStorageFactory::create(&config) + let storage = ObjectStoreShuffleStorage::from_config(&config) .map_err(|e| DataFusionError::External(Box::new(e)))?; let schema = stream.schema(); match output_partitioning { None => { - // No repartitioning — collect all batches and write them as a single partition - let mut batches = Vec::new(); - while let Some(result) = stream.next().await { - let batch = result?; - write_metrics.input_rows.add(batch.num_rows()); - write_metrics.output_rows.add(batch.num_rows()); - batches.push(batch); - } - - let (path, stats) = storage - .write_shuffle_data( + // No repartitioning — stream batches directly to a multipart upload + let (mut writer, full_url) = storage + .start_multipart_write( job_id, stage_id, input_partition, input_partition, - batches, - schema, - &write_metrics.write_time, + file_ext, ) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + let mut num_rows: u64 = 0; + let mut num_batches: u64 = 0; + let mut num_bytes: u64 = 0; + + // For Vortex, buffer arrays and serialize all at end + #[cfg(feature = "vortex")] + let mut vortex_buffer: Vec = Vec::new(); + + while let Some(result) = stream.next().await { + let batch = result?; + write_metrics.input_rows.add(batch.num_rows()); + write_metrics.output_rows.add(batch.num_rows()); + num_rows += batch.num_rows() as u64; + num_batches += 1; + + let timer = write_metrics.write_time.timer(); + + match shuffle_format { + ShuffleFormat::ArrowIpc => { + // Serialize each batch to IPC bytes and stream to upload + let buf = + serialize_batch_to_ipc_bytes(&batch, schema.as_ref())?; + num_bytes += buf.len() as u64; + writer.put(bytes::Bytes::from(buf)); + } + #[cfg(feature = "vortex")] + ShuffleFormat::Vortex => { + use vortex_array::arrow::FromArrowArray; + let vortex_array = + vortex_array::ArrayRef::from_arrow(&batch, false); + vortex_buffer.push(vortex_array); + } + // Non-vortex build: already returned error above + #[cfg(not(feature = "vortex"))] + _ => unreachable!(), + } + + timer.done(); + } + + // For Vortex, serialize all buffered arrays and write to the upload + #[cfg(feature = "vortex")] + if shuffle_format == ShuffleFormat::Vortex && !vortex_buffer.is_empty() { + let timer = write_metrics.write_time.timer(); + let buf = serialize_vortex_arrays_to_bytes(vortex_buffer)?; + num_bytes = buf.len() as u64; + writer.put(bytes::Bytes::from(buf)); + timer.done(); + } + + // Finalize the multipart upload + let timer = write_metrics.write_time.timer(); + writer.finish().await.map_err(|e| { + DataFusionError::External(Box::new(BallistaError::General(format!( + "Failed to complete multipart upload to {}: {:?}", + full_url, e + )))) + })?; + timer.done(); + + let stats = PartitionStats::new( + Some(num_rows), + Some(num_batches), + Some(num_bytes), + ); + info!( - "Executed partition {} to object store in {} seconds. Statistics: {}", + "Executed partition {} ({shuffle_format}) to object store in {} seconds. Statistics: {}", input_partition, now.elapsed().as_secs(), stats @@ -600,7 +672,7 @@ impl ShuffleWriterExec { Ok(vec![ShuffleWritePartition { partition_id: input_partition as u64, - path, + path: full_url, num_batches: stats.num_batches.unwrap_or(0), num_rows: stats.num_rows.unwrap_or(0), num_bytes: stats.num_bytes.unwrap_or(0), @@ -608,81 +680,306 @@ impl ShuffleWriterExec { } Some(Partitioning::Hash(exprs, num_output_partitions)) => { - // Hash-repartition: collect batches per output partition, then upload each - let mut partition_batches: Vec, usize, usize)>> = - (0..num_output_partitions).map(|_| None).collect(); + match shuffle_format { + ShuffleFormat::ArrowIpc => { + // Arrow IPC: stream serialized batches to per-partition multipart uploads + Self::execute_hash_repart_object_store_ipc( + job_id, + stage_id, + input_partition, + stream, + exprs, + num_output_partitions, + &schema, + &storage, + &write_metrics, + now, + file_ext, + ) + .await + } + #[cfg(feature = "vortex")] + ShuffleFormat::Vortex => { + // Vortex: buffer arrays per partition, serialize at end + Self::execute_hash_repart_object_store_vortex( + job_id, + stage_id, + input_partition, + stream, + exprs, + num_output_partitions, + &schema, + &storage, + &write_metrics, + now, + file_ext, + ) + .await + } + // Non-vortex build: already returned error above + #[cfg(not(feature = "vortex"))] + _ => unreachable!(), + } + } - let mut partitioner = BatchPartitioner::try_new( - Partitioning::Hash(exprs, num_output_partitions), - write_metrics.repart_time.clone(), - )?; + _ => Err(DataFusionError::Execution( + "Invalid shuffle partitioning scheme".to_owned(), + )), + } + } - while let Some(result) = stream.next().await { - let input_batch = result?; - write_metrics.input_rows.add(input_batch.num_rows()); + /// Hash-repartition to object store using Arrow IPC format. + /// + /// Maintains lazy per-partition multipart writers. Each repartitioned batch + /// is serialized to IPC bytes and streamed directly to the corresponding + /// partition's multipart upload. + #[allow(clippy::too_many_arguments)] + async fn execute_hash_repart_object_store_ipc( + job_id: &str, + stage_id: usize, + input_partition: usize, + stream: &mut std::pin::Pin< + Box, + >, + exprs: Vec>, + num_output_partitions: usize, + schema: &SchemaRef, + storage: &crate::shuffle_storage::ObjectStoreShuffleStorage, + write_metrics: &ShuffleWriteMetrics, + _now: Instant, + file_ext: &str, + ) -> Result> { + struct ObjectStoreWriteTracker { + writer: object_store::WriteMultipart, + full_url: String, + num_batches: u64, + num_rows: u64, + num_bytes: u64, + } - partitioner.partition( - input_batch, - |output_partition, output_batch| { - let timer = write_metrics.write_time.timer(); - let batch_rows = output_batch.num_rows(); - match &mut partition_batches[output_partition] { - Some((batches, num_batches, num_rows)) => { - *num_batches += 1; - *num_rows += batch_rows; - batches.push(output_batch); - } - None => { - partition_batches[output_partition] = - Some((vec![output_batch], 1, batch_rows)); - } - } - write_metrics.output_rows.add(batch_rows); - timer.done(); - Ok(()) - }, - )?; + let mut writers: Vec> = + (0..num_output_partitions).map(|_| None).collect(); + + let mut partitioner = BatchPartitioner::try_new( + Partitioning::Hash(exprs, num_output_partitions), + write_metrics.repart_time.clone(), + )?; + + // Collect serialized IPC bytes per partition in the synchronous + // partition callback, then write them to the multipart writers + // after each input batch. + // (output_partition, ipc_bytes, num_rows) + let mut pending_writes: Vec, u64)>> = Vec::new(); + + while let Some(result) = stream.next().await { + let input_batch = result?; + write_metrics.input_rows.add(input_batch.num_rows()); + + let mut batch_pending: Vec<(usize, Vec, u64)> = Vec::new(); + let schema_ref = schema.clone(); + + partitioner.partition(input_batch, |output_partition, output_batch| { + let timer = write_metrics.write_time.timer(); + let batch_rows = output_batch.num_rows() as u64; + + let buf = + serialize_batch_to_ipc_bytes(&output_batch, schema_ref.as_ref())?; + + batch_pending.push((output_partition, buf, batch_rows)); + write_metrics.output_rows.add(batch_rows as usize); + timer.done(); + Ok(()) + })?; + + pending_writes.push(batch_pending); + + // Process pending writes — start multipart uploads lazily + for batch_writes in pending_writes.drain(..) { + for (output_partition, buf, rows) in batch_writes { + let buf_len = buf.len() as u64; + + match &mut writers[output_partition] { + Some(tracker) => { + tracker.num_batches += 1; + tracker.num_rows += rows; + tracker.num_bytes += buf_len; + tracker.writer.put(bytes::Bytes::from(buf)); + } + None => { + let (writer, full_url) = storage + .start_multipart_write( + job_id, + stage_id, + output_partition, + input_partition, + file_ext, + ) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let mut tracker = ObjectStoreWriteTracker { + writer, + full_url, + num_batches: 1, + num_rows: rows, + num_bytes: buf_len, + }; + tracker.writer.put(bytes::Bytes::from(buf)); + writers[output_partition] = Some(tracker); + } + } } + } + } - let mut part_locs = Vec::new(); - - for (output_partition, entry) in partition_batches.into_iter().enumerate() - { - if let Some((batches, _num_batches, _num_rows)) = entry { - let (path, stats) = storage - .write_shuffle_data( - job_id, - stage_id, - output_partition, - input_partition, - batches, - schema.clone(), - &write_metrics.write_time, - ) - .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; + // Finalize all multipart uploads + let mut part_locs = Vec::new(); - debug!( - "Finished writing shuffle partition {} to object store. Stats: {}.", - output_partition, stats - ); + for (output_partition, writer_opt) in writers.into_iter().enumerate() { + if let Some(tracker) = writer_opt { + let timer = write_metrics.write_time.timer(); + tracker.writer.finish().await.map_err(|e| { + DataFusionError::External(Box::new(BallistaError::General(format!( + "Failed to complete multipart upload to {}: {:?}", + tracker.full_url, e + )))) + })?; + timer.done(); - part_locs.push(ShuffleWritePartition { - partition_id: output_partition as u64, - path, - num_batches: stats.num_batches.unwrap_or(0), - num_rows: stats.num_rows.unwrap_or(0), - num_bytes: stats.num_bytes.unwrap_or(0), + debug!( + "Finished writing shuffle partition {} (Arrow IPC) to object store. Batches: {}, Bytes: {}.", + output_partition, tracker.num_batches, tracker.num_bytes + ); + + part_locs.push(ShuffleWritePartition { + partition_id: output_partition as u64, + path: tracker.full_url, + num_batches: tracker.num_batches, + num_rows: tracker.num_rows, + num_bytes: tracker.num_bytes, + }); + } + } + Ok(part_locs) + } + + /// Hash-repartition to object store using Vortex format. + /// + /// Buffers Vortex arrays per output partition during repartitioning, then + /// serializes each partition's arrays to Vortex IPC bytes and uploads via + /// multipart at the end. + #[cfg(feature = "vortex")] + #[allow(clippy::too_many_arguments)] + async fn execute_hash_repart_object_store_vortex( + job_id: &str, + stage_id: usize, + input_partition: usize, + stream: &mut std::pin::Pin< + Box, + >, + exprs: Vec>, + num_output_partitions: usize, + schema: &SchemaRef, + storage: &crate::shuffle_storage::ObjectStoreShuffleStorage, + write_metrics: &ShuffleWriteMetrics, + _now: Instant, + file_ext: &str, + ) -> Result> { + use vortex_array::arrow::FromArrowArray; + + struct VortexPartitionBuffer { + arrays: Vec, + num_batches: u64, + num_rows: u64, + } + + let mut buffers: Vec> = + (0..num_output_partitions).map(|_| None).collect(); + + let mut partitioner = BatchPartitioner::try_new( + Partitioning::Hash(exprs, num_output_partitions), + write_metrics.repart_time.clone(), + )?; + + while let Some(result) = stream.next().await { + let input_batch = result?; + write_metrics.input_rows.add(input_batch.num_rows()); + + partitioner.partition(input_batch, |output_partition, output_batch| { + let timer = write_metrics.write_time.timer(); + let batch_rows = output_batch.num_rows() as u64; + + let vortex_array = + vortex_array::ArrayRef::from_arrow(&output_batch, false); + + match &mut buffers[output_partition] { + Some(buf) => { + buf.arrays.push(vortex_array); + buf.num_batches += 1; + buf.num_rows += batch_rows; + } + None => { + buffers[output_partition] = Some(VortexPartitionBuffer { + arrays: vec![vortex_array], + num_batches: 1, + num_rows: batch_rows, }); } } - Ok(part_locs) - } - _ => Err(DataFusionError::Execution( - "Invalid shuffle partitioning scheme".to_owned(), - )), + write_metrics.output_rows.add(batch_rows as usize); + timer.done(); + Ok(()) + })?; } + + // Serialize and upload each partition + let mut part_locs = Vec::new(); + + for (output_partition, buf_opt) in buffers.into_iter().enumerate() { + if let Some(partition_buf) = buf_opt { + let timer = write_metrics.write_time.timer(); + + // Serialize all arrays for this partition to Vortex IPC bytes + let ipc_bytes = serialize_vortex_arrays_to_bytes(partition_buf.arrays)?; + let num_bytes = ipc_bytes.len() as u64; + + // Start multipart upload and write all bytes + let (mut writer, full_url) = storage + .start_multipart_write( + job_id, + stage_id, + output_partition, + input_partition, + file_ext, + ) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + writer.put(bytes::Bytes::from(ipc_bytes)); + writer.finish().await.map_err(|e| { + DataFusionError::External(Box::new(BallistaError::General(format!( + "Failed to complete multipart upload to {}: {:?}", + full_url, e + )))) + })?; + timer.done(); + + debug!( + "Finished writing shuffle partition {} (Vortex) to object store. Batches: {}, Bytes: {}.", + output_partition, partition_buf.num_batches, num_bytes + ); + + part_locs.push(ShuffleWritePartition { + partition_id: output_partition as u64, + path: full_url, + num_batches: partition_buf.num_batches, + num_rows: partition_buf.num_rows, + num_bytes, + }); + } + } + Ok(part_locs) } /// Executes shuffle write to in-memory storage. @@ -1041,6 +1338,49 @@ fn result_schema() -> SchemaRef { ])) } +/// Serialize a single record batch to Arrow IPC bytes with LZ4 compression. +fn serialize_batch_to_ipc_bytes(batch: &RecordBatch, schema: &Schema) -> Result> { + let options = IpcWriteOptions::default() + .try_with_compression(Some(CompressionType::LZ4_FRAME))?; + let mut buf = Vec::new(); + { + let mut ipc_writer = StreamWriter::try_new_with_options( + std::io::Cursor::new(&mut buf), + schema, + options, + )?; + ipc_writer.write(batch)?; + ipc_writer.finish()?; + } + Ok(buf) +} + +/// Serialize buffered Vortex arrays to IPC bytes. +#[cfg(feature = "vortex")] +fn serialize_vortex_arrays_to_bytes( + arrays: Vec, +) -> Result> { + use vortex_array::iter::ArrayIteratorAdapter; + use vortex_error::VortexResult; + use vortex_ipc::iterator::ArrayIteratorIPC; + + if arrays.is_empty() { + return Ok(Vec::new()); + } + + let dtype = arrays[0].dtype().clone(); + let iter = arrays + .into_iter() + .map(|a| Ok(a) as VortexResult); + let array_iter = ArrayIteratorAdapter::new(dtype, iter); + let ipc_data = array_iter + .into_ipc() + .collect_to_buffer() + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + Ok(ipc_data.to_vec()) +} + #[cfg(test)] #[cfg(not(feature = "force_hash_collisions"))] mod tests { diff --git a/ballista/core/src/shuffle_storage.rs b/ballista/core/src/shuffle_storage.rs index 743aa00ddc..b0c9bc288d 100644 --- a/ballista/core/src/shuffle_storage.rs +++ b/ballista/core/src/shuffle_storage.rs @@ -34,7 +34,7 @@ use log::{debug, error}; use object_store::aws::AmazonS3Builder; use object_store::azure::MicrosoftAzureBuilder; use object_store::path::Path as ObjectPath; -use object_store::{ObjectStore, PutPayload}; +use object_store::{ObjectStore, PutPayload, WriteMultipart}; use std::fmt::{Debug, Display}; use std::fs::File; use std::io::{BufReader, Cursor}; @@ -503,17 +503,68 @@ impl ObjectStoreShuffleStorage { } } + /// Returns a reference to the underlying object store. + pub fn object_store(&self) -> &Arc { + &self.store + } + + /// Constructs the full URL for a shuffle partition. + pub fn make_full_url( + &self, + job_id: &str, + stage_id: usize, + partition_id: usize, + input_partition: usize, + file_ext: &str, + ) -> (String, ObjectPath) { + let relative_path = + self.make_path(job_id, stage_id, partition_id, input_partition, file_ext); + let full_url = format!("{}/{}", self.base_url, relative_path); + let object_path = ObjectPath::from(relative_path); + (full_url, object_path) + } + + /// Starts a streaming multipart upload for a shuffle partition. + /// + /// Returns a `WriteMultipart` writer and the full URL where data will be written. + /// The caller should serialize batches and write them to the returned writer, + /// then call `finish()` to complete the upload. + pub async fn start_multipart_write( + &self, + job_id: &str, + stage_id: usize, + partition_id: usize, + input_partition: usize, + file_ext: &str, + ) -> Result<(WriteMultipart, String)> { + let (full_url, object_path) = + self.make_full_url(job_id, stage_id, partition_id, input_partition, file_ext); + + debug!("Starting multipart upload to object store: {}", full_url); + + let upload = self.store.put_multipart(&object_path).await.map_err(|e| { + BallistaError::General(format!( + "Failed to start multipart upload to {}: {:?}", + full_url, e + )) + })?; + + let write = WriteMultipart::new(upload); + Ok((write, full_url)) + } + fn make_path( &self, job_id: &str, stage_id: usize, partition_id: usize, input_partition: usize, + file_ext: &str, ) -> String { let filename = if input_partition == partition_id { - "data.arrow".to_string() + format!("data.{file_ext}") } else { - format!("data-{}.arrow", input_partition) + format!("data-{input_partition}.{file_ext}") }; format!("{}/{}/{}/{}", job_id, stage_id, partition_id, filename) } @@ -532,7 +583,7 @@ impl ShuffleStorage for ObjectStoreShuffleStorage { write_metric: &metrics::Time, ) -> Result<(String, PartitionStats)> { let relative_path = - self.make_path(job_id, stage_id, partition_id, input_partition); + self.make_path(job_id, stage_id, partition_id, input_partition, "arrow"); let full_url = format!("{}/{}", self.base_url, relative_path); debug!("Writing shuffle data to object store: {}", full_url); From 69133c3a55f491b07771cf4d5106acef87f1e2f7 Mon Sep 17 00:00:00 2001 From: Luke Kim <80174+lukekim@users.noreply.github.com> Date: Thu, 5 Feb 2026 19:11:52 -0800 Subject: [PATCH 4/4] refactor: Remove unused 'now' parameter from ShuffleWriterExec methods --- ballista/core/src/execution_plans/shuffle_writer.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index e86e95973b..f653c15144 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -693,7 +693,6 @@ impl ShuffleWriterExec { &schema, &storage, &write_metrics, - now, file_ext, ) .await @@ -711,7 +710,6 @@ impl ShuffleWriterExec { &schema, &storage, &write_metrics, - now, file_ext, ) .await @@ -746,7 +744,6 @@ impl ShuffleWriterExec { schema: &SchemaRef, storage: &crate::shuffle_storage::ObjectStoreShuffleStorage, write_metrics: &ShuffleWriteMetrics, - _now: Instant, file_ext: &str, ) -> Result> { struct ObjectStoreWriteTracker { @@ -882,7 +879,6 @@ impl ShuffleWriterExec { schema: &SchemaRef, storage: &crate::shuffle_storage::ObjectStoreShuffleStorage, write_metrics: &ShuffleWriteMetrics, - _now: Instant, file_ext: &str, ) -> Result> { use vortex_array::arrow::FromArrowArray;