11use aws_config:: SdkConfig ;
22use aws_credential_types:: provider:: ProvideCredentials ;
3- use aws_sigv4:: { http_request:: { self , SignableBody , SignableRequest , SignatureLocation , SigningSettings } , sign:: v4} ;
3+ use aws_sigv4:: {
4+ http_request:: { self , SignableBody , SignableRequest , SignatureLocation , SigningSettings } ,
5+ sign:: v4,
6+ } ;
47use base64:: { engine:: general_purpose:: URL_SAFE_NO_PAD , Engine as _} ;
8+ use std:: {
9+ error:: Error ,
10+ time:: { Duration as StdDuration , SystemTime } ,
11+ } ;
512use time:: { macros:: format_description, Duration , OffsetDateTime , PrimitiveDateTime } ;
613use tokio:: runtime:: Runtime ;
7- use std:: { error:: Error , time:: { SystemTime , Duration as StdDuration } } ;
814use url:: Url ;
915
1016const ACTION_TYPE : & str = "Action" ;
@@ -15,128 +21,147 @@ const DATE_QUERY_KEY: &str = "X-Amz-Date";
1521const EXPIRES_QUERY_KEY : & str = "X-Amz-Expires" ;
1622const DEFAULT_EXPIRY_SECONDS : u64 = 900 ;
1723
18- const APP_NAME : & str = "constellation-processors" ;
19- const USER_AGENT_VERSION : & str = "1.0" ;
24+ const USER_AGENT_VALUE : & str = "aws-msk-iam-sasl-signer-rust" ;
2025
2126#[ derive( Clone ) ]
2227pub struct TokenInfo {
23- pub token : String ,
24- pub expiration_time : OffsetDateTime ,
28+ pub token : String ,
29+ // TODO(djandries): change to UTCDateTime
30+ pub expiration_time : OffsetDateTime ,
2531}
2632pub struct MSKIAMAuthManager {
27- token_info : Option < TokenInfo > ,
33+ token_info : Option < TokenInfo > ,
2834}
2935
3036impl MSKIAMAuthManager {
31- pub fn new ( ) -> Self {
32- Self {
33- token_info : None ,
34- }
35- }
36-
37- pub fn get_auth_token ( & mut self ) -> Result < TokenInfo , Box < dyn Error > > {
38- if let Some ( token_info) = & self . token_info {
39- if token_info. expiration_time > OffsetDateTime :: now_utc ( ) {
40- return Ok ( token_info. clone ( ) ) ;
41- }
42- }
43-
44- let token_info = Runtime :: new ( ) ?. block_on ( generate_auth_token_async ( ) ) ?;
45- self . token_info = Some ( token_info. clone ( ) ) ;
46- Ok ( token_info)
37+ pub fn new ( ) -> Self {
38+ Self { token_info : None }
39+ }
40+
41+ pub fn get_auth_token ( & mut self ) -> Result < TokenInfo , Box < dyn Error > > {
42+ if let Some ( token_info) = & self . token_info {
43+ if token_info. expiration_time > OffsetDateTime :: now_utc ( ) {
44+ return Ok ( token_info. clone ( ) ) ;
45+ }
4746 }
47+
48+ let token_info = Runtime :: new ( ) ?. block_on ( generate_auth_token_async ( ) ) ?;
49+ self . token_info = Some ( token_info. clone ( ) ) ;
50+ Ok ( token_info)
51+ }
4852}
4953
5054async fn generate_auth_token_async ( ) -> Result < TokenInfo , Box < dyn Error > > {
51- debug ! ( "Generating MSK IAM auth token" ) ;
52- let config = aws_config:: from_env ( )
53- . load ( )
54- . await ;
55-
56- let mut url = build_request_url ( & config) ?;
57-
58- sign_request_url ( & mut url, & config) . await ?;
59-
60- let expiration_time = get_expiration_time ( & url) ?;
61-
62- add_user_agent ( & mut url) ;
63-
64- let encoded = URL_SAFE_NO_PAD . encode ( url. as_str ( ) . as_bytes ( ) ) ;
65-
66- Ok ( TokenInfo {
67- token : encoded,
68- expiration_time,
69- } )
55+ debug ! ( "Generating MSK IAM auth token" ) ;
56+ let config = aws_config:: from_env ( ) . load ( ) . await ;
57+
58+ let mut url = build_request_url ( & config) ?;
59+
60+ sign_request_url ( & mut url, & config) . await ?;
61+
62+ let expiration_time = get_expiration_time ( & url) ?;
63+
64+ url
65+ . query_pairs_mut ( )
66+ . append_pair ( USER_AGENT_KEY , USER_AGENT_VALUE ) ;
67+
68+ let encoded = URL_SAFE_NO_PAD . encode ( url. as_str ( ) . as_bytes ( ) ) ;
69+
70+ error ! ( "url: {}" , encoded) ;
71+
72+ Ok ( TokenInfo {
73+ token : encoded,
74+ expiration_time,
75+ } )
7076}
7177
7278fn build_request_url ( config : & SdkConfig ) -> Result < Url , Box < dyn Error > > {
73- let endpoint_url = format ! ( "https://kafka.{}.amazonaws.com/" , config. region( ) . ok_or_else( || "AWS region is not set" ) ?. to_string( ) ) ;
74- let mut url = Url :: parse ( & endpoint_url) ?;
75-
76- {
77- let mut query_pairs = url. query_pairs_mut ( ) ;
78- query_pairs. append_pair ( ACTION_TYPE , ACTION_NAME ) ;
79- query_pairs. append_pair ( EXPIRES_QUERY_KEY , & DEFAULT_EXPIRY_SECONDS . to_string ( ) ) ;
80- }
79+ let endpoint_url = format ! (
80+ "https://kafka.{}.amazonaws.com/" ,
81+ config
82+ . region( )
83+ . ok_or_else( || "AWS region is not set" ) ?
84+ . to_string( )
85+ ) ;
86+ let mut url = Url :: parse ( & endpoint_url) ?;
87+
88+ {
89+ let mut query_pairs = url. query_pairs_mut ( ) ;
90+ query_pairs. append_pair ( ACTION_TYPE , ACTION_NAME ) ;
91+ query_pairs. append_pair ( EXPIRES_QUERY_KEY , & DEFAULT_EXPIRY_SECONDS . to_string ( ) ) ;
92+ }
8193
82- Ok ( url)
94+ Ok ( url)
8395}
8496
85- async fn sign_request_url (
86- url : & mut Url ,
87- config : & SdkConfig ,
88- ) -> Result < ( ) , Box < dyn Error > > {
89- let credentials_provider = config. credentials_provider ( ) . ok_or_else ( || "AWS credentials provider is not set" ) ?;
90- let credentials = credentials_provider. provide_credentials ( ) . await ?;
91-
92- let signable_request = SignableRequest :: new ( "GET" , url. as_str ( ) , std:: iter:: empty ( ) , SignableBody :: Bytes ( & [ ] ) ) ?;
93-
94- let identity = credentials. into ( ) ;
95- let region = config. region ( ) . ok_or_else ( || "AWS region is not set" ) ?. to_string ( ) ;
96- let mut signing_settings = SigningSettings :: default ( ) ;
97- signing_settings. signature_location = SignatureLocation :: QueryParams ;
98- signing_settings. expires_in = Some ( StdDuration :: from_secs ( DEFAULT_EXPIRY_SECONDS ) ) ;
99- let signing_params = v4:: SigningParams :: builder ( )
100- . identity ( & identity)
101- . region ( & region)
102- . name ( SIGNING_NAME )
103- . time ( SystemTime :: now ( ) )
104- . settings ( signing_settings)
105- . build ( ) ?
106- . into ( ) ;
107-
108- let signing_output = http_request:: sign (
109- signable_request,
110- & signing_params,
111- ) ?;
112-
113- for ( key, value) in signing_output. output ( ) . params ( ) {
114- url. query_pairs_mut ( ) . append_pair ( key, value) ;
115- }
116-
117- Ok ( ( ) )
97+ async fn sign_request_url ( url : & mut Url , config : & SdkConfig ) -> Result < ( ) , Box < dyn Error > > {
98+ let credentials_provider = config
99+ . credentials_provider ( )
100+ . ok_or_else ( || "AWS credentials provider is not set" ) ?;
101+ let credentials = credentials_provider. provide_credentials ( ) . await ?;
102+
103+ let signable_request = SignableRequest :: new (
104+ "GET" ,
105+ url. as_str ( ) ,
106+ std:: iter:: empty ( ) ,
107+ SignableBody :: Bytes ( & [ ] ) ,
108+ ) ?;
109+
110+ let identity = credentials. into ( ) ;
111+ let region = config
112+ . region ( )
113+ . ok_or_else ( || "AWS region is not set" ) ?
114+ . to_string ( ) ;
115+ let mut signing_settings = SigningSettings :: default ( ) ;
116+ signing_settings. signature_location = SignatureLocation :: QueryParams ;
117+ signing_settings. expires_in = Some ( StdDuration :: from_secs ( DEFAULT_EXPIRY_SECONDS ) ) ;
118+ let signing_params = v4:: SigningParams :: builder ( )
119+ . identity ( & identity)
120+ . region ( & region)
121+ . name ( SIGNING_NAME )
122+ . time ( SystemTime :: now ( ) )
123+ . settings ( signing_settings)
124+ . build ( ) ?
125+ . into ( ) ;
126+
127+ let signing_output = http_request:: sign ( signable_request, & signing_params) ?;
128+
129+ for ( key, value) in signing_output. output ( ) . params ( ) {
130+ url. query_pairs_mut ( ) . append_pair ( key, value) ;
131+ }
132+
133+ Ok ( ( ) )
118134}
119135
120136fn get_expiration_time ( url : & Url ) -> Result < OffsetDateTime , Box < dyn Error > > {
121- let date_str = url. query_pairs ( )
122- . find_map ( |( k, v) | if k == DATE_QUERY_KEY { Some ( v. to_string ( ) ) } else { None } )
123- . ok_or_else ( || "failed to find AWS signed date parameter" ) ?;
124-
125- let date_format_description = format_description ! ( "[year][month][day]T[hour][minute][second]Z" ) ;
126- let date = PrimitiveDateTime :: parse ( & date_str, date_format_description) ?. assume_utc ( ) ;
127-
128- let expiry_duration_seconds = url. query_pairs ( )
129- . find_map ( |( k, v) | if k == EXPIRES_QUERY_KEY { Some ( v. to_string ( ) ) } else { None } )
130- . ok_or_else ( || "failed to find AWS signed expiry parameter" ) ?
131- . parse :: < i64 > ( ) ?;
132-
133- let expiry_duration = Duration :: seconds ( expiry_duration_seconds) ;
134- let expiry_time = date + expiry_duration;
135-
136- Ok ( expiry_time)
137- }
137+ let date_str = url
138+ . query_pairs ( )
139+ . find_map ( |( k, v) | {
140+ if k == DATE_QUERY_KEY {
141+ Some ( v. to_string ( ) )
142+ } else {
143+ None
144+ }
145+ } )
146+ . ok_or_else ( || "failed to find AWS signed date parameter" ) ?;
147+
148+ let date_format_description = format_description ! ( "[year][month][day]T[hour][minute][second]Z" ) ;
149+ let date = PrimitiveDateTime :: parse ( & date_str, date_format_description) ?. assume_utc ( ) ;
150+
151+ let expiry_duration_seconds = url
152+ . query_pairs ( )
153+ . find_map ( |( k, v) | {
154+ if k == EXPIRES_QUERY_KEY {
155+ Some ( v. to_string ( ) )
156+ } else {
157+ None
158+ }
159+ } )
160+ . ok_or_else ( || "failed to find AWS signed expiry parameter" ) ?
161+ . parse :: < i64 > ( ) ?;
162+
163+ let expiry_duration = Duration :: seconds ( expiry_duration_seconds) ;
164+ let expiry_time = date + expiry_duration;
138165
139- fn add_user_agent ( url : & mut Url ) {
140- let user_agent = format ! ( "{}/{}/{}" , APP_NAME , USER_AGENT_VERSION , USER_AGENT_VERSION ) ;
141- url. query_pairs_mut ( ) . append_pair ( USER_AGENT_KEY , & user_agent) ;
166+ Ok ( expiry_time)
142167}
0 commit comments