Skip to content

Commit bd4c9e2

Browse files
committed
Address PR comments
1 parent b15e4c3 commit bd4c9e2

3 files changed

Lines changed: 33 additions & 48 deletions

File tree

ballista/client/src/builder.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
//! .build()?;
4040
//!
4141
//! let ctx = BallistaBuilder::new()
42-
//! .with_object_store("s3://my-bucket", Arc::new(s3_store))
42+
//! .add_object_store("s3://my-bucket", Arc::new(s3_store))
4343
//! .standalone()
4444
//! .await?;
4545
//! # Ok(())
@@ -79,7 +79,7 @@ use url::Url;
7979
/// .build()?;
8080
///
8181
/// let ctx = BallistaBuilder::new()
82-
/// .with_object_store("s3://my-bucket", Arc::new(s3_store))
82+
/// .add_object_store("s3://my-bucket", Arc::new(s3_store))
8383
/// .remote("df://localhost:50050")
8484
/// .await?;
8585
/// # Ok(())
@@ -159,32 +159,28 @@ impl BallistaBuilder {
159159
/// .build()?;
160160
///
161161
/// let ctx = BallistaBuilder::new()
162-
/// .with_object_store("s3://my-bucket", Arc::new(s3_store))
162+
/// .add_object_store("s3://my-bucket", Arc::new(s3_store))
163163
/// .standalone()
164164
/// .await?;
165165
/// # Ok(())
166166
/// # }
167167
/// ```
168-
pub fn with_object_store(mut self, url: &str, store: Arc<dyn ObjectStore>) -> Self {
168+
pub fn add_object_store(mut self, url: &str, store: Arc<dyn ObjectStore>) -> Self {
169169
// Parse the URL, or store it for later error handling during build
170170
if let Ok(parsed_url) = Url::parse(url) {
171171
self.object_stores.push((parsed_url, store));
172172
} else {
173173
// We'll handle invalid URLs during build
174-
log::warn!("Invalid object store URL: {}", url);
174+
log::warn!("Invalid object store URL: {url}");
175175
}
176176
self
177177
}
178178

179179
/// Registers a pre-created object store for a given URL.
180180
///
181-
/// This is the same as [`with_object_store`](Self::with_object_store) but takes
181+
/// This is the same as [`add_object_store`](Self::add_object_store) but takes
182182
/// a pre-parsed [`Url`] instead of a string.
183-
pub fn with_object_store_url(
184-
mut self,
185-
url: Url,
186-
store: Arc<dyn ObjectStore>,
187-
) -> Self {
183+
pub fn add_object_store_url(mut self, url: Url, store: Arc<dyn ObjectStore>) -> Self {
188184
self.object_stores.push((url, store));
189185
self
190186
}

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 25 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -876,24 +876,19 @@ async fn fetch_partition_object_store_with_runtime(
876876
let metadata = &location.executor_meta;
877877
let partition_id = &location.partition_id;
878878

879-
debug!(
880-
"Fetching shuffle partition from object store using runtime_env: {}",
881-
path
882-
);
879+
debug!("Fetching shuffle partition from object store using runtime_env: {path}");
883880

884881
let url = Url::parse(path).map_err(|e| {
885882
BallistaError::General(format!(
886-
"Failed to parse object store URL '{}': {:?}",
887-
path, e
883+
"Failed to parse object store URL '{path}': {e:?}"
888884
))
889885
})?;
890886

891887
// Get the object store from the RuntimeEnv's registry
892888
// This uses the credentials configured in the runtime (e.g., SpiceObjectStoreRegistry)
893889
let object_store_url = ObjectStoreUrl::parse(&url).map_err(|e| {
894890
BallistaError::General(format!(
895-
"Failed to parse object store URL '{}': {:?}",
896-
path, e
891+
"Failed to parse object store URL '{path}': {e:?}"
897892
))
898893
})?;
899894

@@ -902,21 +897,21 @@ async fn fetch_partition_object_store_with_runtime(
902897
metadata.id.clone(),
903898
partition_id.stage_id,
904899
partition_id.partition_id,
905-
format!("Failed to get object store for URL '{}': {:?}", path, e),
900+
format!("Failed to get object store for URL '{path}': {e:?}"),
906901
)
907902
})?;
908903

909904
// Extract the object path from the URL
910905
let object_path = ObjectPath::from(url.path().trim_start_matches('/'));
911906

912-
debug!("Reading object from path: {:?}", object_path);
907+
debug!("Reading object from path: {object_path:?}");
913908

914909
let get_result = store.get(&object_path).await.map_err(|e| {
915910
BallistaError::FetchFailed(
916911
metadata.id.clone(),
917912
partition_id.stage_id,
918913
partition_id.partition_id,
919-
format!("Failed to read object from {}: {:?}", path, e),
914+
format!("Failed to read object from {path}: {e:?}"),
920915
)
921916
})?;
922917

@@ -925,7 +920,7 @@ async fn fetch_partition_object_store_with_runtime(
925920
metadata.id.clone(),
926921
partition_id.stage_id,
927922
partition_id.partition_id,
928-
format!("Failed to read bytes from {}: {:?}", path, e),
923+
format!("Failed to read bytes from {path}: {e:?}"),
929924
)
930925
})?;
931926

@@ -935,7 +930,7 @@ async fn fetch_partition_object_store_with_runtime(
935930
metadata.id.clone(),
936931
partition_id.stage_id,
937932
partition_id.partition_id,
938-
format!("Failed to create Arrow stream reader for {}: {:?}", path, e),
933+
format!("Failed to create Arrow stream reader for {path}: {e:?}"),
939934
)
940935
})?;
941936

@@ -946,15 +941,14 @@ async fn fetch_partition_object_store_with_runtime(
946941
metadata.id.clone(),
947942
partition_id.stage_id,
948943
partition_id.partition_id,
949-
format!("Failed to read batch from {}: {:?}", path, e),
944+
format!("Failed to read batch from {path}: {e:?}"),
950945
)
951946
})?);
952947
}
953948

954949
if batches.is_empty() {
955950
return Err(BallistaError::General(format!(
956-
"No batches found in shuffle partition at {}",
957-
path
951+
"No batches found in shuffle partition at {path}"
958952
)));
959953
}
960954

@@ -981,7 +975,7 @@ async fn fetch_partition_object_store(
981975
let metadata = &location.executor_meta;
982976
let partition_id = &location.partition_id;
983977

984-
debug!("Fetching shuffle partition from object store: {}", path);
978+
debug!("Fetching shuffle partition from object store: {path}");
985979

986980
let batches = fetch_partition_object_store_inner(path)
987981
.await
@@ -997,8 +991,7 @@ async fn fetch_partition_object_store(
997991

998992
if batches.is_empty() {
999993
return Err(BallistaError::General(format!(
1000-
"No batches found in shuffle partition at {}",
1001-
path
994+
"No batches found in shuffle partition at {path}"
1002995
)));
1003996
}
1004997

@@ -1014,82 +1007,78 @@ async fn fetch_partition_object_store_inner(
10141007

10151008
let url = Url::parse(path).map_err(|e| {
10161009
BallistaError::General(format!(
1017-
"Failed to parse object store URL '{}': {:?}",
1018-
path, e
1010+
"Failed to parse object store URL '{path}': {e:?}"
10191011
))
10201012
})?;
10211013

10221014
let scheme = url.scheme();
10231015
let store: Arc<dyn ObjectStore> = match scheme {
10241016
"s3" => {
10251017
let bucket = url.host_str().ok_or_else(|| {
1026-
BallistaError::General(format!("No bucket in S3 URL: {}", path))
1018+
BallistaError::General(format!("No bucket in S3 URL: {path}"))
10271019
})?;
10281020
let builder = AmazonS3Builder::from_env().with_bucket_name(bucket);
10291021
Arc::new(builder.build().map_err(|e| {
1030-
BallistaError::General(format!("Failed to create S3 client: {:?}", e))
1022+
BallistaError::General(format!("Failed to create S3 client: {e:?}"))
10311023
})?)
10321024
}
10331025
"abfs" | "az" => {
10341026
// Parse Azure URL: abfs://container@account.dfs.core.windows.net/path
10351027
let host = url.host_str().ok_or_else(|| {
1036-
BallistaError::General(format!("No host in Azure URL: {}", path))
1028+
BallistaError::General(format!("No host in Azure URL: {path}"))
10371029
})?;
10381030

10391031
// Extract container from username portion
10401032
let container = url.username();
10411033
if container.is_empty() {
10421034
return Err(BallistaError::General(format!(
1043-
"No container in Azure URL. Expected format: abfs://container@account.dfs.core.windows.net/path. Got: {}",
1044-
path
1035+
"No container in Azure URL. Expected format: abfs://container@account.dfs.core.windows.net/path. Got: {path}"
10451036
)));
10461037
}
10471038

10481039
// Extract account from host (account.dfs.core.windows.net)
10491040
let account = host.split('.').next().ok_or_else(|| {
1050-
BallistaError::General(format!("No account in Azure URL: {}", path))
1041+
BallistaError::General(format!("No account in Azure URL: {path}"))
10511042
})?;
10521043

10531044
let builder = MicrosoftAzureBuilder::from_env()
10541045
.with_account(account)
10551046
.with_container_name(container);
10561047
Arc::new(builder.build().map_err(|e| {
1057-
BallistaError::General(format!("Failed to create Azure client: {:?}", e))
1048+
BallistaError::General(format!("Failed to create Azure client: {e:?}"))
10581049
})?)
10591050
}
10601051
_ => {
10611052
return Err(BallistaError::General(format!(
1062-
"Unsupported object store scheme: {}. Supported: s3, abfs, az",
1063-
scheme
1053+
"Unsupported object store scheme: {scheme}. Supported: s3, abfs, az"
10641054
)));
10651055
}
10661056
};
10671057

10681058
// Extract the object path from the URL
10691059
let object_path = ObjectPath::from(url.path().trim_start_matches('/'));
10701060

1071-
debug!("Reading object from path: {:?}", object_path);
1061+
debug!("Reading object from path: {object_path:?}");
10721062

10731063
let get_result = store.get(&object_path).await.map_err(|e| {
1074-
BallistaError::General(format!("Failed to read object from {}: {:?}", path, e))
1064+
BallistaError::General(format!("Failed to read object from {path}: {e:?}"))
10751065
})?;
10761066

10771067
let bytes = get_result.bytes().await.map_err(|e| {
1078-
BallistaError::General(format!("Failed to read bytes from {}: {:?}", path, e))
1068+
BallistaError::General(format!("Failed to read bytes from {path}: {e:?}"))
10791069
})?;
10801070

10811071
let cursor = Cursor::new(bytes.to_vec());
10821072
let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
10831073
BallistaError::General(format!(
1084-
"Failed to create Arrow stream reader for {}: {:?}",
1085-
path, e
1074+
"Failed to create Arrow stream reader for {path}: {e:?}"
10861075
))
10871076
})?;
10881077

10891078
let mut batches = Vec::new();
10901079
for batch_result in stream_reader {
10911080
batches.push(batch_result.map_err(|e| {
1092-
BallistaError::General(format!("Failed to read batch from {}: {:?}", path, e))
1081+
BallistaError::General(format!("Failed to read batch from {path}: {e:?}"))
10931082
})?);
10941083
}
10951084

examples/examples/object-store-builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ async fn main() -> Result<()> {
9696
// Use BallistaBuilder to create a context with the pre-created object store
9797
let ctx = BallistaBuilder::new()
9898
.with_job_name("Object Store Builder Example")
99-
.with_object_store(&format!("s3://{S3_BUCKET}"), Arc::new(s3_store))
99+
.add_object_store(&format!("s3://{S3_BUCKET}"), Arc::new(s3_store))
100100
.standalone()
101101
.await?;
102102

0 commit comments

Comments
 (0)