11use std:: time:: Duration ;
22
33use aws_config:: { retry:: RetryConfig , timeout:: TimeoutConfig , BehaviorVersion } ;
4+ use aws_credential_types:: Credentials ;
45use aws_sdk_s3:: { config:: Builder , Client } ;
56use tokio_util:: bytes;
6- use tracing:: { error, info} ;
7+ use tracing:: { error, info, warn } ;
78
89#[ derive( Clone , Debug , Default ) ]
910pub struct S3Policy {
@@ -26,8 +27,11 @@ impl S3Policy {
2627 } ;
2728}
2829
29- pub async fn create_s3_client ( retry_policy : & S3Policy ) -> aws_sdk_s3:: Client {
30+ pub async fn create_s3_client ( retry_policy : & S3Policy , url : & str ) -> aws_sdk_s3:: Client {
3031 let sdk_config = aws_config:: load_defaults ( BehaviorVersion :: latest ( ) ) . await ;
32+ let access_key_id = std:: env:: var ( "AWS_KEY_ID" ) . unwrap ( ) ;
33+ let secret_key_id = std:: env:: var ( "AWS_SECRET_KEY" ) . unwrap ( ) ; // TODO error
34+
3135 let timeout_config = TimeoutConfig :: builder ( )
3236 . connect_timeout ( retry_policy. connect_timeout )
3337 . operation_attempt_timeout ( retry_policy. max_retries_timeout )
@@ -37,46 +41,77 @@ pub async fn create_s3_client(retry_policy: &S3Policy) -> aws_sdk_s3::Client {
3741 . with_max_attempts ( retry_policy. max_attempt )
3842 . with_max_backoff ( retry_policy. max_backoff ) ;
3943
44+ let credentials = Credentials :: new (
45+ access_key_id,
46+ secret_key_id,
47+ None , // Session token (optional)
48+ None , // Expiry (optional)
49+ "env" , // Provider name
50+ ) ;
51+
4052 let config = Builder :: from ( & sdk_config)
4153 . timeout_config ( timeout_config)
4254 . retry_config ( retry_config)
55+ . endpoint_url ( url)
56+ . credentials_provider ( credentials)
57+ . force_path_style ( true )
4358 . build ( ) ;
4459
4560 Client :: from_conf ( config)
4661}
4762
48- pub async fn default_aws_s3_client ( ) -> AwsS3Client {
49- let s3_client = create_s3_client ( & S3Policy :: DEFAULT ) . await ;
50- AwsS3Client { s3_client }
51- }
52-
53- // Let's wrap Aws Client to have an interface for it so we can mock it.
63+ // Let's wrap Aws access to have an interface for it so we can mock it.
5464#[ derive( Clone ) ]
55- pub struct AwsS3Client {
56- pub s3_client : Client ,
57- }
65+ pub struct AwsS3Client { }
5866
5967impl AwsS3Interface for AwsS3Client {
60- async fn get_bucket_key ( & self , bucket : & str , key : & str ) -> anyhow:: Result < bytes:: Bytes > {
61- let result = self
62- . s3_client
68+ async fn get_bucket_key (
69+ & self ,
70+ url : & str ,
71+ bucket : & str ,
72+ key : & str ,
73+ ) -> anyhow:: Result < bytes:: Bytes > {
74+ // let parsed_url_and_bucket = url::Url::parse(_url_and_bucket)?;
75+ // let url = parsed_url_and_bucket.
76+ Ok ( create_s3_client ( & S3Policy :: DEFAULT , url)
77+ . await
6378 . get_object ( )
6479 . bucket ( bucket)
6580 . key ( key)
6681 . send ( )
67- . await ?;
68- Ok ( result. body . collect ( ) . await ?. into_bytes ( ) )
82+ . await ?
83+ . body
84+ . collect ( )
85+ . await ?
86+ . into_bytes ( ) )
6987 }
7088}
7189
7290pub trait AwsS3Interface {
7391 fn get_bucket_key (
7492 & self ,
93+ url : & str ,
7594 bucket : & str ,
7695 key : & str ,
7796 ) -> impl std:: future:: Future < Output = anyhow:: Result < bytes:: Bytes > > ;
7897}
7998
99+ fn split_url ( s3_bucket_url : & String ) -> anyhow:: Result < ( String , String ) > {
100+ let parsed_url_and_bucket = url:: Url :: parse ( s3_bucket_url) ?;
101+ let bucket = parsed_url_and_bucket. path ( ) ;
102+ let scheme = parsed_url_and_bucket. scheme ( ) ;
103+ let host = s3_bucket_url. replace ( bucket, "" ) . trim_end_matches ( '/' ) . to_owned ( ) ;
104+ let host = if host. contains ( "minio:9000" ) {
105+ // TODO: replace by docker configuration
106+ warn ! ( s3_bucket_url, "Using localhost for minio access" ) ;
107+ host. replace ( "minio:9000" , "172.17.0.1:9000" )
108+ } else {
109+ host. to_owned ( )
110+ } ;
111+ info ! ( s3_bucket_url, host, bucket, "Parsed S3 url" ) ;
112+ Ok ( ( host. to_owned ( ) , bucket. to_owned ( ) ) )
113+ }
114+
80115pub async fn download_key_from_s3 < A : AwsS3Interface > (
81116 s3_client : & A ,
82117 s3_bucket_urls : & [ String ] ,
@@ -92,7 +127,11 @@ pub async fn download_key_from_s3<A: AwsS3Interface>(
92127 key_path,
93128 s3_bucket_url, i_s3_bucket_url, nb_urls, url_index, "Try downloading"
94129 ) ;
95- let result = s3_client. get_bucket_key ( s3_bucket_url, & key_path) . await ;
130+ let Ok ( ( url, bucket) ) = split_url ( s3_bucket_url) else {
131+ error ! ( s3_bucket_url, "Failed to parse S3 url" ) ;
132+ continue ;
133+ } ;
134+ let result = s3_client. get_bucket_key ( & url, & bucket, & key_path) . await ;
96135 let Ok ( result) = result else {
97136 error ! ( s3_bucket_url, key_path, result = ?result, "Downloading failed" ) ;
98137 continue ;
0 commit comments