Skip to content

Commit 624119f

Browse files
committed
fix: route in-memory shuffle partitions to remote fetch when not present locally
When using in-memory shuffle (shuffle_location: memory), shuffle partitions are stored in the executor's local InMemoryShuffleManager. Previously, the shuffle reader incorrectly assumed all memory:// paths could be read locally. In distributed mode with multiple executors, a shuffle reader on executor A may need to read a partition that was written by executor B. The partition would have a memory:// path, but only exist in executor B's shuffle manager. This caused 'Shuffle partition not found in memory' errors. This fix modifies split_partition_locations() to check if a memory:// partition actually exists in the local shuffle manager before categorizing it for local read. If the partition doesn't exist locally, it's routed to the remote fetch path which uses the Arrow Flight service. The Flight service already correctly handles memory:// paths by reading from the executor's local shuffle manager. Fixes spiceai/spiceai#9290
1 parent 20ef1eb commit 624119f

1 file changed

Lines changed: 23 additions & 2 deletions

File tree

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,8 +425,19 @@ fn split_partition_locations(
425425

426426
for loc in partition_locations {
427427
if check_is_memory_location(&loc) {
428-
// Memory locations are always read locally
429-
result.memory.push(loc);
428+
// Memory locations should only be read locally if they exist in this executor's
429+
// shuffle manager. If the partition doesn't exist locally, it means the partition
430+
// was written by another executor and we need to fetch it remotely via Flight.
431+
if check_is_local_memory_location(&loc) {
432+
result.memory.push(loc);
433+
} else {
434+
// Partition is in memory on another executor, fetch remotely
435+
debug!(
436+
"Memory partition {} not found locally, will fetch remotely from executor {}",
437+
loc.path, loc.executor_meta.id
438+
);
439+
result.remote.push(loc);
440+
}
430441
} else if check_is_object_store_location(&loc) {
431442
// Object store locations are handled via the runtime_env's registered object stores
432443
result.object_store.push(loc);
@@ -598,6 +609,16 @@ fn check_is_memory_location(location: &PartitionLocation) -> bool {
598609
location.path.starts_with("memory://")
599610
}
600611

612+
/// Check if a memory:// partition actually exists in the local shuffle manager.
613+
/// This is used to determine whether to read the partition locally or fetch it remotely.
614+
fn check_is_local_memory_location(location: &PartitionLocation) -> bool {
615+
if let Some(key) = location.path.strip_prefix("memory://") {
616+
global_shuffle_manager().contains_partition(key)
617+
} else {
618+
false
619+
}
620+
}
621+
601622
/// Partition reader Trait, different partition reader can have
602623
#[async_trait]
603624
trait PartitionReader: Send + Sync + Clone {

0 commit comments

Comments
 (0)