11use std:: time:: Duration ;
22
33use aws_config:: { retry:: RetryConfig , timeout:: TimeoutConfig , BehaviorVersion } ;
4+ use aws_credential_types:: Credentials ;
45use aws_sdk_s3:: { config:: Builder , Client } ;
56use tokio_util:: bytes;
6- use tracing:: { error, info} ;
7+ use tracing:: { error, info, warn } ;
78
89#[ derive( Clone , Debug , Default ) ]
910pub struct S3Policy {
@@ -26,8 +27,9 @@ impl S3Policy {
2627 } ;
2728}
2829
29- pub async fn create_s3_client ( retry_policy : & S3Policy ) -> aws_sdk_s3:: Client {
30+ pub async fn create_s3_client ( retry_policy : & S3Policy , url : & str ) -> aws_sdk_s3:: Client {
3031 let sdk_config = aws_config:: load_defaults ( BehaviorVersion :: latest ( ) ) . await ;
32+
3133 let timeout_config = TimeoutConfig :: builder ( )
3234 . connect_timeout ( retry_policy. connect_timeout )
3335 . operation_attempt_timeout ( retry_policy. max_retries_timeout )
@@ -40,43 +42,63 @@ pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
4042 let config = Builder :: from ( & sdk_config)
4143 . timeout_config ( timeout_config)
4244 . retry_config ( retry_config)
45+ . endpoint_url ( url)
46+ . force_path_style ( true )
4347 . build ( ) ;
4448
4549 Client :: from_conf ( config)
4650}
4751
48- pub async fn default_aws_s3_client ( ) -> AwsS3Client {
49- let s3_client = create_s3_client ( & S3Policy :: DEFAULT ) . await ;
50- AwsS3Client { s3_client }
51- }
52-
53- // Let's wrap Aws Client to have an interface for it so we can mock it.
52+ // Let's wrap Aws access to have an interface for it so we can mock it.
5453#[ derive( Clone ) ]
55- pub struct AwsS3Client {
56- pub s3_client : Client ,
57- }
54+ pub struct AwsS3Client { }
5855
5956impl AwsS3Interface for AwsS3Client {
60- async fn get_bucket_key ( & self , bucket : & str , key : & str ) -> anyhow:: Result < bytes:: Bytes > {
61- let result = self
62- . s3_client
57+ async fn get_bucket_key (
58+ & self ,
59+ url : & str ,
60+ bucket : & str ,
61+ key : & str ,
62+ ) -> anyhow:: Result < bytes:: Bytes > {
63+ Ok ( create_s3_client ( & S3Policy :: DEFAULT , url)
64+ . await
6365 . get_object ( )
6466 . bucket ( bucket)
6567 . key ( key)
6668 . send ( )
67- . await ?;
68- Ok ( result. body . collect ( ) . await ?. into_bytes ( ) )
69+ . await ?
70+ . body
71+ . collect ( )
72+ . await ?
73+ . into_bytes ( ) )
6974 }
7075}
7176
7277pub trait AwsS3Interface {
7378 fn get_bucket_key (
7479 & self ,
80+ url : & str ,
7581 bucket : & str ,
7682 key : & str ,
7783 ) -> impl std:: future:: Future < Output = anyhow:: Result < bytes:: Bytes > > ;
7884}
7985
86+ fn split_url ( s3_bucket_url : & String ) -> anyhow:: Result < ( String , String ) > {
87+ let parsed_url_and_bucket = url:: Url :: parse ( s3_bucket_url) ?;
88+ let bucket = parsed_url_and_bucket. path ( ) ;
89+ let scheme = parsed_url_and_bucket. scheme ( ) ;
90+ let host = s3_bucket_url. replace ( bucket, "" ) . trim_end_matches ( '/' ) . to_owned ( ) ;
91+ let host = if host. contains ( "minio:9000" ) {
92+ // TODO: replace by docker configuration
93+ warn ! ( s3_bucket_url, "Using localhost for minio access" ) ;
94+ host. replace ( "minio:9000" , "172.17.0.1:9000" )
95+ } else {
96+ host. to_owned ( )
97+ } ;
98+ info ! ( s3_bucket_url, host, bucket, "Parsed S3 url" ) ;
99+ Ok ( ( host. to_owned ( ) , bucket. to_owned ( ) ) )
100+ }
101+
80102pub async fn download_key_from_s3 < A : AwsS3Interface > (
81103 s3_client : & A ,
82104 s3_bucket_urls : & [ String ] ,
@@ -92,7 +114,11 @@ pub async fn download_key_from_s3<A: AwsS3Interface>(
92114 key_path,
93115 s3_bucket_url, i_s3_bucket_url, nb_urls, url_index, "Try downloading"
94116 ) ;
95- let result = s3_client. get_bucket_key ( s3_bucket_url, & key_path) . await ;
117+ let Ok ( ( url, bucket) ) = split_url ( s3_bucket_url) else {
118+ error ! ( s3_bucket_url, "Failed to parse S3 url" ) ;
119+ continue ;
120+ } ;
121+ let result = s3_client. get_bucket_key ( & url, & bucket, & key_path) . await ;
96122 let Ok ( result) = result else {
97123 error ! ( s3_bucket_url, key_path, result = ?result, "Downloading failed" ) ;
98124 continue ;
0 commit comments