@@ -34,6 +34,7 @@ use log::{debug, error};
3434use object_store:: aws:: AmazonS3Builder ;
3535use object_store:: azure:: MicrosoftAzureBuilder ;
3636use object_store:: path:: Path as ObjectPath ;
37+ use object_store:: prefix:: PrefixStore ;
3738use object_store:: { ObjectStore , PutPayload , WriteMultipart } ;
3839use std:: fmt:: { Debug , Display } ;
3940use std:: fs:: File ;
@@ -386,11 +387,44 @@ impl ShuffleStorage for LocalShuffleStorage {
386387/// Object store based shuffle storage implementation (for S3 and Azure).
387388#[ derive( Debug ) ]
388389pub struct ObjectStoreShuffleStorage {
390+ /// Either the raw bucket/container store (when `base_url` has no path) or a
391+ /// [`PrefixStore`] wrapping it (when `base_url` includes a path like
392+ /// `s3://bucket/shuffle/prefix`). All keys passed to `self.store` are
393+ /// job-relative — `PrefixStore` reattaches the URL path prefix transparently.
389394 store : Arc < dyn ObjectStore > ,
390395 base_url : String ,
396+ /// Path portion of `base_url`, normalised without leading/trailing slashes
397+ /// (e.g. "shuffle/prefix" for `s3://bucket/shuffle/prefix`). Empty when the
398+ /// URL has no path. Used only by [`Self::extract_object_path`] to strip the
399+ /// prefix from caller-supplied full URLs before handing the key to the
400+ /// already-prefixed `self.store`.
401+ path_prefix : String ,
391402 storage_type : ShuffleStorageType ,
392403}
393404
405+ /// Extracts the path component of an object-store URL as a normalised key prefix.
406+ /// Returns "" if the URL has no path (e.g. `s3://bucket`) or fails to parse.
407+ fn extract_path_prefix ( base_url : & str ) -> String {
408+ Url :: parse ( base_url)
409+ . ok ( )
410+ . map ( |u| u. path ( ) . trim_matches ( '/' ) . to_string ( ) )
411+ . unwrap_or_default ( )
412+ }
413+
414+ /// Wraps `store` in a [`PrefixStore`] when `prefix` is non-empty so every
415+ /// subsequent operation runs in the prefix's namespace; returns the store
416+ /// unchanged otherwise.
417+ fn apply_path_prefix < S > ( store : S , prefix : & str ) -> Arc < dyn ObjectStore >
418+ where
419+ S : ObjectStore + ' static ,
420+ {
421+ if prefix. is_empty ( ) {
422+ Arc :: new ( store)
423+ } else {
424+ Arc :: new ( PrefixStore :: new ( store, prefix. to_string ( ) ) )
425+ }
426+ }
427+
394428impl ObjectStoreShuffleStorage {
395429 /// Creates a new S3 shuffle storage.
396430 pub fn new_s3 ( config : & ShuffleStorageConfig ) -> Result < Self > {
@@ -429,10 +463,13 @@ impl ObjectStoreShuffleStorage {
429463 . base_url
430464 . clone ( )
431465 . unwrap_or_else ( || format ! ( "s3://{}" , bucket) ) ;
466+ let path_prefix = extract_path_prefix ( & base_url) ;
467+ let store = apply_path_prefix ( store, & path_prefix) ;
432468
433469 Ok ( Self {
434- store : Arc :: new ( store ) ,
470+ store,
435471 base_url,
472+ path_prefix,
436473 storage_type : ShuffleStorageType :: S3 ,
437474 } )
438475 }
@@ -484,10 +521,13 @@ impl ObjectStoreShuffleStorage {
484521 let base_url = config. base_url . clone ( ) . unwrap_or_else ( || {
485522 format ! ( "abfs://{}@{}.dfs.core.windows.net" , container, account)
486523 } ) ;
524+ let path_prefix = extract_path_prefix ( & base_url) ;
525+ let store = apply_path_prefix ( store, & path_prefix) ;
487526
488527 Ok ( Self {
489- store : Arc :: new ( store ) ,
528+ store,
490529 base_url,
530+ path_prefix,
491531 storage_type : ShuffleStorageType :: Azure ,
492532 } )
493533 }
@@ -508,7 +548,10 @@ impl ObjectStoreShuffleStorage {
508548 & self . store
509549 }
510550
511- /// Constructs the full URL for a shuffle partition.
551+ /// Constructs the full URL for a shuffle partition along with the job-relative
552+ /// object-store key. The returned `ObjectPath` is relative to the storage's
553+ /// `path_prefix` — `self.store` is already a [`PrefixStore`] when a prefix is set,
554+ /// so it reattaches the prefix transparently.
512555 pub fn make_full_url (
513556 & self ,
514557 job_id : & str ,
@@ -617,7 +660,7 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
617660
618661 let num_bytes = buffer. len ( ) ;
619662
620- // Upload to object store
663+ // Upload via the (possibly prefix-wrapped) store with the job-relative key.
621664 let object_path = ObjectPath :: from ( relative_path) ;
622665 let payload = PutPayload :: from ( Bytes :: from ( buffer) ) ;
623666
@@ -670,7 +713,8 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
670713 async fn delete_job_data ( & self , job_id : & str ) -> Result < ( ) > {
671714 let prefix = ObjectPath :: from ( job_id. to_string ( ) ) ;
672715
673- // List all objects with the job_id prefix
716+ // List all objects with the job_id prefix (relative to the storage's path_prefix —
717+ // PrefixStore reattaches the URL path prefix on every operation).
674718 let mut list_stream = self . store . list ( Some ( & prefix) ) ;
675719 let mut objects_to_delete = Vec :: new ( ) ;
676720
@@ -715,15 +759,25 @@ impl ShuffleStorage for ObjectStoreShuffleStorage {
715759}
716760
717761impl ObjectStoreShuffleStorage {
762+ /// Resolves a caller-supplied URL or path to a key relative to the storage's
763+ /// `path_prefix`. `self.store` is already prefix-wrapped, so handing it the
764+ /// full URL path would double-prefix — strip `self.path_prefix` first.
718765 fn extract_object_path ( & self , path : & str ) -> Result < ObjectPath > {
719- // Parse the URL and extract the path component
720- if let Ok ( url) = Url :: parse ( path) {
721- let path_str = url. path ( ) . trim_start_matches ( '/' ) ;
722- Ok ( ObjectPath :: from ( path_str) )
766+ let raw = match Url :: parse ( path) {
767+ Ok ( url) => url. path ( ) . trim_start_matches ( '/' ) . to_string ( ) ,
768+ // Not a URL — treat as an already-relative key.
769+ Err ( _) => return Ok ( ObjectPath :: from ( path) ) ,
770+ } ;
771+ let relative = if self . path_prefix . is_empty ( ) {
772+ raw. as_str ( )
723773 } else {
724- // If it's not a valid URL, assume it's already a relative path
725- Ok ( ObjectPath :: from ( path) )
726- }
774+ raw. strip_prefix ( & self . path_prefix )
775+ . map ( |rest| rest. trim_start_matches ( '/' ) )
776+ // Fall back to the raw key if it doesn't start with our prefix —
777+ // happens in tests that hand-construct URLs against unrelated stores.
778+ . unwrap_or ( raw. as_str ( ) )
779+ } ;
780+ Ok ( ObjectPath :: from ( relative) )
727781 }
728782}
729783
@@ -948,4 +1002,108 @@ mod tests {
9481002 ShuffleStorageConfig :: from_type_and_url ( ShuffleStorageType :: S3 , "not-a-url" ) ;
9491003 assert ! ( result. is_err( ) ) ;
9501004 }
1005+
1006+ #[ test]
1007+ fn test_extract_path_prefix ( ) {
1008+ assert_eq ! ( extract_path_prefix( "s3://bucket" ) , "" ) ;
1009+ assert_eq ! ( extract_path_prefix( "s3://bucket/" ) , "" ) ;
1010+ assert_eq ! ( extract_path_prefix( "s3://bucket/shuffle" ) , "shuffle" ) ;
1011+ assert_eq ! (
1012+ extract_path_prefix( "s3://bucket/shuffle/prefix" ) ,
1013+ "shuffle/prefix"
1014+ ) ;
1015+ assert_eq ! (
1016+ extract_path_prefix( "s3://bucket/shuffle/prefix/" ) ,
1017+ "shuffle/prefix"
1018+ ) ;
1019+ assert_eq ! (
1020+ extract_path_prefix( "abfs://container@account.dfs.core.windows.net/shuffle" ) ,
1021+ "shuffle"
1022+ ) ;
1023+ assert_eq ! ( extract_path_prefix( "not-a-url" ) , "" ) ;
1024+ }
1025+
1026+ /// Builds an `ObjectStoreShuffleStorage` over the supplied [`InMemory`] store
1027+ /// with the same `PrefixStore`-based wiring `new_s3` / `new_azure` apply in
1028+ /// production. Returns the inner store so tests can directly inspect the
1029+ /// final object key (the wrapped store would strip the prefix on listing).
1030+ fn build_storage_for_test (
1031+ base_url : & str ,
1032+ ) -> ( ObjectStoreShuffleStorage , Arc < dyn ObjectStore > ) {
1033+ let inner: Arc < dyn ObjectStore > = Arc :: new ( object_store:: memory:: InMemory :: new ( ) ) ;
1034+ let path_prefix = extract_path_prefix ( base_url) ;
1035+ let store: Arc < dyn ObjectStore > = if path_prefix. is_empty ( ) {
1036+ Arc :: clone ( & inner)
1037+ } else {
1038+ Arc :: new ( PrefixStore :: new ( Arc :: clone ( & inner) , path_prefix. clone ( ) ) )
1039+ } ;
1040+ let storage = ObjectStoreShuffleStorage {
1041+ store,
1042+ base_url : base_url. to_string ( ) ,
1043+ path_prefix,
1044+ storage_type : ShuffleStorageType :: S3 ,
1045+ } ;
1046+ ( storage, inner)
1047+ }
1048+
1049+ /// `make_full_url` reports a full URL for downstream consumers but hands the
1050+ /// store a job-relative key — `PrefixStore` reattaches the URL path prefix.
1051+ #[ test]
1052+ fn test_make_full_url_returns_relative_object_path ( ) {
1053+ let ( storage, _inner) = build_storage_for_test ( "s3://my-bucket/shuffle/prefix" ) ;
1054+
1055+ let ( full_url, object_path) = storage. make_full_url ( "job_a" , 1 , 40 , 40 , "arrow" ) ;
1056+ assert_eq ! (
1057+ full_url,
1058+ "s3://my-bucket/shuffle/prefix/job_a/1/40/data.arrow"
1059+ ) ;
1060+ assert_eq ! ( object_path. as_ref( ) , "job_a/1/40/data.arrow" ) ;
1061+ }
1062+
1063+ #[ test]
1064+ fn test_make_full_url_no_prefix_round_trip ( ) {
1065+ let ( storage, _inner) = build_storage_for_test ( "s3://my-bucket" ) ;
1066+
1067+ let ( full_url, object_path) = storage. make_full_url ( "job_a" , 1 , 0 , 0 , "arrow" ) ;
1068+ assert_eq ! ( full_url, "s3://my-bucket/job_a/1/0/data.arrow" ) ;
1069+ assert_eq ! ( object_path. as_ref( ) , "job_a/1/0/data.arrow" ) ;
1070+ }
1071+
1072+ /// Regression test for the writer-side prefix bug: an end-to-end write must
1073+ /// land under the URL path prefix in the underlying bucket. Before the fix,
1074+ /// the object landed at `job_a/1/0/data.arrow` while the reader looked under
1075+ /// `shuffle/prefix/job_a/1/0/data.arrow` and got NotFound.
1076+ #[ tokio:: test]
1077+ async fn test_object_store_round_trip_with_prefix ( ) {
1078+ let ( storage, inner) = build_storage_for_test ( "s3://my-bucket/shuffle/prefix" ) ;
1079+
1080+ let ( batch, schema) = create_test_batch ( ) ;
1081+ let metrics = ExecutionPlanMetricsSet :: new ( ) ;
1082+ let time_metric =
1083+ metrics:: MetricBuilder :: new ( & metrics) . subset_time ( "write_time" , 0 ) ;
1084+
1085+ let ( full_url, _stats) = storage
1086+ . write_shuffle_data ( "job_a" , 1 , 0 , 0 , vec ! [ batch] , schema, & time_metric)
1087+ . await
1088+ . unwrap ( ) ;
1089+ assert_eq ! (
1090+ full_url,
1091+ "s3://my-bucket/shuffle/prefix/job_a/1/0/data.arrow"
1092+ ) ;
1093+
1094+ // The actual S3 key in the underlying bucket must include the URL path prefix.
1095+ let inner_keys: Vec < String > = inner
1096+ . list ( None )
1097+ . filter_map ( |r| async move { r. ok ( ) . map ( |m| m. location . to_string ( ) ) } )
1098+ . collect :: < Vec < _ > > ( )
1099+ . await ;
1100+ assert_eq ! ( inner_keys, vec![ "shuffle/prefix/job_a/1/0/data.arrow" ] ) ;
1101+
1102+ let read_batches = storage. read_shuffle_data ( & full_url) . await . unwrap ( ) ;
1103+ assert_eq ! ( read_batches. len( ) , 1 ) ;
1104+ assert_eq ! ( read_batches[ 0 ] . num_rows( ) , 3 ) ;
1105+
1106+ storage. delete_job_data ( "job_a" ) . await . unwrap ( ) ;
1107+ assert ! ( storage. read_shuffle_data( & full_url) . await . is_err( ) ) ;
1108+ }
9511109}
0 commit comments