@@ -3,7 +3,7 @@ use std::time::Duration;
33use aws_config:: { retry:: RetryConfig , timeout:: TimeoutConfig , BehaviorVersion } ;
44use aws_sdk_s3:: { config:: Builder , Client } ;
55use tokio_util:: bytes;
6- use tracing:: { error, info} ;
6+ use tracing:: { error, info, warn } ;
77
88#[ derive( Clone , Debug , Default ) ]
99pub struct S3Policy {
@@ -26,8 +26,16 @@ impl S3Policy {
2626 } ;
2727}
2828
29- pub async fn create_s3_client ( retry_policy : & S3Policy ) -> aws_sdk_s3:: Client {
29+ use aws_sdk_s3:: config:: ProvideCredentials ;
30+ pub async fn create_s3_client ( retry_policy : & S3Policy , url : & str ) -> aws_sdk_s3:: Client {
3031 let sdk_config = aws_config:: load_defaults ( BehaviorVersion :: latest ( ) ) . await ;
32+
33+ let credentials = sdk_config. credentials_provider ( ) . unwrap ( ) . provide_credentials ( ) . await . unwrap ( ) ;
34+ info ! ( access_key = %credentials. access_key_id( ) , "Loaded AWS credentials" ) ;
35+
36+ let region = sdk_config. region ( ) ;
37+ info ! ( region = ?region, "Using AWS region" ) ;
38+
3139 let timeout_config = TimeoutConfig :: builder ( )
3240 . connect_timeout ( retry_policy. connect_timeout )
3341 . operation_attempt_timeout ( retry_policy. max_retries_timeout )
@@ -40,43 +48,62 @@ pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
4048 let config = Builder :: from ( & sdk_config)
4149 . timeout_config ( timeout_config)
4250 . retry_config ( retry_config)
51+ . endpoint_url ( url)
4352 . build ( ) ;
4453
4554 Client :: from_conf ( config)
4655}
4756
48- pub async fn default_aws_s3_client ( ) -> AwsS3Client {
49- let s3_client = create_s3_client ( & S3Policy :: DEFAULT ) . await ;
50- AwsS3Client { s3_client }
51- }
52-
53- // Let's wrap Aws Client to have an interface for it so we can mock it.
57+ // Let's wrap Aws access to have an interface for it so we can mock it.
5458#[ derive( Clone ) ]
55- pub struct AwsS3Client {
56- pub s3_client : Client ,
57- }
59+ pub struct AwsS3Client { }
5860
5961impl AwsS3Interface for AwsS3Client {
60- async fn get_bucket_key ( & self , bucket : & str , key : & str ) -> anyhow:: Result < bytes:: Bytes > {
61- let result = self
62- . s3_client
62+ async fn get_bucket_key (
63+ & self ,
64+ url : & str ,
65+ bucket : & str ,
66+ key : & str ,
67+ ) -> anyhow:: Result < bytes:: Bytes > {
68+ Ok ( create_s3_client ( & S3Policy :: DEFAULT , url)
69+ . await
6370 . get_object ( )
6471 . bucket ( bucket)
6572 . key ( key)
6673 . send ( )
67- . await ?;
68- Ok ( result. body . collect ( ) . await ?. into_bytes ( ) )
74+ . await ?
75+ . body
76+ . collect ( )
77+ . await ?
78+ . into_bytes ( ) )
6979 }
7080}
7181
7282pub trait AwsS3Interface {
7383 fn get_bucket_key (
7484 & self ,
85+ url : & str ,
7586 bucket : & str ,
7687 key : & str ,
7788 ) -> impl std:: future:: Future < Output = anyhow:: Result < bytes:: Bytes > > ;
7889}
7990
91+ fn split_url ( s3_bucket_url : & String ) -> anyhow:: Result < ( String , String ) > {
92+ let parsed_url_and_bucket = url:: Url :: parse ( s3_bucket_url) ?;
93+ let bucket = parsed_url_and_bucket. path ( ) ;
94+ let host = s3_bucket_url. replace ( bucket, "" ) . trim_end_matches ( '/' ) . to_owned ( ) ;
95+ let host = if host. contains ( "minio:9000" ) {
96+ // TODO: replace by docker configuration
97+ warn ! ( s3_bucket_url, "Using localhost for minio access" ) ;
98+ host. replace ( "minio:9000" , "172.17.0.1:9000" )
99+ } else {
100+ host. to_owned ( )
101+ } ;
102+ let bucket = bucket. trim_start_matches ( '/' ) ;
103+ info ! ( s3_bucket_url, host, bucket, "Parsed S3 url" ) ;
104+ Ok ( ( host. to_owned ( ) , bucket. to_owned ( ) ) )
105+ }
106+
80107pub async fn download_key_from_s3 < A : AwsS3Interface > (
81108 s3_client : & A ,
82109 s3_bucket_urls : & [ String ] ,
@@ -92,7 +119,11 @@ pub async fn download_key_from_s3<A: AwsS3Interface>(
92119 key_path,
93120 s3_bucket_url, i_s3_bucket_url, nb_urls, url_index, "Try downloading"
94121 ) ;
95- let result = s3_client. get_bucket_key ( s3_bucket_url, & key_path) . await ;
122+ let Ok ( ( url, bucket) ) = split_url ( s3_bucket_url) else {
123+ error ! ( s3_bucket_url, "Failed to parse S3 url" ) ;
124+ continue ;
125+ } ;
126+ let result = s3_client. get_bucket_key ( & url, & bucket, & key_path) . await ;
96127 let Ok ( result) = result else {
97128 error ! ( s3_bucket_url, key_path, result = ?result, "Downloading failed" ) ;
98129 continue ;
0 commit comments