Skip to content

Commit 25ab388

Browse files
fix(shuffle): route s3 partition fetches through object store and stream per-partition IPC as a single Arrow stream
Two independent object-store shuffle correctness bugs that were latent until PR #42 fixed the writer-side prefix bug and let intermediate shuffles actually land in S3: 1. BallistaClient::fetch_partition was always going via the gRPC Action::FetchPartition, whose executor-side handler does `tokio::fs::File::open(path)` and only understands local paths and `memory://`. With object-store shuffle enabled, writers reported `location.path = s3://...` and the gRPC handler then failed with `Failed to open file: No such file or directory`. Dispatch on the path scheme (s3 / abfs / az / gs) and route those to the existing object-store reader, sharing it via two new pub(crate) helpers (path_is_object_store, fetch_object_store_partition_stream). 2. Both the no-repart and hash-repart Arrow IPC writer paths serialised each batch via serialize_batch_to_ipc_bytes — every call writes a complete IPC stream WITH a StreamWriter::finish() EOS marker — and concatenated those streams into one S3 object. StreamReader stops at the first EOS, so any partition holding more than one batch came back as `Arrow error: Ipc error: Unexpected EOS`. Replace with a long-lived StreamingMultipartIpcUploader that writes the header once on construction, appends each batch's bytes into the multipart upload, and emits the EOS marker exactly once on finish(). Verified end-to-end against a 1 scheduler + 2 executor local Spice cluster with a multi-segment-prefix `shuffle_location: s3://bucket/<prefix>`: - `SELECT COUNT(*) FROM t` (single-stage; exercises Bug A): succeeds. - `SELECT col, COUNT(*) FROM t GROUP BY col ORDER BY ... LIMIT 5` (hash-repartition shuffle; exercises Bug B): succeeds.
1 parent f62181c commit 25ab388

5 files changed

Lines changed: 251 additions & 156 deletions

File tree

ballista/core/src/client.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,20 @@ impl BallistaClient {
164164
port: u16,
165165
flight_transport: bool,
166166
) -> BResult<SendableRecordBatchStream> {
167+
// When the writer-side stored this partition in object store (s3 / abfs / az / gs),
168+
// skip the gRPC FetchPartition path entirely — the executor's handler only knows
169+
// local paths and `memory://` and would `tokio::fs::File::open("s3://...")`, failing
170+
// with `No such file or directory`. Read straight from object store instead.
171+
if crate::execution_plans::shuffle_reader::path_is_object_store(path) {
172+
return crate::execution_plans::shuffle_reader::fetch_object_store_partition_stream(
173+
path,
174+
executor_id,
175+
partition_id.stage_id,
176+
partition_id.partition_id,
177+
)
178+
.await;
179+
}
180+
167181
let action = Action::FetchPartition {
168182
job_id: partition_id.job_id.clone(),
169183
stage_id: partition_id.stage_id,

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,9 @@ async fn fetch_partition(
769769
.await
770770
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
771771

772+
// `BallistaClient::fetch_partition` dispatches object-store URLs (s3 / abfs / az / gs)
773+
// to the object-store reader, bypassing the gRPC FetchPartition path that only
774+
// understands local files and `memory://`.
772775
let stream = ballista_client
773776
.fetch_partition(
774777
&metadata.id,

ballista/core/src/execution_plans/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
mod distributed_explain_analyze;
2222
mod distributed_query;
2323
mod shuffle_manager;
24-
mod shuffle_reader;
24+
pub(crate) mod shuffle_reader;
2525
mod shuffle_writer;
2626
mod shuffle_writer_trait;
2727
pub mod sort_shuffle;

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -932,11 +932,26 @@ impl RecordBatchStream for InMemoryShuffleStream {
932932
async fn fetch_partition_object_store_streaming(
933933
location: &PartitionLocation,
934934
) -> result::Result<SendableRecordBatchStream, BallistaError> {
935-
use object_store::path::Path as ObjectPath;
935+
fetch_object_store_partition_stream(
936+
&location.path,
937+
&location.executor_meta.id,
938+
location.partition_id.stage_id,
939+
location.partition_id.partition_id,
940+
)
941+
.await
942+
}
936943

937-
let path = &location.path;
938-
let metadata = &location.executor_meta;
939-
let partition_id = &location.partition_id;
944+
/// Streams a shuffle / result partition directly from object store. Used by both the
945+
/// intermediate shuffle reader and the driver-side final-stage fetch in
946+
/// `distributed_query`; the two call sites have differently-shaped `PartitionLocation`
947+
/// types but only need the path and identifiers, so this helper takes plain args.
948+
pub(crate) async fn fetch_object_store_partition_stream(
949+
path: &str,
950+
executor_id: &str,
951+
stage_id: usize,
952+
partition_id: usize,
953+
) -> result::Result<SendableRecordBatchStream, BallistaError> {
954+
use object_store::path::Path as ObjectPath;
940955

941956
debug!("Fetching shuffle partition from object store: {path}");
942957

@@ -948,9 +963,9 @@ async fn fetch_partition_object_store_streaming(
948963

949964
let store = build_shuffle_object_store(&url).map_err(|e| {
950965
BallistaError::FetchFailed(
951-
metadata.id.clone(),
952-
partition_id.stage_id,
953-
partition_id.partition_id,
966+
executor_id.to_owned(),
967+
stage_id,
968+
partition_id,
954969
format!("Failed to build object store client for '{path}': {e:?}"),
955970
)
956971
})?;
@@ -961,15 +976,15 @@ async fn fetch_partition_object_store_streaming(
961976

962977
let get_result = store.get(&object_path).await.map_err(|e| {
963978
BallistaError::FetchFailed(
964-
metadata.id.clone(),
965-
partition_id.stage_id,
966-
partition_id.partition_id,
979+
executor_id.to_owned(),
980+
stage_id,
981+
partition_id,
967982
format!("Failed to read object from {path}: {e:?}"),
968983
)
969984
})?;
970985

971986
let byte_stream = get_result.into_stream();
972-
let stream = ObjectStoreShuffleStream::try_new(byte_stream, path.clone()).await?;
987+
let stream = ObjectStoreShuffleStream::try_new(byte_stream, path.to_owned()).await?;
973988

974989
Ok(Box::pin(stream))
975990
}
@@ -1178,15 +1193,22 @@ impl RecordBatchStream for ObjectStoreShuffleStream {
11781193
}
11791194
}
11801195

1181-
/// Check if the location is an object store path (S3 or Azure).
1182-
fn check_is_object_store_location(location: &PartitionLocation) -> bool {
1183-
let path = location.path.as_str();
1196+
/// Returns true when the given path is an object-store URL (s3 / abfs / az / gs).
1197+
/// Exposed so non-shuffle-reader callers (e.g. the driver-side final-stage fetch in
1198+
/// `distributed_query`) can route around the gRPC `FetchPartition` path, which only
1199+
/// understands local files and `memory://`.
1200+
pub(crate) fn path_is_object_store(path: &str) -> bool {
11841201
path.starts_with("s3://")
11851202
|| path.starts_with("abfs://")
11861203
|| path.starts_with("az://")
11871204
|| path.starts_with("gs://")
11881205
}
11891206

1207+
/// Check if the location is an object store path (S3 or Azure).
1208+
fn check_is_object_store_location(location: &PartitionLocation) -> bool {
1209+
path_is_object_store(location.path.as_str())
1210+
}
1211+
11901212
async fn fetch_partition_object_store(
11911213
location: &PartitionLocation,
11921214
) -> result::Result<SendableRecordBatchStream, BallistaError> {

0 commit comments

Comments
 (0)