Skip to content

Commit f238a12

Browse files
committed
Support shuffle format
1 parent cd1d51a commit f238a12

4 files changed

Lines changed: 156 additions & 22 deletions

File tree

ballista/core/src/execution_plans/shuffle_manager.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,43 @@ use datafusion::arrow::datatypes::SchemaRef;
2727
use datafusion::arrow::record_batch::RecordBatch;
2828
use std::sync::Arc;
2929

30+
use crate::config::ShuffleFormat;
3031
use crate::error::{BallistaError, Result};
3132

3233
/// Key for identifying a shuffle partition in the in-memory store.
3334
/// Format: "{job_id}/{stage_id}/{partition_id}" or "{job_id}/{stage_id}/{output_partition}/{input_partition}"
3435
pub type ShufflePartitionKey = String;
3536

37+
/// In-memory representation of shuffle data.
38+
/// Supports both Arrow RecordBatch and Vortex formats.
39+
#[derive(Debug, Clone)]
40+
pub enum InMemoryShuffleData {
41+
/// Arrow RecordBatch format (default)
42+
Arrow(Vec<RecordBatch>),
43+
/// Vortex columnar format (requires 'vortex' feature)
44+
#[cfg(feature = "vortex")]
45+
Vortex(Vec<vortex_array::ArrayRef>),
46+
}
47+
3648
/// Data stored for a single shuffle partition.
3749
#[derive(Debug, Clone)]
3850
pub struct ShufflePartitionData {
3951
/// The schema of the record batches
4052
pub schema: SchemaRef,
41-
/// The record batches for this partition
42-
pub batches: Vec<RecordBatch>,
53+
/// The data for this partition (Arrow or Vortex format)
54+
pub data: InMemoryShuffleData,
4355
/// Total number of rows across all batches
4456
pub num_rows: u64,
4557
/// Total number of batches
4658
pub num_batches: u64,
4759
/// Approximate size in bytes (based on array memory size)
4860
pub num_bytes: u64,
61+
/// The format of the data
62+
pub format: ShuffleFormat,
4963
}
5064

5165
impl ShufflePartitionData {
52-
/// Creates a new ShufflePartitionData from a schema and batches.
66+
/// Creates a new ShufflePartitionData from a schema and Arrow batches.
5367
pub fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
5468
let num_rows: u64 = batches.iter().map(|b| b.num_rows() as u64).sum();
5569
let num_batches = batches.len() as u64;
@@ -60,10 +74,63 @@ impl ShufflePartitionData {
6074

6175
Self {
6276
schema,
63-
batches,
77+
data: InMemoryShuffleData::Arrow(batches),
78+
num_rows,
79+
num_batches,
80+
num_bytes,
81+
format: ShuffleFormat::ArrowIpc,
82+
}
83+
}
84+
85+
/// Creates a new ShufflePartitionData from a schema and Vortex arrays.
86+
#[cfg(feature = "vortex")]
87+
pub fn new_vortex(
88+
schema: SchemaRef,
89+
arrays: Vec<vortex_array::ArrayRef>,
90+
num_rows: u64,
91+
num_bytes: u64,
92+
) -> Self {
93+
let num_batches = arrays.len() as u64;
94+
95+
Self {
96+
schema,
97+
data: InMemoryShuffleData::Vortex(arrays),
6498
num_rows,
6599
num_batches,
66100
num_bytes,
101+
format: ShuffleFormat::Vortex,
102+
}
103+
}
104+
105+
/// Returns the batches if stored in Arrow format, otherwise converts from Vortex.
106+
pub fn to_batches(&self) -> Result<Vec<RecordBatch>> {
107+
match &self.data {
108+
InMemoryShuffleData::Arrow(batches) => Ok(batches.clone()),
109+
#[cfg(feature = "vortex")]
110+
InMemoryShuffleData::Vortex(arrays) => {
111+
use vortex_array::arrow::IntoArrowArray;
112+
arrays
113+
.iter()
114+
.map(|array| {
115+
let arrow_array =
116+
array.clone().into_arrow_preferred().map_err(|e| {
117+
BallistaError::General(format!(
118+
"Failed to convert Vortex array to Arrow: {e}"
119+
))
120+
})?;
121+
let struct_array = arrow_array
122+
.as_any()
123+
.downcast_ref::<datafusion::arrow::array::StructArray>()
124+
.ok_or_else(|| {
125+
BallistaError::General(
126+
"Expected StructArray from Vortex conversion"
127+
.to_string(),
128+
)
129+
})?;
130+
Ok(RecordBatch::from(struct_array))
131+
})
132+
.collect()
133+
}
67134
}
68135
}
69136
}
@@ -229,7 +296,8 @@ mod tests {
229296
let retrieved = manager.get_partition(&key).unwrap();
230297
assert_eq!(retrieved.num_rows, 3);
231298
assert_eq!(retrieved.num_batches, 1);
232-
assert_eq!(retrieved.batches.len(), 1);
299+
let batches = retrieved.to_batches().unwrap();
300+
assert_eq!(batches.len(), 1);
233301
}
234302

235303
#[test]

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -773,10 +773,16 @@ async fn fetch_partition_memory(
773773
key, data.num_batches, data.num_rows
774774
);
775775

776-
Ok(Box::pin(InMemoryShuffleStream::new(
777-
data.schema,
778-
data.batches,
779-
)))
776+
let batches = data.to_batches().map_err(|e| {
777+
BallistaError::FetchFailed(
778+
metadata.id.clone(),
779+
partition_id.stage_id,
780+
partition_id.partition_id,
781+
format!("Failed to convert in-memory partition to batches: {e}"),
782+
)
783+
})?;
784+
785+
Ok(Box::pin(InMemoryShuffleStream::new(data.schema, batches)))
780786
}
781787

782788
/// Stream that reads from in-memory shuffle data

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ impl ShuffleWriterExec {
321321
let mut stream = plan.execute(input_partition, context)?;
322322

323323
if memory_mode {
324-
// Use in-memory shuffle storage
324+
// Use in-memory shuffle storage with configurable format
325325
Self::execute_shuffle_write_memory(
326326
&job_id,
327327
stage_id,
@@ -330,6 +330,7 @@ impl ShuffleWriterExec {
330330
output_partitioning,
331331
write_metrics,
332332
now,
333+
shuffle_format,
333334
)
334335
.await
335336
} else {
@@ -499,6 +500,7 @@ impl ShuffleWriterExec {
499500
}
500501

501502
/// Executes shuffle write to in-memory storage.
503+
#[allow(clippy::too_many_arguments)]
502504
async fn execute_shuffle_write_memory(
503505
job_id: &str,
504506
stage_id: usize,
@@ -509,6 +511,7 @@ impl ShuffleWriterExec {
509511
output_partitioning: Option<Partitioning>,
510512
write_metrics: ShuffleWriteMetrics,
511513
now: Instant,
514+
shuffle_format: ShuffleFormat,
512515
) -> Result<Vec<ShuffleWritePartition>> {
513516
let shuffle_manager = global_shuffle_manager();
514517
let schema = stream.schema();
@@ -538,14 +541,15 @@ impl ShuffleWriterExec {
538541
input_partition,
539542
);
540543

541-
// Store in the global shuffle manager
542-
let data = ShufflePartitionData::new(schema.clone(), batches);
544+
// Store in the global shuffle manager using the configured format
545+
let data =
546+
Self::create_partition_data(schema.clone(), batches, shuffle_format)?;
543547
shuffle_manager.store_partition(key.clone(), data);
544548

545549
timer.done();
546550

547551
info!(
548-
"Executed partition {} to memory in {} seconds. Batches: {}, Rows: {}, Bytes: {}",
552+
"Executed partition {} to memory ({shuffle_format}) in {} seconds. Batches: {}, Rows: {}, Bytes: {}",
549553
input_partition,
550554
now.elapsed().as_secs(),
551555
num_batches,
@@ -622,12 +626,16 @@ impl ShuffleWriterExec {
622626
for (i, w) in mem_writers.into_iter().enumerate() {
623627
if let Some(w) = w {
624628
debug!(
625-
"Finished writing shuffle partition {} to memory. Batches: {}. Rows: {}. Bytes: {}.",
629+
"Finished writing shuffle partition {} to memory ({shuffle_format}). Batches: {}. Rows: {}. Bytes: {}.",
626630
i, w.num_batches, w.num_rows, w.num_bytes
627631
);
628632

629-
// Store in the global shuffle manager
630-
let data = ShufflePartitionData::new(schema.clone(), w.batches);
633+
// Store in the global shuffle manager using the configured format
634+
let data = Self::create_partition_data(
635+
schema.clone(),
636+
w.batches,
637+
shuffle_format,
638+
)?;
631639
shuffle_manager.store_partition(w.key.clone(), data);
632640

633641
part_locs.push(ShuffleWritePartition {
@@ -647,6 +655,45 @@ impl ShuffleWriterExec {
647655
)),
648656
}
649657
}
658+
659+
/// Creates partition data in the specified format (Arrow or Vortex).
660+
fn create_partition_data(
661+
schema: SchemaRef,
662+
batches: Vec<RecordBatch>,
663+
format: ShuffleFormat,
664+
) -> Result<ShufflePartitionData> {
665+
match format {
666+
ShuffleFormat::ArrowIpc => Ok(ShufflePartitionData::new(schema, batches)),
667+
#[cfg(feature = "vortex")]
668+
ShuffleFormat::Vortex => {
669+
use vortex_array::ArrayRef;
670+
use vortex_array::arrow::FromArrowArray;
671+
672+
let mut arrays = Vec::with_capacity(batches.len());
673+
let mut total_rows = 0u64;
674+
let mut total_bytes = 0u64;
675+
676+
for batch in batches {
677+
total_rows += batch.num_rows() as u64;
678+
// Convert Arrow RecordBatch to Vortex Array
679+
let vortex_array = ArrayRef::from_arrow(&batch, false);
680+
total_bytes += vortex_array.nbytes();
681+
arrays.push(vortex_array);
682+
}
683+
684+
Ok(ShufflePartitionData::new_vortex(
685+
schema,
686+
arrays,
687+
total_rows,
688+
total_bytes,
689+
))
690+
}
691+
#[cfg(not(feature = "vortex"))]
692+
ShuffleFormat::Vortex => Err(DataFusionError::NotImplemented(
693+
"Vortex format requires the 'vortex' feature to be enabled".to_string(),
694+
)),
695+
}
696+
}
650697
}
651698

652699
impl DisplayAs for ShuffleWriterExec {

ballista/executor/src/flight_service.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,19 @@ impl FlightService for BallistaFlightService {
110110
})?;
111111

112112
debug!(
113-
"FetchPartition serving in-memory partition: {} ({} batches, {} rows)",
114-
key, data.num_batches, data.num_rows
113+
"FetchPartition serving in-memory partition: {} ({} batches, {} rows, format: {:?})",
114+
key, data.num_batches, data.num_rows, data.format
115115
);
116116

117117
let (tx, rx) = channel(2);
118118
let schema = data.schema.clone();
119-
let batches = data.batches;
119+
120+
// Convert to batches (handles both Arrow and Vortex formats)
121+
let batches = data.to_batches().map_err(|e| {
122+
Status::internal(format!(
123+
"Failed to convert in-memory partition to batches: {e}"
124+
))
125+
})?;
120126

121127
// Stream the batches from memory
122128
task::spawn(async move {
@@ -273,10 +279,17 @@ impl FlightService for BallistaFlightService {
273279
})?;
274280

275281
debug!(
276-
"FetchPartition serving in-memory partition via block transfer: {} ({} batches)",
277-
key, data.num_batches
282+
"FetchPartition serving in-memory partition via block transfer: {} ({} batches, format: {:?})",
283+
key, data.num_batches, data.format
278284
);
279285

286+
// Convert to batches (handles both Arrow and Vortex formats)
287+
let batches = data.to_batches().map_err(|e| {
288+
Status::internal(format!(
289+
"Failed to convert in-memory partition to batches: {e}"
290+
))
291+
})?;
292+
280293
// Serialize batches to IPC format in memory
281294
let mut buffer = Vec::new();
282295
{
@@ -292,7 +305,7 @@ impl FlightService for BallistaFlightService {
292305
)
293306
.map_err(|e| from_arrow_err(&e))?;
294307

295-
for batch in &data.batches {
308+
for batch in &batches {
296309
writer
297310
.write(batch)
298311
.map_err(|e| from_arrow_err(&e))?;

0 commit comments

Comments
 (0)