@@ -29,7 +29,6 @@ use futures::TryStreamExt;
2929use futures:: stream:: BoxStream ;
3030use mea:: mutex:: Mutex ;
3131use mea:: oneshot;
32- use object_store:: ListResult ;
3332use object_store:: MultipartUpload ;
3433use object_store:: ObjectMeta ;
3534use object_store:: ObjectStore ;
@@ -41,6 +40,7 @@ use object_store::path::Path;
4140use object_store:: { GetOptions , UploadPart } ;
4241use object_store:: { GetRange , GetResultPayload } ;
4342use object_store:: { GetResult , PutMode } ;
43+ use object_store:: { ListResult , RenameOptions } ;
4444use opendal:: Buffer ;
4545use opendal:: Writer ;
4646use opendal:: options:: CopyOptions ;
@@ -227,23 +227,6 @@ impl ObjectStore for OpendalStore {
227227 Ok ( PutResult { e_tag, version } )
228228 }
229229
230- async fn put_multipart (
231- & self ,
232- location : & Path ,
233- ) -> object_store:: Result < Box < dyn MultipartUpload > > {
234- let decoded_location = percent_decode_path ( location. as_ref ( ) ) ;
235- let writer = self
236- . inner
237- . writer_with ( & decoded_location)
238- . concurrent ( 8 )
239- . into_send ( )
240- . await
241- . map_err ( |err| format_object_store_error ( err, location. as_ref ( ) ) ) ?;
242- let upload = OpendalMultipartUpload :: new ( writer, location. clone ( ) ) ;
243-
244- Ok ( Box :: new ( upload) )
245- }
246-
247230 async fn put_multipart_opts (
248231 & self ,
249232 location : & Path ,
@@ -430,15 +413,27 @@ impl ObjectStore for OpendalStore {
430413 } )
431414 }
432415
433- async fn delete ( & self , location : & Path ) -> object_store:: Result < ( ) > {
434- let decoded_location = percent_decode_path ( location. as_ref ( ) ) ;
435- self . inner
436- . delete ( & decoded_location)
437- . into_send ( )
438- . await
439- . map_err ( |err| format_object_store_error ( err, location. as_ref ( ) ) ) ?;
440-
441- Ok ( ( ) )
416+ fn delete_stream (
417+ & self ,
418+ locations : BoxStream < ' static , object_store:: Result < Path > > ,
419+ ) -> BoxStream < ' static , object_store:: Result < Path > > {
420+ // TODO: use batch delete to optimize performance
421+ let client = self . inner . clone ( ) ;
422+ locations
423+ . then ( move |location| {
424+ let client = client. clone ( ) ;
425+ async move {
426+ let location = location?;
427+ let decoded_location = percent_decode_path ( location. as_ref ( ) ) ;
428+ client
429+ . delete ( & decoded_location)
430+ . into_send ( )
431+ . await
432+ . map_err ( |err| format_object_store_error ( err, location. as_ref ( ) ) ) ?;
433+ Ok ( location)
434+ }
435+ } )
436+ . boxed ( )
442437 }
443438
444439 fn list ( & self , prefix : Option < & Path > ) -> BoxStream < ' static , object_store:: Result < ObjectMeta > > {
@@ -576,11 +571,23 @@ impl ObjectStore for OpendalStore {
576571 } )
577572 }
578573
579- async fn copy ( & self , from : & Path , to : & Path ) -> object_store:: Result < ( ) > {
580- self . copy_request ( from, to, false ) . await
574+ async fn copy_opts (
575+ & self ,
576+ from : & Path ,
577+ to : & Path ,
578+ options : object_store:: CopyOptions ,
579+ ) -> object_store:: Result < ( ) > {
580+ let if_not_exists = matches ! ( options. mode, object_store:: CopyMode :: Create ) ;
581+ self . copy_request ( from, to, if_not_exists) . await
581582 }
582583
583- async fn rename ( & self , from : & Path , to : & Path ) -> object_store:: Result < ( ) > {
584+ async fn rename_opts (
585+ & self ,
586+ from : & Path ,
587+ to : & Path ,
588+ // TODO: if we need to support rename options in the future
589+ _options : RenameOptions ,
590+ ) -> object_store:: Result < ( ) > {
584591 self . inner
585592 . rename (
586593 & percent_decode_path ( from. as_ref ( ) ) ,
@@ -592,10 +599,6 @@ impl ObjectStore for OpendalStore {
592599
593600 Ok ( ( ) )
594601 }
595-
596- async fn copy_if_not_exists ( & self , from : & Path , to : & Path ) -> object_store:: Result < ( ) > {
597- self . copy_request ( from, to, true ) . await
598- }
599602}
600603
601604/// `MultipartUpload`'s impl based on `Writer` in opendal
@@ -691,7 +694,7 @@ impl Debug for OpendalMultipartUpload {
691694mod tests {
692695 use bytes:: Bytes ;
693696 use object_store:: path:: Path ;
694- use object_store:: { ObjectStore , WriteMultipart } ;
697+ use object_store:: { ObjectStore , ObjectStoreExt , WriteMultipart } ;
695698 use opendal:: services;
696699 use rand:: prelude:: * ;
697700 use std:: sync:: Arc ;
0 commit comments