Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions ballista/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,20 @@ impl BallistaClient {
port: u16,
flight_transport: bool,
) -> BResult<SendableRecordBatchStream> {
// When the writer-side stored this partition in object store (s3 / abfs / az / gs),
// skip the gRPC FetchPartition path entirely — the executor's handler only knows
// local paths and `memory://` and would `tokio::fs::File::open("s3://...")`, failing
// with `No such file or directory`. Read straight from object store instead.
if crate::execution_plans::shuffle_reader::path_is_object_store(path) {
return crate::execution_plans::shuffle_reader::fetch_object_store_partition_stream(
path,
executor_id,
partition_id.stage_id,
partition_id.partition_id,
)
.await;
}

let action = Action::FetchPartition {
job_id: partition_id.job_id.clone(),
stage_id: partition_id.stage_id,
Expand Down
3 changes: 3 additions & 0 deletions ballista/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,9 @@ async fn fetch_partition(
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

// `BallistaClient::fetch_partition` dispatches object-store URLs (s3 / abfs / az / gs)
// to the object-store reader, bypassing the gRPC FetchPartition path that only
// understands local files and `memory://`.
let stream = ballista_client
.fetch_partition(
&metadata.id,
Expand Down
2 changes: 1 addition & 1 deletion ballista/core/src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
mod distributed_explain_analyze;
mod distributed_query;
mod shuffle_manager;
mod shuffle_reader;
pub(crate) mod shuffle_reader;
mod shuffle_writer;
mod shuffle_writer_trait;
pub mod sort_shuffle;
Expand Down
56 changes: 44 additions & 12 deletions ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,11 +932,26 @@ impl RecordBatchStream for InMemoryShuffleStream {
async fn fetch_partition_object_store_streaming(
location: &PartitionLocation,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
use object_store::path::Path as ObjectPath;
fetch_object_store_partition_stream(
&location.path,
&location.executor_meta.id,
location.partition_id.stage_id,
location.partition_id.partition_id,
)
.await
}

let path = &location.path;
let metadata = &location.executor_meta;
let partition_id = &location.partition_id;
/// Streams a shuffle / result partition directly from object store. Used by both the
/// intermediate shuffle reader and the driver-side final-stage fetch in
/// `distributed_query`; the two call sites have differently-shaped `PartitionLocation`
/// types but only need the path and identifiers, so this helper takes plain args.
pub(crate) async fn fetch_object_store_partition_stream(
path: &str,
executor_id: &str,
stage_id: usize,
partition_id: usize,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
use object_store::path::Path as ObjectPath;

debug!("Fetching shuffle partition from object store: {path}");

Expand All @@ -948,9 +963,9 @@ async fn fetch_partition_object_store_streaming(

let store = build_shuffle_object_store(&url).map_err(|e| {
BallistaError::FetchFailed(
metadata.id.clone(),
partition_id.stage_id,
partition_id.partition_id,
executor_id.to_owned(),
stage_id,
partition_id,
format!("Failed to build object store client for '{path}': {e:?}"),
)
})?;
Expand All @@ -961,15 +976,15 @@ async fn fetch_partition_object_store_streaming(

let get_result = store.get(&object_path).await.map_err(|e| {
BallistaError::FetchFailed(
metadata.id.clone(),
partition_id.stage_id,
partition_id.partition_id,
executor_id.to_owned(),
stage_id,
partition_id,
format!("Failed to read object from {path}: {e:?}"),
)
})?;

let byte_stream = get_result.into_stream();
let stream = ObjectStoreShuffleStream::try_new(byte_stream, path.clone()).await?;
let stream = ObjectStoreShuffleStream::try_new(byte_stream, path.to_owned()).await?;

Ok(Box::pin(stream))
}
Expand Down Expand Up @@ -1178,7 +1193,24 @@ impl RecordBatchStream for ObjectStoreShuffleStream {
}
}

/// Check if the location is an object store path (S3 or Azure).
/// Returns true when the given path is an object-store URL the streaming shuffle
/// reader can handle. Exposed so non-shuffle-reader callers (e.g. the driver-side
/// final-stage fetch via [`crate::client::BallistaClient::fetch_partition`]) can
/// route around the gRPC `FetchPartition` path, which only understands local files
/// and `memory://`.
///
/// Scoped to `s3://` to match [`build_shuffle_object_store`]: the writer-side
/// `ObjectStoreShuffleStorage::new_azure` exists but is not wired through the
/// registry-based credential path yet, so `abfs://` / `az://` / `gs://` URLs would
/// be routed away from gRPC and then fail in `build_shuffle_object_store`.
/// Broaden this alongside `build_shuffle_object_store` once those backends are
/// supported.
pub(crate) fn path_is_object_store(path: &str) -> bool {
path.starts_with("s3://")
}

/// Check if the location is an object store path the broad
/// `fetch_partition_object_store_inner` reader supports (s3 / abfs / az / gs).
fn check_is_object_store_location(location: &PartitionLocation) -> bool {
let path = location.path.as_str();
path.starts_with("s3://")
Expand Down
Loading
Loading