Skip to content

Commit 3b89e69

Browse files
committed
feat: Implement object store shuffle write support for S3 and Azure
1 parent 20ef1eb commit 3b89e69

2 files changed

Lines changed: 322 additions & 0 deletions

File tree

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::execution_plans::shuffle_manager::{
4040
InMemoryShuffleManager, ShufflePartitionData, global_shuffle_manager,
4141
};
4242
use crate::extension::SessionConfigExt;
43+
use crate::shuffle_storage::ShuffleStorageType;
4344
use crate::utils;
4445

4546
use crate::serde::protobuf::ShuffleWritePartition;
@@ -317,6 +318,18 @@ impl ShuffleWriterExec {
317318
// Use memory mode only for intermediate stages, not for the final output stage
318319
let use_memory = memory_mode && !is_final_stage;
319320

321+
// Check for object store shuffle configuration
322+
let storage_type_str = context.session_config().ballista_shuffle_storage_type();
323+
let storage_type: ShuffleStorageType = storage_type_str
324+
.parse()
325+
.unwrap_or(ShuffleStorageType::Local);
326+
let storage_url = context.session_config().ballista_shuffle_storage_url();
327+
let use_object_store = !use_memory
328+
&& matches!(
329+
storage_type,
330+
ShuffleStorageType::S3 | ShuffleStorageType::Azure
331+
);
332+
320333
// Get shuffle format from session config
321334
let shuffle_format = context.session_config().ballista_shuffle_format();
322335
let file_ext = utils::shuffle_file_extension(shuffle_format);
@@ -338,6 +351,20 @@ impl ShuffleWriterExec {
338351
shuffle_format,
339352
)
340353
.await
354+
} else if use_object_store {
355+
// Use object store (S3 or Azure) for shuffle data
356+
Self::execute_shuffle_write_object_store(
357+
&job_id,
358+
stage_id,
359+
input_partition,
360+
&mut stream,
361+
output_partitioning,
362+
write_metrics,
363+
now,
364+
storage_type,
365+
storage_url,
366+
)
367+
.await
341368
} else {
342369
// Use disk-based shuffle storage with configurable format
343370
// This is used for:
@@ -507,6 +534,158 @@ impl ShuffleWriterExec {
507534
}
508535
}
509536

537+
/// Executes shuffle write to an object store (S3 or Azure).
538+
///
539+
/// Uses Arrow IPC format with LZ4 compression for serialization. Data is serialized
540+
/// to an in-memory buffer and then uploaded to the object store in a single PUT request.
541+
#[allow(clippy::too_many_arguments)]
542+
async fn execute_shuffle_write_object_store(
543+
job_id: &str,
544+
stage_id: usize,
545+
input_partition: usize,
546+
stream: &mut std::pin::Pin<
547+
Box<dyn datafusion::physical_plan::RecordBatchStream + Send>,
548+
>,
549+
output_partitioning: Option<Partitioning>,
550+
write_metrics: ShuffleWriteMetrics,
551+
now: Instant,
552+
storage_type: ShuffleStorageType,
553+
storage_url: Option<String>,
554+
) -> Result<Vec<ShuffleWritePartition>> {
555+
use crate::shuffle_storage::{ShuffleStorageConfig, ShuffleStorageFactory};
556+
557+
let base_url = storage_url.ok_or_else(|| {
558+
DataFusionError::Configuration(format!(
559+
"Shuffle storage URL must be set when using {storage_type} storage type. Set the 'ballista.shuffle.storage_url' configuration."
560+
))
561+
})?;
562+
563+
let config = ShuffleStorageConfig::from_type_and_url(storage_type, &base_url)
564+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
565+
let storage = ShuffleStorageFactory::create(&config)
566+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
567+
568+
let schema = stream.schema();
569+
570+
match output_partitioning {
571+
None => {
572+
// No repartitioning — collect all batches and write them as a single partition
573+
let mut batches = Vec::new();
574+
while let Some(result) = stream.next().await {
575+
let batch = result?;
576+
write_metrics.input_rows.add(batch.num_rows());
577+
write_metrics.output_rows.add(batch.num_rows());
578+
batches.push(batch);
579+
}
580+
581+
let (path, stats) = storage
582+
.write_shuffle_data(
583+
job_id,
584+
stage_id,
585+
input_partition,
586+
input_partition,
587+
batches,
588+
schema,
589+
&write_metrics.write_time,
590+
)
591+
.await
592+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
593+
594+
info!(
595+
"Executed partition {} to object store in {} seconds. Statistics: {}",
596+
input_partition,
597+
now.elapsed().as_secs(),
598+
stats
599+
);
600+
601+
Ok(vec![ShuffleWritePartition {
602+
partition_id: input_partition as u64,
603+
path,
604+
num_batches: stats.num_batches.unwrap_or(0),
605+
num_rows: stats.num_rows.unwrap_or(0),
606+
num_bytes: stats.num_bytes.unwrap_or(0),
607+
}])
608+
}
609+
610+
Some(Partitioning::Hash(exprs, num_output_partitions)) => {
611+
// Hash-repartition: collect batches per output partition, then upload each
612+
let mut partition_batches: Vec<Option<(Vec<RecordBatch>, usize, usize)>> =
613+
(0..num_output_partitions).map(|_| None).collect();
614+
615+
let mut partitioner = BatchPartitioner::try_new(
616+
Partitioning::Hash(exprs, num_output_partitions),
617+
write_metrics.repart_time.clone(),
618+
)?;
619+
620+
while let Some(result) = stream.next().await {
621+
let input_batch = result?;
622+
write_metrics.input_rows.add(input_batch.num_rows());
623+
624+
partitioner.partition(
625+
input_batch,
626+
|output_partition, output_batch| {
627+
let timer = write_metrics.write_time.timer();
628+
let batch_rows = output_batch.num_rows();
629+
match &mut partition_batches[output_partition] {
630+
Some((batches, num_batches, num_rows)) => {
631+
*num_batches += 1;
632+
*num_rows += batch_rows;
633+
batches.push(output_batch);
634+
}
635+
None => {
636+
partition_batches[output_partition] =
637+
Some((vec![output_batch], 1, batch_rows));
638+
}
639+
}
640+
write_metrics.output_rows.add(batch_rows);
641+
timer.done();
642+
Ok(())
643+
},
644+
)?;
645+
}
646+
647+
let mut part_locs = Vec::new();
648+
649+
for (output_partition, entry) in
650+
partition_batches.into_iter().enumerate()
651+
{
652+
if let Some((batches, _num_batches, _num_rows)) = entry {
653+
let (path, stats) = storage
654+
.write_shuffle_data(
655+
job_id,
656+
stage_id,
657+
output_partition,
658+
input_partition,
659+
batches,
660+
schema.clone(),
661+
&write_metrics.write_time,
662+
)
663+
.await
664+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
665+
666+
debug!(
667+
"Finished writing shuffle partition {} to object store. Stats: {}.",
668+
output_partition, stats
669+
);
670+
671+
part_locs.push(ShuffleWritePartition {
672+
partition_id: output_partition as u64,
673+
path,
674+
num_batches: stats.num_batches.unwrap_or(0),
675+
num_rows: stats.num_rows.unwrap_or(0),
676+
num_bytes: stats.num_bytes.unwrap_or(0),
677+
});
678+
}
679+
}
680+
Ok(part_locs)
681+
}
682+
683+
_ => Err(DataFusionError::Execution(
684+
"Invalid shuffle partitioning scheme".to_owned(),
685+
)),
686+
}
687+
}
688+
510689
/// Executes shuffle write to in-memory storage.
511690
#[allow(clippy::too_many_arguments)]
512691
async fn execute_shuffle_write_memory(

ballista/core/src/shuffle_storage.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,64 @@ impl ShuffleStorageConfig {
177177
..Default::default()
178178
}
179179
}
180+
181+
/// Creates a storage configuration from a storage type and URL.
182+
///
183+
/// Parses the URL to extract backend-specific fields (bucket, account, container, prefix).
184+
/// Credentials are resolved from environment variables by the underlying object store builders.
185+
pub fn from_type_and_url(
186+
storage_type: ShuffleStorageType,
187+
url: &str,
188+
) -> Result<Self> {
189+
match storage_type {
190+
ShuffleStorageType::Local => Ok(Self::new_local(url)),
191+
ShuffleStorageType::S3 => {
192+
let parsed = Url::parse(url).map_err(|e| {
193+
BallistaError::General(format!(
194+
"Failed to parse S3 shuffle URL '{url}': {e}"
195+
))
196+
})?;
197+
let bucket = parsed.host_str().ok_or_else(|| {
198+
BallistaError::General(format!(
199+
"No bucket found in S3 shuffle URL '{url}'"
200+
))
201+
})?;
202+
let path = parsed.path().trim_start_matches('/');
203+
let prefix = if path.is_empty() { None } else { Some(path) };
204+
Ok(Self::new_s3(bucket, prefix, None))
205+
}
206+
ShuffleStorageType::Azure => {
207+
let parsed = Url::parse(url).map_err(|e| {
208+
BallistaError::General(format!(
209+
"Failed to parse Azure shuffle URL '{url}': {e}"
210+
))
211+
})?;
212+
// Azure URL format: abfs://container@account.dfs.core.windows.net/prefix
213+
let host = parsed.host_str().ok_or_else(|| {
214+
BallistaError::General(format!(
215+
"No host found in Azure shuffle URL '{url}'"
216+
))
217+
})?;
218+
let account = host
219+
.strip_suffix(".dfs.core.windows.net")
220+
.or_else(|| host.strip_suffix(".blob.core.windows.net"))
221+
.ok_or_else(|| {
222+
BallistaError::General(format!(
223+
"Cannot extract Azure account name from host '{host}' in URL '{url}'"
224+
))
225+
})?;
226+
let container = parsed.username();
227+
if container.is_empty() {
228+
return Err(BallistaError::General(format!(
229+
"No container found in Azure shuffle URL '{url}'. Expected format: abfs://container@account.dfs.core.windows.net/prefix"
230+
)));
231+
}
232+
let path = parsed.path().trim_start_matches('/');
233+
let prefix = if path.is_empty() { None } else { Some(path) };
234+
Ok(Self::new_azure(account, container, prefix))
235+
}
236+
}
237+
}
180238
}
181239

182240
/// Trait for shuffle storage operations.
@@ -764,4 +822,89 @@ mod tests {
764822
Some("abfs://mycontainer@myaccount.dfs.core.windows.net/shuffle".to_string())
765823
);
766824
}
825+
826+
#[test]
fn test_from_type_and_url_local() {
    // For local storage the "URL" is a plain path and ends up in `base_url` unchanged.
    let local_path = "/tmp/ballista";
    let cfg =
        ShuffleStorageConfig::from_type_and_url(ShuffleStorageType::Local, local_path)
            .unwrap();

    assert_eq!(cfg.storage_type, ShuffleStorageType::Local);
    assert_eq!(cfg.base_url.as_deref(), Some(local_path));
}
836+
837+
#[test]
fn test_from_type_and_url_s3() {
    // Bucket comes from the host; the full URL is reproduced in `base_url`.
    let s3_url = "s3://my-bucket/shuffle/prefix";
    let cfg = ShuffleStorageConfig::from_type_and_url(ShuffleStorageType::S3, s3_url)
        .unwrap();

    assert_eq!(cfg.storage_type, ShuffleStorageType::S3);
    assert_eq!(cfg.base_url.as_deref(), Some(s3_url));
    assert_eq!(cfg.s3_config.bucket.as_deref(), Some("my-bucket"));
}
851+
852+
#[test]
fn test_from_type_and_url_s3_no_prefix() {
    // A bucket-only URL (no path) must still parse and keep the bucket.
    let s3_url = "s3://my-bucket";
    let cfg = ShuffleStorageConfig::from_type_and_url(ShuffleStorageType::S3, s3_url)
        .unwrap();

    assert_eq!(cfg.storage_type, ShuffleStorageType::S3);
    assert_eq!(cfg.base_url.as_deref(), Some(s3_url));
    assert_eq!(cfg.s3_config.bucket.as_deref(), Some("my-bucket"));
}
863+
864+
#[test]
fn test_from_type_and_url_azure() {
    // Container is taken from the user-info part, account from the host prefix.
    let abfs_url = "abfs://mycontainer@myaccount.dfs.core.windows.net/shuffle";
    let cfg =
        ShuffleStorageConfig::from_type_and_url(ShuffleStorageType::Azure, abfs_url)
            .unwrap();

    assert_eq!(cfg.storage_type, ShuffleStorageType::Azure);
    assert_eq!(cfg.base_url.as_deref(), Some(abfs_url));
    assert_eq!(cfg.azure_config.account.as_deref(), Some("myaccount"));
    assert_eq!(cfg.azure_config.container.as_deref(), Some("mycontainer"));
}
887+
888+
#[test]
fn test_from_type_and_url_azure_no_prefix() {
    // An Azure URL without a path must still yield both account and container.
    let config = ShuffleStorageConfig::from_type_and_url(
        ShuffleStorageType::Azure,
        "abfs://mycontainer@myaccount.dfs.core.windows.net",
    )
    .unwrap();
    assert_eq!(config.storage_type, ShuffleStorageType::Azure);
    assert_eq!(
        config.azure_config.account,
        Some("myaccount".to_string())
    );
    // Guards against the container being lost when the URL has no prefix.
    assert_eq!(
        config.azure_config.container,
        Some("mycontainer".to_string())
    );
}
901+
902+
#[test]
fn test_from_type_and_url_s3_invalid_url() {
    // A string that is not a URL must be rejected instead of producing a config.
    let outcome =
        ShuffleStorageConfig::from_type_and_url(ShuffleStorageType::S3, "not-a-url");

    assert!(outcome.is_err());
}
767910
}

0 commit comments

Comments
 (0)