fix(shuffle): fetch S3 partitions directly and stream IPC writes #43
Pull request overview
Fixes object-store shuffle correctness by (1) routing final-stage partition fetches for object-store URLs around the gRPC/local-file fetch path, and (2) ensuring Arrow IPC data written to object store forms a single valid IPC stream per output partition (no per-batch EOS markers).
Changes:
- Added `shuffle_reader` helpers to detect object-store URLs and stream partitions directly from object store, and used them from `BallistaClient::fetch_partition`.
- Reworked object-store shuffle writers to maintain one IPC `StreamWriter` per output partition across the whole multipart upload (new `StreamingMultipartIpcUploader`), eliminating concatenated IPC streams.
- Exposed the `shuffle_reader` module internally and updated comments where the new routing behavior is relied upon.
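
The routing described in the first item can be sketched as below. This is a minimal, std-only illustration and not the code in this PR: `FetchRoute`, `scheme_of`, and `route_for` are hypothetical names, the body of `path_is_object_store` is an assumption, and the scheme list (s3 / abfs / az / gs) is taken from the commit message.

```rust
/// Where a partition fetch should be sent.
/// Hypothetical enum for illustration; not a Ballista type.
#[derive(Debug, PartialEq)]
enum FetchRoute {
    /// Read the partition straight from the object store.
    ObjectStore,
    /// Ask the executor for the partition over gRPC (local files, memory://).
    ExecutorGrpc,
}

/// Returns the lowercased scheme of `path` if it looks like `scheme://...`.
fn scheme_of(path: &str) -> Option<String> {
    let (scheme, _rest) = path.split_once("://")?;
    if scheme.is_empty() {
        return None;
    }
    Some(scheme.to_ascii_lowercase())
}

/// Assumed implementation: true for the schemes named in the commit message.
fn path_is_object_store(path: &str) -> bool {
    matches!(
        scheme_of(path).as_deref(),
        Some("s3" | "abfs" | "az" | "gs")
    )
}

/// Picks the fetch path from the partition location alone.
fn route_for(path: &str) -> FetchRoute {
    if path_is_object_store(path) {
        FetchRoute::ObjectStore
    } else {
        FetchRoute::ExecutorGrpc
    }
}
```

In this sketch, plain local paths and `memory://` locations fall through to the gRPC route, which matches the handler behavior the commit message describes.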
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| ballista/core/src/execution_plans/shuffle_writer.rs | Writes Arrow IPC to object store as one stream per partition via multipart upload; removes per-batch IPC stream concatenation. |
| ballista/core/src/execution_plans/shuffle_reader.rs | Adds reusable object-store streaming fetch helper and `path_is_object_store` scheme detection. |
| ballista/core/src/execution_plans/mod.rs | Exposes `shuffle_reader` internally (`pub(crate)`) for cross-module reuse. |
| ballista/core/src/execution_plans/distributed_query.rs | Documents reliance on `BallistaClient::fetch_partition` object-store routing for final-stage fetches. |
| ballista/core/src/client.rs | Routes object-store URLs to the object-store streaming reader instead of gRPC/local-file fetch. |
Comments suppressed due to low confidence (1)
ballista/core/src/execution_plans/shuffle_reader.rs:972
`fetch_object_store_partition_stream` relies on `build_shuffle_object_store(&url)`, but that helper only constructs an S3 store (and explicitly errors on non-s3 schemes). Given this helper is now used outside the shuffle reader (driver-side fetch via `BallistaClient::fetch_partition`), it should either support the same object-store URL schemes as `fetch_partition_object_store_inner` (e.g. `abfs://...`) or clearly reject them before routing to avoid hard-to-diagnose failures.
    let url = Url::parse(path).map_err(|e| {
        BallistaError::General(format!(
            "Failed to parse object store URL '{path}': {e:?}"
        ))
    })?;
    let store = build_shuffle_object_store(&url).map_err(|e| {
        BallistaError::FetchFailed(
            executor_id.to_owned(),
            stage_id,
            partition_id,
            format!("Failed to build object store client for '{path}': {e:?}"),
        )
    })?;
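
One way to act on the "clearly reject them before routing" option from this comment is sketched below. It is an illustration under stated assumptions, not the fix that landed: `check_routable`, `ROUTED_SCHEMES`, and `BUILDABLE_SCHEMES` are hypothetical names, and the two scheme lists only mirror what the review text and commit message say (four routed schemes, an S3-only store builder).

```rust
/// Schemes the router treats as object-store locations (assumed list).
const ROUTED_SCHEMES: [&str; 4] = ["s3", "abfs", "az", "gs"];
/// Schemes the store builder can construct; the review says it is S3-only.
const BUILDABLE_SCHEMES: [&str; 1] = ["s3"];

/// Decides, before any network call, whether `path` can go to the
/// object-store reader.
///
/// Ok(true)  => route to the object-store reader
/// Ok(false) => not an object-store path; use the existing fetch path
/// Err(..)   => object-store scheme that the builder cannot construct
fn check_routable(path: &str) -> Result<bool, String> {
    let scheme = match path.split_once("://") {
        Some((s, _rest)) => s.to_ascii_lowercase(),
        None => return Ok(false), // plain local path, no scheme
    };
    if !ROUTED_SCHEMES.contains(&scheme.as_str()) {
        return Ok(false);
    }
    if BUILDABLE_SCHEMES.contains(&scheme.as_str()) {
        Ok(true)
    } else {
        // Fail early with a message that names the scheme and the path.
        Err(format!(
            "object-store scheme '{scheme}' in '{path}' is not supported by the shuffle store builder"
        ))
    }
}
```

Failing at this point means the error names the unsupported scheme directly, rather than surfacing later as a store-construction failure.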
…eam per-partition IPC as a single Arrow stream

Two independent object-store shuffle correctness bugs that were latent until PR #42 fixed the writer-side prefix bug and let intermediate shuffles actually land in S3:

1. `BallistaClient::fetch_partition` was always going via the gRPC `Action::FetchPartition`, whose executor-side handler does `tokio::fs::File::open(path)` and only understands local paths and `memory://`. With object-store shuffle enabled, writers reported `location.path = s3://...` and the gRPC handler then failed with `Failed to open file: No such file or directory`. Dispatch on the path scheme (s3 / abfs / az / gs) and route those to the existing object-store reader, sharing it via two new `pub(crate)` helpers (`path_is_object_store`, `fetch_object_store_partition_stream`).

2. Both the no-repart and hash-repart Arrow IPC writer paths serialised each batch via `serialize_batch_to_ipc_bytes` (every call writes a complete IPC stream WITH a `StreamWriter::finish()` EOS marker) and concatenated those streams into one S3 object. `StreamReader` stops at the first EOS, so any partition holding more than one batch came back as `Arrow error: Ipc error: Unexpected EOS`. Replace with a long-lived `StreamingMultipartIpcUploader` that writes the header once on construction, appends each batch's bytes into the multipart upload, and emits the EOS marker exactly once on `finish()`.

Verified end-to-end against a 1 scheduler + 2 executor local Spice cluster with a multi-segment-prefix `shuffle_location: s3://bucket/<prefix>`:

- `SELECT COUNT(*) FROM t` (single-stage; exercises bug 1): succeeds.
- `SELECT col, COUNT(*) FROM t GROUP BY col ORDER BY ... LIMIT 5` (hash-repartition shuffle; exercises bug 2): succeeds.
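
The framing problem behind the second bug can be illustrated with a toy model. The sketch below does not use the Arrow crates and does not reproduce the real IPC encoding: the one-byte tags, `ToyStreamWriter`, `read_stream`, `concat_per_batch`, and `single_stream` are all hypothetical. It only shows why concatenating complete streams (header + batch + EOS each) loses data for a reader that stops at the first EOS, and why one long-lived writer does not. The toy reader silently stops at the first EOS; the real failure reported above surfaced as an `Unexpected EOS` error instead.

```rust
// One-byte tags standing in for IPC framing (toy encoding, not Arrow's).
const HEADER: u8 = b'H';
const BATCH: u8 = b'B';
const EOS: u8 = b'E';

/// Toy writer: header once on construction, EOS once on finish.
struct ToyStreamWriter {
    buf: Vec<u8>,
}

impl ToyStreamWriter {
    fn new() -> Self {
        Self { buf: vec![HEADER] }
    }

    /// Appends one batch; the payload is a single byte in this model.
    fn write_batch(&mut self, payload: u8) {
        self.buf.push(BATCH);
        self.buf.push(payload);
    }

    /// Emits the end-of-stream marker and returns the bytes.
    fn finish(mut self) -> Vec<u8> {
        self.buf.push(EOS);
        self.buf
    }
}

/// Toy reader: expects a header, collects batches, stops at the first EOS
/// and ignores any bytes that follow it.
fn read_stream(bytes: &[u8]) -> Result<Vec<u8>, String> {
    let mut it = bytes.iter().copied();
    if it.next() != Some(HEADER) {
        return Err("missing header".into());
    }
    let mut batches = Vec::new();
    loop {
        match it.next() {
            Some(BATCH) => match it.next() {
                Some(payload) => batches.push(payload),
                None => return Err("truncated batch".into()),
            },
            Some(EOS) => return Ok(batches),
            Some(other) => return Err(format!("unexpected tag {other}")),
            None => return Err("stream ended without EOS".into()),
        }
    }
}

/// Buggy shape: one complete stream per batch, concatenated into one object.
fn concat_per_batch(payloads: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    for &p in payloads {
        let mut w = ToyStreamWriter::new();
        w.write_batch(p);
        out.extend(w.finish());
    }
    out
}

/// Fixed shape: one writer for the whole partition.
fn single_stream(payloads: &[u8]) -> Vec<u8> {
    let mut w = ToyStreamWriter::new();
    for &p in payloads {
        w.write_batch(p);
    }
    w.finish()
}
```

With three batches, the concatenated form is `H B 1 E H B 2 E H B 3 E` and the toy reader returns only the first batch, while the single-stream form is `H B 1 B 2 B 3 E` and yields all three.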
25ab388 to ad17b15
Addressed all three Copilot comments in
Summary
Fixes two object-store shuffle failures that appeared after prefixed S3 shuffle writes started landing in the correct location.
First, `BallistaClient::fetch_partition` always used the executor gRPC `FetchPartition` path. That handler opens paths with `tokio::fs::File::open`, so final-stage result fetches failed when the partition location was an `s3://...` URL. This now detects S3 partition paths and reads them through the existing object-store shuffle reader instead.

Second, the object-store Arrow IPC writer serialized each batch as a complete IPC stream and concatenated those streams into one S3 object. Arrow readers stop at the first end-of-stream marker, so partitions with more than one batch could fail with `Unexpected EOS`. This now keeps one `StreamWriter` per output partition and emits the IPC header and end-of-stream marker only once.

Changes
- Routed S3 partition paths in `BallistaClient::fetch_partition` directly through the object-store reader.
- Added `shuffle_reader` helpers used by both intermediate shuffle reads and final-stage fetches.
- Added `StreamingMultipartIpcUploader`, which streams one valid Arrow IPC stream per shuffle partition.

Test plan
- `cargo fmt --all -- --check`
- `cargo clippy --all-targets --workspace --all-features -- -D warnings`
- `cargo test -p ballista-core --lib shuffle_storage`
- End-to-end run with `shuffle_location: s3://<bucket>/<two-segment-prefix>`:
  - `SELECT COUNT(*) FROM t` returned `2000000`
  - `SELECT col, COUNT(*) FROM t GROUP BY col ORDER BY ... LIMIT 5` returned 5 rows