Skip to content

Commit 449f948

Browse files
committed
feat(aws): auto-refresh credentials via reqsign Signer
Replace manual AwsCredential passing with reqsign's Signer<Credential> which handles credential caching and lazy refresh internally. This enables production deployments (EC2/ECS/EKS) where temporary credentials from IMDS/STS expire periodically. - Add context.rs with TokioFileRead, TokioCommandExecute, ReqwestHttpSend implementations for reqsign Context (hand-written async, no async-trait dep) - Replace S3Options.credential with S3Options.signer (reqsign Signer) - Simplify sign.rs to delegate to signer.sign() instead of manual RequestSigner - Add AmazonS3Builder::default_credential_provider() for auto-detect chain (env, profile, SSO, IRSA, ECS, IMDS), feature-gated to tokio-http/web-http - Update fusio-manifest s3::Builder with matching default_credential_provider() - Rewrite tests to use AmazonS3Builder instead of direct S3Options construction
1 parent c46b2b3 commit 449f948

File tree

11 files changed

+258
-151
lines changed

11 files changed

+258
-151
lines changed

fusio-manifest/src/s3.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ where
105105
region: Option<String>,
106106
endpoint: Option<String>,
107107
credential: Option<AwsCredential>,
108+
use_default_credential_provider: bool,
108109
sign_payload: bool,
109110
checksum: bool,
110111
opts: Arc<ManifestContext<R, E>>,
@@ -128,6 +129,7 @@ impl Builder<DefaultRetention, DefaultExecutor> {
128129
region: None,
129130
endpoint: None,
130131
credential: None,
132+
use_default_credential_provider: false,
131133
sign_payload: false,
132134
checksum: false,
133135
opts: Arc::new(opts),
@@ -180,6 +182,15 @@ where
180182
self
181183
}
182184

185+
/// Use the default credential provider chain (env, profile, SSO, IRSA, ECS, IMDS).
186+
///
187+
/// Available with the `tokio` feature or with `web` on `wasm32`.
188+
#[cfg(any(feature = "tokio", all(feature = "web", target_arch = "wasm32")))]
189+
pub fn default_credential_provider(mut self) -> Self {
190+
self.use_default_credential_provider = true;
191+
self
192+
}
193+
183194
/// Enable SigV4 payload signing (default: false). Recommended for AWS/MinIO.
184195
pub fn sign_payload(mut self, yes: bool) -> Self {
185196
self.sign_payload = yes;
@@ -212,6 +223,7 @@ where
212223
region: self.region,
213224
endpoint: self.endpoint,
214225
credential: self.credential,
226+
use_default_credential_provider: self.use_default_credential_provider,
215227
sign_payload: self.sign_payload,
216228
checksum: self.checksum,
217229
opts: opts.into(),
@@ -224,6 +236,10 @@ where
224236
if let Some(cred) = self.credential {
225237
b = b.credential(cred);
226238
}
239+
#[cfg(any(feature = "tokio", all(feature = "web", target_arch = "wasm32")))]
240+
if self.use_default_credential_provider {
241+
b = b.default_credential_provider();
242+
}
227243
if let Some(ep) = self.endpoint {
228244
b = b.endpoint(ep);
229245
}
@@ -256,6 +272,7 @@ where
256272
region: self.region.clone(),
257273
endpoint: self.endpoint.clone(),
258274
credential: self.credential.clone(),
275+
use_default_credential_provider: self.use_default_credential_provider,
259276
sign_payload: self.sign_payload,
260277
checksum: self.checksum,
261278
opts: self.opts.clone(),

fusio-manifest/src/session.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1121,7 +1121,8 @@ mod tests {
11211121
assert_eq!(keys_equal_to_v1, 2);
11221122

11231123
session.put(key, "v2".to_owned());
1124-
return session.commit().await.map(|_| ()); // This should fail for one of the transactions
1124+
return session.commit().await.map(|_| ()); // This should fail for one of the
1125+
// transactions
11251126
}
11261127

11271128
// 2. Run both transactions concurrently.

fusio/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ aws = [
2424
"serde",
2525
"serde_json",
2626
"serde_urlencoded",
27+
"tokio?/process",
2728
]
2829
bytes = ["dep:bytes", "fusio-core/bytes"]
2930
completion-based = ["fusio-core/completion-based"]

fusio/src/fs/mod.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,7 @@ mod tests {
110110
fs::{Fs, OpenOptions},
111111
impls::disk::tokio::fs::TokioFs,
112112
path::Path,
113-
remotes::{
114-
aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File},
115-
http::tokio::TokioClient,
116-
},
113+
remotes::aws::{credential::AwsCredential, fs::AmazonS3Builder, s3::S3File},
117114
DynFs, Read, Write,
118115
};
119116

@@ -122,25 +119,18 @@ mod tests {
122119
.map_err(|err| Error::Path(Box::new(err)))?;
123120
let s3_path: Path = "s3_copy_test.file".into();
124121

125-
let key_id = "user".to_string();
126-
let secret_key = "password".to_string();
127-
128-
let client = TokioClient::new();
129-
let region = "ap-southeast-1";
130-
let options = S3Options {
131-
endpoint: "http://localhost:9000/data".into(),
132-
bucket: "data".to_string(),
133-
credential: Some(AwsCredential {
134-
key_id,
135-
secret_key,
136-
token: None,
137-
}),
138-
region: region.into(),
139-
sign_payload: true,
140-
checksum: false,
141-
};
142-
143-
let s3_fs = Arc::new(AmazonS3::new(Box::new(client), options));
122+
let s3_fs = Arc::new(
123+
AmazonS3Builder::new("data".to_string())
124+
.endpoint("http://localhost:9000".to_string())
125+
.region("ap-southeast-1".to_string())
126+
.credential(AwsCredential {
127+
key_id: "user".to_string(),
128+
secret_key: "password".to_string(),
129+
token: None,
130+
})
131+
.sign_payload(true)
132+
.build(),
133+
);
144134
let local_fs = Arc::new(TokioFs);
145135

146136
{
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use reqsign_core::Context;
2+
3+
cfg_if::cfg_if! {
4+
if #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))] {
5+
#[derive(Debug)]
6+
pub(crate) struct TokioFileRead;
7+
8+
impl reqsign_core::FileRead for TokioFileRead {
9+
fn file_read<'a, 'b, 'c>(
10+
&'a self,
11+
path: &'b str,
12+
) -> std::pin::Pin<
13+
Box<dyn std::future::Future<Output = reqsign_core::Result<Vec<u8>>> + Send + 'c>,
14+
>
15+
where
16+
'a: 'c,
17+
'b: 'c,
18+
Self: 'c,
19+
{
20+
Box::pin(async move { tokio::fs::read(path).await.map_err(Into::into) })
21+
}
22+
}
23+
24+
#[derive(Debug)]
25+
pub(crate) struct TokioCommandExecute;
26+
27+
impl reqsign_core::CommandExecute for TokioCommandExecute {
28+
fn command_execute<'a, 'b, 'c, 'd, 'fut>(
29+
&'a self,
30+
program: &'b str,
31+
args: &'c [&'d str],
32+
) -> std::pin::Pin<
33+
Box<
34+
dyn std::future::Future<Output = reqsign_core::Result<reqsign_core::CommandOutput>>
35+
+ Send
36+
+ 'fut,
37+
>,
38+
>
39+
where
40+
'a: 'fut,
41+
'b: 'fut,
42+
'c: 'fut,
43+
'd: 'fut,
44+
Self: 'fut,
45+
{
46+
let program = program.to_owned();
47+
let args: Vec<String> = args.iter().map(|s| s.to_string()).collect();
48+
Box::pin(async move {
49+
let output = tokio::process::Command::new(&program)
50+
.args(&args)
51+
.output()
52+
.await
53+
.map_err(|e| {
54+
reqsign_core::Error::unexpected(format!(
55+
"execute command {program}: {e}"
56+
))
57+
})?;
58+
Ok(reqsign_core::CommandOutput {
59+
status: output.status.code().unwrap_or(-1),
60+
stdout: output.stdout,
61+
stderr: output.stderr,
62+
})
63+
})
64+
}
65+
}
66+
}
67+
}
68+
69+
cfg_if::cfg_if! {
70+
if #[cfg(any(feature = "tokio-http", feature = "web-http"))] {
71+
#[derive(Debug)]
72+
pub(crate) struct ReqwestHttpSend {
73+
client: reqwest::Client,
74+
}
75+
76+
impl Default for ReqwestHttpSend {
77+
fn default() -> Self {
78+
Self {
79+
client: reqwest::Client::new(),
80+
}
81+
}
82+
}
83+
84+
impl reqsign_core::HttpSend for ReqwestHttpSend {
85+
fn http_send<'a, 'b>(
86+
&'a self,
87+
req: http::Request<bytes::Bytes>,
88+
) -> std::pin::Pin<
89+
Box<
90+
dyn std::future::Future<
91+
Output = reqsign_core::Result<http::Response<bytes::Bytes>>,
92+
> + Send
93+
+ 'b,
94+
>,
95+
>
96+
where
97+
'a: 'b,
98+
Self: 'b,
99+
{
100+
Box::pin(async move {
101+
let (parts, body) = req.into_parts();
102+
let resp = self
103+
.client
104+
.request(parts.method, parts.uri.to_string())
105+
.headers(parts.headers)
106+
.body(body)
107+
.send()
108+
.await
109+
.map_err(|e| {
110+
reqsign_core::Error::unexpected(format!("http request failed: {e}"))
111+
})?;
112+
113+
let status = resp.status();
114+
let headers = resp.headers().clone();
115+
let body = resp.bytes().await.map_err(|e| {
116+
reqsign_core::Error::unexpected(format!("read response body: {e}"))
117+
})?;
118+
119+
let mut builder = http::Response::builder().status(status);
120+
*builder.headers_mut().unwrap() = headers;
121+
builder.body(body).map_err(|e| {
122+
reqsign_core::Error::unexpected(format!("build http response: {e}"))
123+
})
124+
})
125+
}
126+
}
127+
}
128+
}
129+
130+
pub(crate) fn default_context() -> Context {
131+
#[allow(unused_mut)]
132+
let mut ctx = Context::new();
133+
134+
cfg_if::cfg_if! {
135+
if #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))] {
136+
ctx = ctx
137+
.with_file_read(TokioFileRead)
138+
.with_command_execute(TokioCommandExecute);
139+
}
140+
}
141+
142+
cfg_if::cfg_if! {
143+
if #[cfg(any(feature = "tokio-http", feature = "web-http"))] {
144+
ctx = ctx.with_http_send(ReqwestHttpSend::default());
145+
}
146+
}
147+
148+
ctx.with_env(reqsign_core::OsEnv)
149+
}

0 commit comments

Comments
 (0)