Skip to content

Commit 9fcff62

Browse files
committed
feat: Store shuffles in object store (S3, Azure)
1 parent 8c3ee90 commit 9fcff62

8 files changed

Lines changed: 1021 additions & 15 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ datafusion-cli = "51.0.0"
4040
datafusion-proto = "51.0.0"
4141
datafusion-proto-common = "51.0.0"
4242
datafusion-substrait = "51.0.0"
43-
object_store = "0.12"
43+
object_store = { version = "0.12", features = ["aws", "azure"] }
44+
bytes = "1.5"
4445
prost = "0.14"
4546
prost-types = "0.14"
4647
rstest = { version = "0.26" }

ballista/core/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ arrow-flight = { workspace = true }
4545
async-trait = { workspace = true }
4646
aws-config = { version = "1.6.0", optional = true }
4747
aws-credential-types = { version = "1.2.0", optional = true }
48+
bytes = { workspace = true }
4849
chrono = { version = "0.4", default-features = false }
4950
clap = { workspace = true, optional = true }
5051
datafusion = { workspace = true }
@@ -54,7 +55,7 @@ futures = { workspace = true }
5455
itertools = "0.14"
5556
log = { workspace = true }
5657
md-5 = { version = "^0.10.0" }
57-
object_store = { workspace = true, features = ["aws", "http"], optional = true }
58+
object_store = { workspace = true, features = ["aws", "azure", "http"], optional = true }
5859
parking_lot = { workspace = true }
5960
prost = { workspace = true }
6061
prost-types = { workspace = true }

ballista/core/src/config.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
4343
/// Configuration key to prefer Flight protocol for remote shuffle reads.
4444
pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
4545
"ballista.shuffle.remote_read_prefer_flight";
46+
/// Configuration key for shuffle storage type (local, s3, azure).
47+
pub const BALLISTA_SHUFFLE_STORAGE_TYPE: &str = "ballista.shuffle.storage_type";
48+
/// Configuration key for shuffle storage base URL/path.
49+
pub const BALLISTA_SHUFFLE_STORAGE_URL: &str = "ballista.shuffle.storage_url";
4650

4751
/// Configuration key for gRPC client connection timeout in seconds.
4852
pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
@@ -85,6 +89,14 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
8589
"Forces the shuffle reader to use flight reader instead of block reader for remote read. Block reader usually has better performance and resource utilization".to_string(),
8690
DataType::Boolean,
8791
Some((false).to_string())),
92+
ConfigEntry::new(BALLISTA_SHUFFLE_STORAGE_TYPE.to_string(),
93+
"Storage type for shuffle data: 'local' (default), 's3', or 'azure'".to_string(),
94+
DataType::Utf8,
95+
Some("local".to_string())),
96+
ConfigEntry::new(BALLISTA_SHUFFLE_STORAGE_URL.to_string(),
97+
"Base URL/path for shuffle storage. For local: file path; For S3: s3://bucket/prefix; For Azure: abfs://container@account.dfs.core.windows.net/prefix".to_string(),
98+
DataType::Utf8,
99+
None),
88100
ConfigEntry::new(BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS.to_string(),
89101
"Connection timeout for gRPC client in seconds".to_string(),
90102
DataType::UInt64,
@@ -264,6 +276,16 @@ impl BallistaConfig {
264276
self.get_bool_setting(BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT)
265277
}
266278

279+
/// Returns the shuffle storage type (local, s3, azure).
280+
pub fn shuffle_storage_type(&self) -> String {
281+
self.get_string_setting(BALLISTA_SHUFFLE_STORAGE_TYPE)
282+
}
283+
284+
/// Returns the shuffle storage base URL/path if configured.
285+
pub fn shuffle_storage_url(&self) -> Option<String> {
286+
self.settings.get(BALLISTA_SHUFFLE_STORAGE_URL).cloned()
287+
}
288+
267289
fn get_usize_setting(&self, key: &str) -> usize {
268290
if let Some(v) = self.settings.get(key) {
269291
// infallible because we validate all configs in the constructor

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 188 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,21 @@ use std::any::Any;
2222
use std::collections::HashMap;
2323
use std::fmt::Debug;
2424
use std::fs::File;
25-
use std::io::BufReader;
25+
use std::io::{BufReader, Cursor};
2626
use std::pin::Pin;
2727
use std::result;
2828
use std::sync::Arc;
2929
use std::task::{Context, Poll};
3030

31+
#[cfg(feature = "build-binary")]
32+
use object_store::aws::AmazonS3Builder;
33+
#[cfg(feature = "build-binary")]
34+
use object_store::azure::MicrosoftAzureBuilder;
35+
#[cfg(feature = "build-binary")]
36+
use object_store::ObjectStore;
37+
#[cfg(feature = "build-binary")]
38+
use url::Url;
39+
3140
use crate::client::BallistaClient;
3241
use crate::extension::{BallistaConfigGrpcEndpoint, SessionConfigExt};
3342
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
@@ -371,23 +380,34 @@ impl Stream for AbortableReceiverStream {
371380
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
372381
}
373382
}
374-
/// Splits the provided partition locations into local and remote partitions.
383+
/// Splits the provided partition locations into local, object store, and remote partitions.
375384
/// Local partitions are read directly from local Arrow IPC files,
385+
/// object store partitions are read via the object store client,
376386
/// while remote partitions are fetched using the Arrow Flight client.
377387
/// If `force_remote_read` is true, all partitions are treated as remote.
378388
fn local_remote_read_split(
379389
partition_locations: Vec<PartitionLocation>,
380390
force_remote_read: bool,
381-
) -> (Vec<PartitionLocation>, Vec<PartitionLocation>) {
391+
) -> (Vec<PartitionLocation>, Vec<PartitionLocation>, Vec<PartitionLocation>) {
382392
if !force_remote_read {
383-
partition_locations
393+
let (local, non_local): (Vec<_>, Vec<_>) = partition_locations
394+
.into_iter()
395+
.partition(check_is_local_location);
396+
let (object_store, remote): (Vec<_>, Vec<_>) = non_local
384397
.into_iter()
385-
.partition(check_is_local_location)
398+
.partition(check_is_object_store_location);
399+
(local, object_store, remote)
386400
} else {
387-
(vec![], partition_locations)
401+
(vec![], vec![], partition_locations)
388402
}
389403
}
390404

405+
/// Check if the location is an object store path (S3 or Azure).
406+
fn check_is_object_store_location(location: &PartitionLocation) -> bool {
407+
let path = location.path.as_str();
408+
path.starts_with("s3://") || path.starts_with("abfs://") || path.starts_with("az://")
409+
}
410+
391411
fn send_fetch_partitions(
392412
partition_locations: Vec<PartitionLocation>,
393413
max_request_num: usize,
@@ -401,12 +421,13 @@ fn send_fetch_partitions(
401421
let semaphore = Arc::new(Semaphore::new(max_request_num));
402422
let mut spawned_tasks: Vec<SpawnedTask<()>> = vec![];
403423

404-
let (local_locations, remote_locations): (Vec<_>, Vec<_>) =
424+
let (local_locations, object_store_locations, remote_locations): (Vec<_>, Vec<_>, Vec<_>) =
405425
local_remote_read_split(partition_locations, force_remote_read);
406426

407427
debug!(
408-
"local shuffle file counts:{}, remote shuffle file count:{}.",
428+
"local shuffle file counts:{}, object store shuffle file count:{}, remote shuffle file count:{}.",
409429
local_locations.len(),
430+
object_store_locations.len(),
410431
remote_locations.len()
411432
);
412433

@@ -430,6 +451,31 @@ fn send_fetch_partitions(
430451
}
431452
}));
432453

454+
// Handle object store partitions with concurrency control
455+
for p in object_store_locations.into_iter() {
456+
let semaphore = semaphore.clone();
457+
let response_sender = response_sender.clone();
458+
spawned_tasks.push(SpawnedTask::spawn(async move {
459+
// Block if exceeds max request number.
460+
let permit = semaphore.acquire_owned().await.unwrap();
461+
let r = PartitionReaderEnum::ObjectStoreRemote
462+
.fetch_partition(
463+
&p,
464+
max_message_size,
465+
false, // flight_transport not used for object store
466+
None, // customize_endpoint not used for object store
467+
false, // use_tls not used for object store
468+
)
469+
.await;
470+
// Block if the channel buffer is full.
471+
if let Err(e) = response_sender.send(r).await {
472+
error!("Fail to send response event to the channel due to {e}");
473+
}
474+
// Increase semaphore by dropping existing permits.
475+
drop(permit);
476+
}));
477+
}
478+
433479
for p in remote_locations.into_iter() {
434480
let semaphore = semaphore.clone();
435481
let response_sender = response_sender.clone();
@@ -590,14 +636,143 @@ fn fetch_partition_local_inner(
590636
Ok(reader)
591637
}
592638

639+
#[cfg(feature = "build-binary")]
640+
async fn fetch_partition_object_store(
641+
location: &PartitionLocation,
642+
) -> result::Result<SendableRecordBatchStream, BallistaError> {
643+
let path = &location.path;
644+
let metadata = &location.executor_meta;
645+
let partition_id = &location.partition_id;
646+
647+
debug!("Fetching shuffle partition from object store: {}", path);
648+
649+
let batches = fetch_partition_object_store_inner(path).await.map_err(|e| {
650+
// return BallistaError::FetchFailed may let scheduler retry this task.
651+
BallistaError::FetchFailed(
652+
metadata.id.clone(),
653+
partition_id.stage_id,
654+
partition_id.partition_id,
655+
e.to_string(),
656+
)
657+
})?;
658+
659+
if batches.is_empty() {
660+
return Err(BallistaError::General(format!(
661+
"No batches found in shuffle partition at {}",
662+
path
663+
)));
664+
}
665+
666+
let schema = batches[0].schema();
667+
let stream = futures::stream::iter(batches.into_iter().map(Ok));
668+
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
669+
}
670+
671+
#[cfg(not(feature = "build-binary"))]
593672
async fn fetch_partition_object_store(
594673
_location: &PartitionLocation,
595674
) -> result::Result<SendableRecordBatchStream, BallistaError> {
596675
Err(BallistaError::NotImplemented(
597-
"Should not use ObjectStorePartitionReader".to_string(),
676+
"Object store support requires 'build-binary' feature".to_string(),
598677
))
599678
}
600679

680+
/// Downloads one shuffle partition from an object store and decodes it as an
/// Arrow IPC stream.
///
/// `path` must be a full URL; its scheme selects the backend:
/// * `s3://bucket/key`
/// * `abfs://container@account.dfs.core.windows.net/key`
/// * `abfs://container/key` or `az://container/key` (no account in the URL;
///   the account must then come from the builder's environment configuration)
///
/// Clients are created with the builders' `from_env()` constructors. The
/// whole object is buffered in memory before it is decoded.
///
/// The returned vector is never empty for a readable stream: if the stream
/// contains a schema but no record batches, a single zero-row batch with that
/// schema is returned so callers can still obtain the schema.
///
/// # Errors
/// Returns `BallistaError::General` when the URL cannot be parsed, the scheme
/// is unsupported, the bucket/container is missing, the client cannot be
/// built, the object cannot be fetched, or the payload is not a valid Arrow
/// IPC stream.
#[cfg(feature = "build-binary")]
async fn fetch_partition_object_store_inner(
    path: &str,
) -> result::Result<Vec<RecordBatch>, BallistaError> {
    use object_store::path::Path as ObjectPath;

    let url = Url::parse(path).map_err(|e| {
        BallistaError::General(format!("Failed to parse object store URL '{}': {:?}", path, e))
    })?;

    let store = build_shuffle_object_store(&url, path)?;

    // Extract the object path from the URL
    let object_path = ObjectPath::from(url.path().trim_start_matches('/'));

    debug!("Reading object from path: {:?}", object_path);

    let get_result = store.get(&object_path).await.map_err(|e| {
        BallistaError::General(format!("Failed to read object from {}: {:?}", path, e))
    })?;

    let bytes = get_result.bytes().await.map_err(|e| {
        BallistaError::General(format!("Failed to read bytes from {}: {:?}", path, e))
    })?;

    // `Bytes` implements `AsRef<[u8]>`, so the cursor can read the downloaded
    // buffer in place instead of copying it into a fresh `Vec<u8>`.
    let stream_reader = StreamReader::try_new(Cursor::new(bytes), None).map_err(|e| {
        BallistaError::General(format!(
            "Failed to create Arrow stream reader for {}: {:?}",
            path, e
        ))
    })?;

    // Capture the schema before the reader is consumed; it is needed when the
    // stream carries no batches at all.
    let schema = stream_reader.schema();

    let mut batches = stream_reader
        .collect::<result::Result<Vec<_>, _>>()
        .map_err(|e| {
            BallistaError::General(format!("Failed to read batch from {}: {:?}", path, e))
        })?;

    // An IPC stream with a schema and zero batches is a valid (empty)
    // partition; keep the schema available by returning one zero-row batch.
    if batches.is_empty() {
        batches.push(RecordBatch::new_empty(schema));
    }

    Ok(batches)
}

/// Builds the object store client for the backend named by the scheme of
/// `url`. `path` is the original URL string and is only used in error
/// messages.
#[cfg(feature = "build-binary")]
fn build_shuffle_object_store(
    url: &Url,
    path: &str,
) -> result::Result<Arc<dyn ObjectStore>, BallistaError> {
    // A URL such as `s3:///key` can parse with an empty host; treat that the
    // same as a missing host so it is rejected with a clear message.
    let host = url.host_str().filter(|h| !h.is_empty());

    match url.scheme() {
        "s3" => {
            let bucket = host.ok_or_else(|| {
                BallistaError::General(format!("No bucket in S3 URL: {}", path))
            })?;
            let s3 = AmazonS3Builder::from_env()
                .with_bucket_name(bucket)
                .build()
                .map_err(|e| {
                    BallistaError::General(format!("Failed to create S3 client: {:?}", e))
                })?;
            let store: Arc<dyn ObjectStore> = Arc::new(s3);
            Ok(store)
        }
        "abfs" | "az" => {
            let host = host.ok_or_else(|| {
                BallistaError::General(format!("No host in Azure URL: {}", path))
            })?;

            let container = url.username();
            let builder = if container.is_empty() {
                // `az://container/key` / `abfs://container/key`: the host
                // component is the container and the URL names no account.
                MicrosoftAzureBuilder::from_env().with_container_name(host)
            } else {
                // `abfs://container@account.dfs.core.windows.net/key`: the
                // account is the first label of the host.
                let account = host.split('.').next().unwrap_or(host);
                MicrosoftAzureBuilder::from_env()
                    .with_account(account)
                    .with_container_name(container)
            };

            let azure = builder.build().map_err(|e| {
                BallistaError::General(format!("Failed to create Azure client: {:?}", e))
            })?;
            let store: Arc<dyn ObjectStore> = Arc::new(azure);
            Ok(store)
        }
        scheme => Err(BallistaError::General(format!(
            "Unsupported object store scheme: {}. Supported: s3, abfs, az",
            scheme
        ))),
    }
}
775+
601776
#[cfg(test)]
602777
mod tests {
603778
use super::*;
@@ -955,14 +1130,16 @@ mod tests {
9551130
let partition_locations =
9561131
get_test_partition_locations(1, file_path.to_str().unwrap().to_string());
9571132

958-
let (local, remote) = local_remote_read_split(partition_locations.clone(), false);
1133+
let (local, object_store, remote) = local_remote_read_split(partition_locations.clone(), false);
9591134

9601135
assert!(!local.is_empty());
1136+
assert!(object_store.is_empty());
9611137
assert!(remote.is_empty());
9621138

963-
let (local, remote) = local_remote_read_split(partition_locations, true);
1139+
let (local, object_store, remote) = local_remote_read_split(partition_locations, true);
9641140

9651141
assert!(local.is_empty());
1142+
assert!(object_store.is_empty());
9661143
assert!(!remote.is_empty());
9671144
}
9681145

0 commit comments

Comments
 (0)