Skip to content

Commit 18d1534

Browse files
committed
Clean up in memory shuffles
1 parent 937bd3e commit 18d1534

7 files changed

Lines changed: 407 additions & 16 deletions

File tree

ballista/core/src/config.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
4646
"ballista.shuffle.remote_read_prefer_flight";
4747
/// Configuration key for shuffle storage mode (disk or memory).
4848
pub const BALLISTA_SHUFFLE_MEMORY_MODE: &str = "ballista.shuffle.memory_mode";
49+
/// Configuration key indicating if this is the final output stage.
50+
/// When true, shuffle data is always written to disk regardless of memory_mode setting.
51+
pub const BALLISTA_IS_FINAL_STAGE: &str = "ballista.shuffle.is_final_stage";
4952
/// Shuffle format configuration: "arrow_ipc" or "vortex"
5053
pub const BALLISTA_SHUFFLE_FORMAT: &str = "ballista.shuffle.format";
5154

@@ -94,6 +97,10 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
9497
"When enabled, shuffle data is kept in memory on executors instead of being written to disk. This can improve performance for workloads with sufficient memory.".to_string(),
9598
DataType::Boolean,
9699
Some((false).to_string())),
100+
ConfigEntry::new(BALLISTA_IS_FINAL_STAGE.to_string(),
101+
"When true, indicates this is the final output stage. Final stages always write to disk regardless of memory_mode setting to ensure proper cleanup.".to_string(),
102+
DataType::Boolean,
103+
Some((false).to_string())),
97104
ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
98105
"Connection timeout for gRPC client in seconds".to_string(),
99106
DataType::UInt64,
@@ -322,6 +329,12 @@ impl BallistaConfig {
322329
self.get_bool_setting(BALLISTA_SHUFFLE_MEMORY_MODE)
323330
}
324331

332+
/// Returns whether this is the final output stage.
333+
/// Final stages always write to disk regardless of memory_mode setting.
334+
pub fn is_final_stage(&self) -> bool {
335+
self.get_bool_setting(BALLISTA_IS_FINAL_STAGE)
336+
}
337+
325338
/// Returns the configured shuffle format (ArrowIpc or Vortex)
326339
///
327340
/// Note: Vortex format requires the 'vortex' feature to be enabled.
@@ -492,4 +505,34 @@ mod tests {
492505
assert_eq!(16777216, config.default_grpc_client_max_message_size());
493506
Ok(())
494507
}
508+
509+
#[test]
fn test_is_final_stage_default() {
    // A freshly built config must not mark the stage as final.
    let is_final = BallistaConfig::default().is_final_stage();
    assert!(!is_final);
}
515+
516+
#[test]
fn test_shuffle_memory_mode_default() {
    // Out of the box shuffles are disk-based, so memory mode is off.
    let memory_mode = BallistaConfig::default().shuffle_memory_mode();
    assert!(!memory_mode);
}
522+
523+
#[test]
fn test_shuffle_format_default() {
    // With no override the shuffle format resolves to Arrow IPC.
    let format = BallistaConfig::default().shuffle_format();
    assert_eq!(format, ShuffleFormat::ArrowIpc);
}
529+
530+
#[test]
fn test_shuffle_format_parsing() {
    // Every accepted spelling of the Arrow IPC format maps to the same variant.
    for alias in ["arrow_ipc", "arrow-ipc", "ipc"] {
        let parsed = alias.parse::<ShuffleFormat>().unwrap();
        assert_eq!(parsed, ShuffleFormat::ArrowIpc);
    }

    // Vortex has a single spelling.
    let vortex = "vortex".parse::<ShuffleFormat>().unwrap();
    assert_eq!(vortex, ShuffleFormat::Vortex);

    // Anything else is rejected.
    let unknown = "invalid".parse::<ShuffleFormat>();
    assert!(unknown.is_err());
}
495538
}

ballista/core/src/execution_plans/shuffle_manager.rs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,31 @@ impl InMemoryShuffleManager {
212212
log::debug!("Removed all shuffle partitions for job: {job_id}");
213213
}
214214

215+
/// Removes all partitions for a given stage within a job.
216+
///
217+
/// This is called when a stage's output has been fully consumed by the next stage,
218+
/// allowing the memory to be reclaimed immediately rather than waiting for job completion.
219+
///
220+
/// # Arguments
221+
/// * `job_id` - The job identifier
222+
/// * `stage_id` - The stage identifier
223+
///
224+
/// # Returns
225+
/// The number of partitions that were removed
226+
pub fn remove_stage_partitions(&self, job_id: &str, stage_id: usize) -> usize {
227+
let prefix = format!("{job_id}/{stage_id}/");
228+
let initial_count = self.partitions.len();
229+
self.partitions.retain(|k, _| !k.starts_with(&prefix));
230+
let removed = initial_count - self.partitions.len();
231+
log::debug!(
232+
"Removed {} shuffle partitions for stage: {}/{}",
233+
removed,
234+
job_id,
235+
stage_id
236+
);
237+
removed
238+
}
239+
215240
/// Returns the total number of partitions stored in memory.
216241
pub fn partition_count(&self) -> usize {
217242
self.partitions.len()
@@ -334,4 +359,133 @@ mod tests {
334359
let key = InMemoryShuffleManager::hash_partition_key("job1", 1, 2, 3);
335360
assert_eq!(key, "job1/1/2/data-3");
336361
}
362+
363+
#[test]
fn test_remove_stage_partitions() {
    let manager = InMemoryShuffleManager::new();
    let batch = create_test_batch();
    let schema = batch.schema();

    // Populate three stages of "job1" with four partitions each.
    for stage in 0..3 {
        for partition in 0..4 {
            manager.store_partition(
                InMemoryShuffleManager::partition_key("job1", stage, partition),
                ShufflePartitionData::new(schema.clone(), vec![batch.clone()]),
            );
        }
    }
    assert_eq!(manager.partition_count(), 12);

    // Dropping stage 1 evicts exactly its four partitions.
    let evicted = manager.remove_stage_partitions("job1", 1);
    assert_eq!(evicted, 4);
    assert_eq!(manager.partition_count(), 8);

    // The neighbouring stages are untouched...
    for surviving_stage in [0, 2] {
        let key = InMemoryShuffleManager::partition_key("job1", surviving_stage, 0);
        assert!(manager.contains_partition(&key));
    }

    // ...while stage 1 is gone.
    let stage1_key = InMemoryShuffleManager::partition_key("job1", 1, 0);
    assert!(!manager.contains_partition(&stage1_key));
}
397+
398+
#[test]
fn test_remove_stage_partitions_different_jobs() {
    let manager = InMemoryShuffleManager::new();
    let batch = create_test_batch();
    let schema = batch.schema();

    // Both jobs get three partitions under the same stage id.
    for job in ["job1", "job2"] {
        for partition in 0..3 {
            manager.store_partition(
                InMemoryShuffleManager::partition_key(job, 1, partition),
                ShufflePartitionData::new(schema.clone(), vec![batch.clone()]),
            );
        }
    }
    assert_eq!(manager.partition_count(), 6);

    // Removal is scoped to job1...
    let evicted = manager.remove_stage_partitions("job1", 1);
    assert_eq!(evicted, 3);
    assert_eq!(manager.partition_count(), 3);

    // ...so job2 keeps its stage 1 data.
    let job2_key = InMemoryShuffleManager::partition_key("job2", 1, 0);
    assert!(manager.contains_partition(&job2_key));
}
425+
426+
#[test]
fn test_remove_partition_returns_data() {
    let manager = InMemoryShuffleManager::new();
    let batch = create_test_batch();
    let schema = batch.schema();
    let payload = ShufflePartitionData::new(schema.clone(), vec![batch]);

    // Seed a single partition and confirm it is visible.
    let key = InMemoryShuffleManager::partition_key("job1", 1, 0);
    manager.store_partition(key.clone(), payload);
    assert!(manager.contains_partition(&key));

    // The first removal hands the stored payload back to the caller.
    let first = manager.remove_partition(&key);
    assert!(first.is_some());
    let taken = first.unwrap();
    assert_eq!(taken.num_rows, 3);
    assert_eq!(taken.num_batches, 1);

    // The entry is gone afterwards...
    assert!(!manager.contains_partition(&key));

    // ...so a repeated removal yields nothing.
    let second = manager.remove_partition(&key);
    assert!(second.is_none());
}
452+
453+
#[test]
fn test_total_memory_usage() {
    let manager = InMemoryShuffleManager::new();
    let batch = create_test_batch();
    let schema = batch.schema();

    // Seed three partitions under job1 / stage 1.
    for partition in 0..3 {
        manager.store_partition(
            InMemoryShuffleManager::partition_key("job1", 1, partition),
            ShufflePartitionData::new(schema.clone(), vec![batch.clone()]),
        );
    }

    // Stored batches must show up in the reported usage.
    let usage_when_populated = manager.total_memory_usage();
    assert!(usage_when_populated > 0);

    // Dropping the whole job brings the reported usage back to zero.
    manager.remove_job_partitions("job1");
    let usage_after_removal = manager.total_memory_usage();
    assert_eq!(usage_after_removal, 0);
}
474+
475+
#[test]
fn test_clear() {
    let manager = InMemoryShuffleManager::new();
    let batch = create_test_batch();
    let schema = batch.schema();

    // Fill the manager with five partitions.
    for partition in 0..5 {
        manager.store_partition(
            InMemoryShuffleManager::partition_key("job1", 1, partition),
            ShufflePartitionData::new(schema.clone(), vec![batch.clone()]),
        );
    }
    assert_eq!(manager.partition_count(), 5);

    // `clear` must leave nothing behind.
    manager.clear();
    assert_eq!(manager.partition_count(), 0);
}
337491
}

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,10 @@ async fn fetch_partition_object_store(
746746
}
747747

748748
/// Fetch partition data from in-memory shuffle storage.
749+
///
750+
/// After successfully fetching the data, the partition is removed from memory
751+
/// to allow for immediate memory reclamation. This is safe because each shuffle
752+
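/// partition is typically read only once by the consuming stage. NOTE(review): a repeated read of the same partition (e.g. by a retried task) will presumably no longer find it; confirm this is acceptable.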
749753
async fn fetch_partition_memory(
750754
location: &PartitionLocation,
751755
) -> result::Result<SendableRecordBatchStream, BallistaError> {
@@ -759,17 +763,23 @@ async fn fetch_partition_memory(
759763
})?;
760764

761765
let shuffle_manager = global_shuffle_manager();
762-
let data = shuffle_manager.get_partition(key).map_err(|e| {
763-
BallistaError::FetchFailed(
764-
metadata.id.clone(),
765-
partition_id.stage_id,
766-
partition_id.partition_id,
767-
e.to_string(),
768-
)
769-
})?;
766+
767+
// Remove and retrieve the partition data in one atomic operation
768+
// This ensures the memory is reclaimed as soon as the data is read
769+
let data = shuffle_manager.remove_partition(key).ok_or_else(|| {
770+
// If remove fails, try a regular get (for retry scenarios)
771+
shuffle_manager.get_partition(key).map_err(|e| {
772+
BallistaError::FetchFailed(
773+
metadata.id.clone(),
774+
partition_id.stage_id,
775+
partition_id.partition_id,
776+
e.to_string(),
777+
)
778+
})
779+
}).or_else(|result| result)?;
770780

771781
debug!(
772-
"Fetched partition {} from memory: {} batches, {} rows",
782+
"Fetched and removed partition {} from memory: {} batches, {} rows",
773783
key, data.num_batches, data.num_rows
774784
);
775785

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,13 @@ impl ShuffleWriterExec {
309309
let job_id = self.job_id.clone();
310310
let stage_id = self.stage_id;
311311

312-
// Check if memory mode is enabled
312+
// Check if memory mode is enabled and this is not the final stage
313+
// Final stages always write to disk to ensure proper cleanup via existing mechanisms
313314
let memory_mode = context.session_config().ballista_shuffle_memory_mode();
315+
let is_final_stage = context.session_config().ballista_is_final_stage();
316+
317+
// Use memory mode only for intermediate stages, not for the final output stage
318+
let use_memory = memory_mode && !is_final_stage;
314319

315320
// Get shuffle format from session config
316321
let shuffle_format = context.session_config().ballista_shuffle_format();
@@ -320,7 +325,7 @@ impl ShuffleWriterExec {
320325
let now = Instant::now();
321326
let mut stream = plan.execute(input_partition, context)?;
322327

323-
if memory_mode {
328+
if use_memory {
324329
// Use in-memory shuffle storage with configurable format
325330
Self::execute_shuffle_write_memory(
326331
&job_id,
@@ -335,6 +340,9 @@ impl ShuffleWriterExec {
335340
.await
336341
} else {
337342
// Use disk-based shuffle storage with configurable format
343+
// This is used for:
344+
// 1. When memory_mode is disabled
345+
// 2. For final stages (even if memory_mode is enabled)
338346
Self::execute_shuffle_write_disk(
339347
path,
340348
input_partition,

0 commit comments

Comments
 (0)