2323use async_trait:: async_trait;
2424use bytes:: Bytes ;
2525use datafusion:: arrow:: datatypes:: SchemaRef ;
26+ use datafusion:: arrow:: ipc:: CompressionType ;
2627use datafusion:: arrow:: ipc:: reader:: StreamReader ;
2728use datafusion:: arrow:: ipc:: writer:: IpcWriteOptions ;
2829use datafusion:: arrow:: ipc:: writer:: StreamWriter ;
29- use datafusion:: arrow:: ipc:: CompressionType ;
3030use datafusion:: arrow:: record_batch:: RecordBatch ;
3131use datafusion:: physical_plan:: metrics;
3232use futures:: StreamExt ;
3333use log:: { debug, error} ;
34- use object_store:: azure:: MicrosoftAzureBuilder ;
3534use object_store:: aws:: AmazonS3Builder ;
35+ use object_store:: azure:: MicrosoftAzureBuilder ;
3636use object_store:: path:: Path as ObjectPath ;
3737use object_store:: { ObjectStore , PutPayload } ;
3838use std:: fmt:: { Debug , Display } ;
@@ -160,7 +160,10 @@ impl ShuffleStorageConfig {
160160 /// Creates a new Azure Blob Storage configuration.
161161 pub fn new_azure ( account : & str , container : & str , prefix : Option < & str > ) -> Self {
162162 let base_url = match prefix {
163- Some ( p) => format ! ( "abfs://{}@{}.dfs.core.windows.net/{}" , container, account, p) ,
163+ Some ( p) => format ! (
164+ "abfs://{}@{}.dfs.core.windows.net/{}" ,
165+ container, account, p
166+ ) ,
164167 None => format ! ( "abfs://{}@{}.dfs.core.windows.net" , container, account) ,
165168 } ;
166169 Self {
@@ -178,6 +181,7 @@ impl ShuffleStorageConfig {
178181
179182/// Trait for shuffle storage operations.
180183#[ async_trait]
184+ #[ allow( clippy:: too_many_arguments) ]
181185pub trait ShuffleStorage : Send + Sync + Debug {
182186 /// Write a record batch to storage and return the path where it was written.
183187 async fn write_shuffle_data (
@@ -256,7 +260,8 @@ impl ShuffleStorage for LocalShuffleStorage {
256260 let options = IpcWriteOptions :: default ( )
257261 . try_with_compression ( Some ( CompressionType :: LZ4_FRAME ) ) ?;
258262
259- let mut writer = StreamWriter :: try_new_with_options ( file, schema. as_ref ( ) , options) ?;
263+ let mut writer =
264+ StreamWriter :: try_new_with_options ( file, schema. as_ref ( ) , options) ?;
260265
261266 let mut num_rows = 0 ;
262267 let mut num_batches = 0 ;
@@ -283,7 +288,10 @@ impl ShuffleStorage for LocalShuffleStorage {
283288
284289 async fn read_shuffle_data ( & self , path : & str ) -> Result < Vec < RecordBatch > > {
285290 let file = File :: open ( path) . map_err ( |e| {
286- BallistaError :: General ( format ! ( "Failed to open shuffle file at {}: {:?}" , path, e) )
291+ BallistaError :: General ( format ! (
292+ "Failed to open shuffle file at {}: {:?}" ,
293+ path, e
294+ ) )
287295 } ) ?;
288296 let reader = BufReader :: new ( file) ;
289297 let stream_reader = StreamReader :: try_new ( reader, None ) ?;
@@ -311,7 +319,9 @@ impl ShuffleStorage for LocalShuffleStorage {
311319
312320 fn can_handle ( & self , path : & str ) -> bool {
313321 // Local storage can handle paths that don't start with a URL scheme
314- !path. starts_with ( "s3://" ) && !path. starts_with ( "abfs://" ) && !path. starts_with ( "az://" )
322+ !path. starts_with ( "s3://" )
323+ && !path. starts_with ( "abfs://" )
324+ && !path. starts_with ( "az://" )
315325 }
316326}
317327
@@ -396,7 +406,9 @@ impl ObjectStoreShuffleStorage {
396406 . filter_map ( |pair| {
397407 let mut parts = pair. splitn ( 2 , '=' ) ;
398408 match ( parts. next ( ) , parts. next ( ) ) {
399- ( Some ( key) , Some ( value) ) => Some ( ( key. to_string ( ) , value. to_string ( ) ) ) ,
409+ ( Some ( key) , Some ( value) ) => {
410+ Some ( ( key. to_string ( ) , value. to_string ( ) ) )
411+ }
400412 _ => None ,
401413 }
402414 } )
@@ -405,7 +417,10 @@ impl ObjectStoreShuffleStorage {
405417 }
406418
407419 let store = builder. build ( ) . map_err ( |e| {
408- BallistaError :: General ( format ! ( "Failed to create Azure object store: {:?}" , e) )
420+ BallistaError :: General ( format ! (
421+ "Failed to create Azure object store: {:?}" ,
422+ e
423+ ) )
409424 } ) ?;
410425
411426 let base_url = config. base_url . clone ( ) . unwrap_or_else ( || {
@@ -430,7 +445,13 @@ impl ObjectStoreShuffleStorage {
430445 }
431446 }
432447
433- fn make_path ( & self , job_id : & str , stage_id : usize , partition_id : usize , input_partition : usize ) -> String {
448+ fn make_path (
449+ & self ,
450+ job_id : & str ,
451+ stage_id : usize ,
452+ partition_id : usize ,
453+ input_partition : usize ,
454+ ) -> String {
434455 let filename = if input_partition == partition_id {
435456 "data.arrow" . to_string ( )
436457 } else {
@@ -452,9 +473,10 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
452473 schema : SchemaRef ,
453474 write_metric : & metrics:: Time ,
454475 ) -> Result < ( String , PartitionStats ) > {
455- let relative_path = self . make_path ( job_id, stage_id, partition_id, input_partition) ;
476+ let relative_path =
477+ self . make_path ( job_id, stage_id, partition_id, input_partition) ;
456478 let full_url = format ! ( "{}/{}" , self . base_url, relative_path) ;
457-
479+
458480 debug ! ( "Writing shuffle data to object store: {}" , full_url) ;
459481
460482 let timer = write_metric. timer ( ) ;
@@ -463,8 +485,8 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
463485 let mut buffer = Vec :: new ( ) ;
464486 let options = IpcWriteOptions :: default ( )
465487 . try_with_compression ( Some ( CompressionType :: LZ4_FRAME ) ) ?;
466-
467- let ( total_rows , total_batches ) = {
488+
489+ let ( _total_rows , _total_batches ) = {
468490 let mut writer = StreamWriter :: try_new_with_options (
469491 Cursor :: new ( & mut buffer) ,
470492 schema. as_ref ( ) ,
@@ -489,7 +511,7 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
489511 // Upload to object store
490512 let object_path = ObjectPath :: from ( relative_path) ;
491513 let payload = PutPayload :: from ( Bytes :: from ( buffer) ) ;
492-
514+
493515 self . store . put ( & object_path, payload) . await . map_err ( |e| {
494516 BallistaError :: General ( format ! (
495517 "Failed to upload shuffle data to {}: {:?}" ,
@@ -511,7 +533,7 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
511533 async fn read_shuffle_data ( & self , path : & str ) -> Result < Vec < RecordBatch > > {
512534 // Extract the object path from the full URL
513535 let object_path = self . extract_object_path ( path) ?;
514-
536+
515537 debug ! ( "Reading shuffle data from object store: {}" , path) ;
516538
517539 let get_result = self . store . get ( & object_path) . await . map_err ( |e| {
@@ -522,10 +544,7 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
522544 } ) ?;
523545
524546 let bytes = get_result. bytes ( ) . await . map_err ( |e| {
525- BallistaError :: General ( format ! (
526- "Failed to read bytes from {}: {:?}" ,
527- path, e
528- ) )
547+ BallistaError :: General ( format ! ( "Failed to read bytes from {}: {:?}" , path, e) )
529548 } ) ?;
530549
531550 let cursor = Cursor :: new ( bytes. to_vec ( ) ) ;
@@ -541,7 +560,7 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
541560
542561 async fn delete_job_data ( & self , job_id : & str ) -> Result < ( ) > {
543562 let prefix = ObjectPath :: from ( job_id. to_string ( ) ) ;
544-
563+
545564 // List all objects with the job_id prefix
546565 let mut list_stream = self . store . list ( Some ( & prefix) ) ;
547566 let mut objects_to_delete = Vec :: new ( ) ;
@@ -607,10 +626,9 @@ impl ShuffleStorageFactory {
607626 pub fn create ( config : & ShuffleStorageConfig ) -> Result < Arc < dyn ShuffleStorage > > {
608627 match config. storage_type {
609628 ShuffleStorageType :: Local => {
610- let work_dir = config
611- . base_url
612- . as_ref ( )
613- . ok_or_else ( || BallistaError :: General ( "Work directory not configured" . to_string ( ) ) ) ?;
629+ let work_dir = config. base_url . as_ref ( ) . ok_or_else ( || {
630+ BallistaError :: General ( "Work directory not configured" . to_string ( ) )
631+ } ) ?;
614632 Ok ( Arc :: new ( LocalShuffleStorage :: new ( work_dir) ) )
615633 }
616634 ShuffleStorageType :: S3 => {
@@ -637,9 +655,7 @@ mod tests {
637655 use tempfile:: TempDir ;
638656
639657 fn create_test_batch ( ) -> ( RecordBatch , SchemaRef ) {
640- let schema = Arc :: new ( Schema :: new ( vec ! [
641- Field :: new( "a" , DataType :: Int32 , false ) ,
642- ] ) ) ;
658+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int32 , false ) ] ) ) ;
643659 let batch = RecordBatch :: try_new (
644660 schema. clone ( ) ,
645661 vec ! [ Arc :: new( Int32Array :: from( vec![ 1 , 2 , 3 ] ) ) ] ,
@@ -655,7 +671,8 @@ mod tests {
655671
656672 let ( batch, schema) = create_test_batch ( ) ;
657673 let metrics = ExecutionPlanMetricsSet :: new ( ) ;
658- let time_metric = metrics:: MetricBuilder :: new ( & metrics) . subset_time ( "write_time" , 0 ) ;
674+ let time_metric =
675+ metrics:: MetricBuilder :: new ( & metrics) . subset_time ( "write_time" , 0 ) ;
659676
660677 let ( path, stats) = storage
661678 . write_shuffle_data (
@@ -686,18 +703,11 @@ mod tests {
686703
687704 let ( batch, schema) = create_test_batch ( ) ;
688705 let metrics = ExecutionPlanMetricsSet :: new ( ) ;
689- let time_metric = metrics:: MetricBuilder :: new ( & metrics) . subset_time ( "write_time" , 0 ) ;
706+ let time_metric =
707+ metrics:: MetricBuilder :: new ( & metrics) . subset_time ( "write_time" , 0 ) ;
690708
691709 let ( path, _) = storage
692- . write_shuffle_data (
693- "test_job" ,
694- 1 ,
695- 0 ,
696- 0 ,
697- vec ! [ batch] ,
698- schema,
699- & time_metric,
700- )
710+ . write_shuffle_data ( "test_job" , 1 , 0 , 0 , vec ! [ batch] , schema, & time_metric)
701711 . await
702712 . unwrap ( ) ;
703713
@@ -736,7 +746,8 @@ mod tests {
736746
737747 #[ test]
738748 fn test_storage_config_new_s3 ( ) {
739- let config = ShuffleStorageConfig :: new_s3 ( "my-bucket" , Some ( "shuffle" ) , Some ( "us-east-1" ) ) ;
749+ let config =
750+ ShuffleStorageConfig :: new_s3 ( "my-bucket" , Some ( "shuffle" ) , Some ( "us-east-1" ) ) ;
740751 assert_eq ! ( config. storage_type, ShuffleStorageType :: S3 ) ;
741752 assert_eq ! ( config. base_url, Some ( "s3://my-bucket/shuffle" . to_string( ) ) ) ;
742753 assert_eq ! ( config. s3_config. bucket, Some ( "my-bucket" . to_string( ) ) ) ;
@@ -745,7 +756,8 @@ mod tests {
745756
746757 #[ test]
747758 fn test_storage_config_new_azure ( ) {
748- let config = ShuffleStorageConfig :: new_azure ( "myaccount" , "mycontainer" , Some ( "shuffle" ) ) ;
759+ let config =
760+ ShuffleStorageConfig :: new_azure ( "myaccount" , "mycontainer" , Some ( "shuffle" ) ) ;
749761 assert_eq ! ( config. storage_type, ShuffleStorageType :: Azure ) ;
750762 assert_eq ! (
751763 config. base_url,
0 commit comments