diff --git a/core/Cargo.lock b/core/Cargo.lock index 2b82c744ee0d..2bc0da80b8a2 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -6596,14 +6596,17 @@ dependencies = [ name = "opendal-service-gcs" version = "0.55.0" dependencies = [ - "backon", + "async-trait", "bytes", "http 1.4.0", "log", "opendal-core", "percent-encoding", "quick-xml", - "reqsign", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-google", + "reqsign-http-send-reqwest", "reqwest", "serde", "serde_json", @@ -8640,34 +8643,6 @@ dependencies = [ "bytecheck", ] -[[package]] -name = "reqsign" -version = "0.16.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43451dbf3590a7590684c25fb8d12ecdcc90ed3ac123433e500447c7d77ed701" -dependencies = [ - "anyhow", - "async-trait", - "base64 0.22.1", - "chrono", - "form_urlencoded", - "getrandom 0.2.16", - "hex", - "hmac", - "home", - "http 1.4.0", - "jsonwebtoken", - "log", - "percent-encoding", - "rand 0.8.5", - "reqwest", - "rsa", - "serde", - "serde_json", - "sha1", - "sha2", -] - [[package]] name = "reqsign-aliyun-oss" version = "2.0.2" @@ -8764,6 +8739,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "reqsign-google" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acee8b90b1c8bb30a9abb9a9778c7862d94459a9391396e1c2a370d6b9c1e3d5" +dependencies = [ + "async-trait", + "form_urlencoded", + "http 1.4.0", + "jsonwebtoken", + "log", + "percent-encoding", + "rand 0.8.5", + "reqsign-core", + "rsa", + "serde", + "serde_json", + "sha2", +] + [[package]] name = "reqsign-http-send-reqwest" version = "2.0.1" diff --git a/core/Cargo.toml b/core/Cargo.toml index 93be3579f2e0..b64fdbe08ee3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -51,7 +51,6 @@ logforth = { version = "0.29.1", features = ["starter-log"] } mea = "0.6" quick-xml = { version = "0.38", default-features = false } rand = "0.8" -reqsign = { version = "0.16.5", default-features = false } serde = { version = "1", default-features = false } serde_json = "1" sha2 = "0.10" diff --git a/core/services/gcs/Cargo.toml b/core/services/gcs/Cargo.toml index 45117664daa1..46c76e51e33e 100644 --- a/core/services/gcs/Cargo.toml +++ b/core/services/gcs/Cargo.toml @@ -31,17 +31,17 @@ version = { workspace = true } all-features = true [dependencies] -backon = "1.6" +async-trait = "0.1" bytes = { workspace = true } http = { workspace = true } log = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } percent-encoding = "2.3" quick-xml = { workspace = true, features = ["serialize"] } -reqsign = { workspace = true, features = [ - "reqwest_request", - "services-google", -] } +reqsign-core = { version = "2.0.1", default-features = false } +reqsign-file-read-tokio = { version = "2.0.1", default-features = false } +reqsign-google = { version = "2.0.2", default-features = false } +reqsign-http-send-reqwest = { version = "2.0.1", default-features = false } reqwest = { version = "0.12.24", default-features = false, features = [ "json", "stream", diff --git a/core/services/gcs/src/backend.rs b/core/services/gcs/src/backend.rs index 3d5612e41430..36d2ebbb6680 100644 --- a/core/services/gcs/src/backend.rs +++ b/core/services/gcs/src/backend.rs @@ -18,14 +18,28 @@ use std::fmt::Debug; use std::sync::Arc; use std::sync::LazyLock; +use std::time::Duration; +use bytes::Bytes; use http::Response; use http::StatusCode; use log::debug; -use reqsign::GoogleCredentialLoader; -use reqsign::GoogleSigner; -use reqsign::GoogleTokenLoad; -use reqsign::GoogleTokenLoader; +use reqsign_core::Context; +use reqsign_core::Env as _; +use reqsign_core::OsEnv; +use reqsign_core::ProvideCredential; +use reqsign_core::ProvideCredentialChain; +use reqsign_core::Signer; +use reqsign_core::StaticEnv; +use reqsign_core::time::Timestamp; +use reqsign_file_read_tokio::TokioFileRead; +use reqsign_google::Credential; +use reqsign_google::DefaultCredentialProvider; +use reqsign_google::RequestSigner; +use reqsign_google::StaticCredentialProvider; +use reqsign_google::Token; +use reqsign_http_send_reqwest::ReqwestHttpSend; +use serde::Deserialize; use super::GCS_SCHEME; use super::config::GcsConfig; @@ -48,7 +62,144 @@ const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read #[derive(Default)] pub struct GcsBuilder { pub(super) config: GcsConfig, - pub(super) customized_token_loader: Option>, + pub(super) credential_provider_chain: Option>, +} + +// TODO(remove): drop this adapter once reqsign-google provides a built-in +// static access token provider. +// Tracking issue: https://github.com/apache/opendal-reqsign/issues/694 +#[derive(Clone, Debug)] +struct StaticTokenCredentialProvider { + token: String, +} + +impl StaticTokenCredentialProvider { + fn new(token: impl Into) -> Self { + Self { + token: token.into(), + } + } +} + +#[async_trait::async_trait] +impl ProvideCredential for StaticTokenCredentialProvider { + type Credential = Credential; + + async fn provide_credential( + &self, + _: &Context, + ) -> reqsign_core::Result> { + Ok(Some(Credential::with_token(Token { + access_token: self.token.clone(), + expires_at: None, + }))) + } +} + +// TODO(remove): drop this adapter once reqsign-google provides a built-in +// file-path credential provider for explicit credential_path usage. +// Tracking issue: https://github.com/apache/opendal-reqsign/issues/696 +#[derive(Clone, Debug)] +struct PathCredentialProvider { + path: String, + scope: String, +} + +impl PathCredentialProvider { + fn new(path: impl Into, scope: impl Into) -> Self { + Self { + path: path.into(), + scope: scope.into(), + } + } +} + +#[async_trait::async_trait] +impl ProvideCredential for PathCredentialProvider { + type Credential = Credential; + + async fn provide_credential( + &self, + ctx: &Context, + ) -> reqsign_core::Result> { + let content = String::from_utf8(ctx.file_read(&self.path).await?).map_err(|err| { + reqsign_core::Error::unexpected("credential file content is invalid utf-8") + .with_source(err) + })?; + + StaticCredentialProvider::new(content) + .with_scope(&self.scope) + .provide_credential(ctx) + .await + } +} + +#[derive(Debug, Deserialize)] +struct VmMetadataTokenResponse { + access_token: String, + expires_in: u64, +} + +// TODO(remove): drop this adapter once reqsign-google supports selecting +// service account for VM metadata credential loading. +// Tracking issue: https://github.com/apache/opendal-reqsign/issues/695 +#[derive(Clone, Debug)] +struct ServiceAccountVmMetadataCredentialProvider { + service_account: String, + scope: String, +} + +impl ServiceAccountVmMetadataCredentialProvider { + fn new(service_account: impl Into, scope: impl Into) -> Self { + Self { + service_account: service_account.into(), + scope: scope.into(), + } + } +} + +#[async_trait::async_trait] +impl ProvideCredential for ServiceAccountVmMetadataCredentialProvider { + type Credential = Credential; + + async fn provide_credential( + &self, + ctx: &Context, + ) -> reqsign_core::Result> { + let metadata_host = ctx + .env_var("GCE_METADATA_HOST") + .unwrap_or_else(|| "metadata.google.internal".to_string()); + + let url = format!( + "http://{metadata_host}/computeMetadata/v1/instance/service-accounts/{}/token?scopes={}", + self.service_account, self.scope + ); + + let req = http::Request::builder() + .method(http::Method::GET) + .uri(&url) + .header("Metadata-Flavor", "Google") + .body(Bytes::new()) + .map_err(|err| { + reqsign_core::Error::unexpected("failed to build vm metadata request") + .with_source(err) + })?; + + let resp = ctx.http_send(req).await?; + if resp.status() != StatusCode::OK { + return Ok(None); + } + + let resp: VmMetadataTokenResponse = serde_json::from_slice(resp.body()).map_err(|err| { + reqsign_core::Error::unexpected("failed to parse vm metadata token response") + .with_source(err) + })?; + + Ok(Some(Credential::with_token(Token { + access_token: resp.access_token, + expires_at: Some(Timestamp::now() + Duration::from_secs(resp.expires_in)), + }))) + } } impl Debug for GcsBuilder { @@ -141,9 +292,23 @@ impl GcsBuilder { self } - /// Specify the customized token loader used by this service. - pub fn customized_token_loader(mut self, token_load: Box) -> Self { - self.customized_token_loader = Some(token_load); + /// Specify a customized credential provider used by this service. + /// + /// This provider will be pushed to the front of credential chain. + pub fn credential_provider( + mut self, + provider: impl ProvideCredential + 'static, + ) -> Self { + let chain = self.credential_provider_chain.unwrap_or_default(); + self.credential_provider_chain = Some(chain.push_front(provider)); + self + } + + /// Specify a customized credential provider chain used by this service. + /// + /// This chain will be pushed to the front of default chain. + pub fn credential_provider_chain(mut self, chain: ProvideCredentialChain) -> Self { + self.credential_provider_chain = Some(chain); self } @@ -233,47 +398,77 @@ impl Builder for GcsBuilder { .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string()); debug!("backend use endpoint: {endpoint}"); - let mut cred_loader = GoogleCredentialLoader::default(); - if let Some(cred) = &self.config.credential { - cred_loader = cred_loader.with_content(cred); - } - if let Some(cred) = &self.config.credential_path { - cred_loader = cred_loader.with_path(cred); - } + let scope = self + .config + .scope + .clone() + .unwrap_or_else(|| DEFAULT_GCS_SCOPE.to_string()); + + let os_env = OsEnv; + let mut envs = os_env.vars(); + envs.insert("GOOGLE_SCOPE".to_string(), scope.clone()); + + let ctx = Context::new() + .with_file_read(TokioFileRead) + .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone())) + .with_env(StaticEnv { + home_dir: os_env.home_dir(), + envs, + }); + + let mut default_credential = DefaultCredentialProvider::builder(); #[cfg(target_arch = "wasm32")] { - cred_loader = cred_loader.with_disable_env(); - cred_loader = cred_loader.with_disable_well_known_location(); + default_credential = default_credential + .disable_env(true) + .disable_well_known(true); } if self.config.disable_config_load { - cred_loader = cred_loader - .with_disable_env() - .with_disable_well_known_location(); + default_credential = default_credential + .disable_env(true) + .disable_well_known(true); } - let scope = if let Some(scope) = &self.config.scope { - scope - } else { - DEFAULT_GCS_SCOPE - }; + if self.config.disable_vm_metadata || self.config.service_account.is_some() { + default_credential = default_credential.disable_vm_metadata(true); + } + + let mut credential_chain = ProvideCredentialChain::new().push(default_credential.build()); - let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone()); - if let Some(account) = &self.config.service_account { - token_loader = token_loader.with_service_account(account); + if !self.config.disable_vm_metadata { + if let Some(service_account) = self.config.service_account.as_deref() { + credential_chain = credential_chain.push( + ServiceAccountVmMetadataCredentialProvider::new(service_account, &scope), + ); + } } - if let Ok(Some(cred)) = cred_loader.load() { - token_loader = token_loader.with_credentials(cred) + + if let Some(path) = self.config.credential_path.as_deref() { + credential_chain = + credential_chain.push_front(PathCredentialProvider::new(path, scope.clone())); } - if let Some(loader) = self.customized_token_loader { - token_loader = token_loader.with_customized_token_loader(loader) + + if let Some(content) = self.config.credential.as_deref() { + if let Ok(provider) = StaticCredentialProvider::from_base64(content) { + credential_chain = credential_chain.push_front(provider.with_scope(&scope)); + } } - if self.config.disable_vm_metadata { - token_loader = token_loader.with_disable_vm_metadata(true); + if let Some(token) = self.config.token.as_deref() { + credential_chain = + credential_chain.push_front(StaticTokenCredentialProvider::new(token)); } - let signer = GoogleSigner::new("storage"); + if let Some(customized_credential_chain) = self.credential_provider_chain { + credential_chain = credential_chain.push_front(customized_credential_chain); + } + + let signer = Signer::new( + ctx, + credential_chain, + RequestSigner::new("storage").with_scope(&scope), + ); let backend = GcsBackend { core: Arc::new(GcsCore { @@ -339,10 +534,6 @@ impl Builder for GcsBuilder { bucket: bucket.to_string(), root, signer, - token_loader, - token: self.config.token, - scope: scope.to_string(), - credential_loader: cred_loader, predefined_acl: self.config.predefined_acl.clone(), default_storage_class: self.config.default_storage_class.clone(), allow_anonymous: self.config.allow_anonymous, @@ -457,8 +648,8 @@ impl Access for GcsBackend { "operation is not supported", )), }; - let mut req = req?; - self.core.sign_query(&mut req, args.expire())?; + let req = req?; + let req = self.core.sign_query(req, args.expire()).await?; // We don't need this request anymore, consume it directly. let (parts, _) = req.into_parts(); diff --git a/core/services/gcs/src/config.rs b/core/services/gcs/src/config.rs index 7c26d0eb650a..90b6cdca076a 100644 --- a/core/services/gcs/src/config.rs +++ b/core/services/gcs/src/config.rs @@ -104,7 +104,7 @@ impl opendal_core::Configurator for GcsConfig { fn into_builder(self) -> Self::Builder { GcsBuilder { config: self, - customized_token_loader: None, + credential_provider_chain: None, } } } diff --git a/core/services/gcs/src/core.rs b/core/services/gcs/src/core.rs index b8c091830755..9091881a2a0e 100644 --- a/core/services/gcs/src/core.rs +++ b/core/services/gcs/src/core.rs @@ -19,10 +19,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Write; use std::sync::Arc; -use std::sync::LazyLock; -use backon::ExponentialBuilder; -use backon::Retryable; use bytes::Buf; use bytes::Bytes; use constants::*; @@ -38,11 +35,9 @@ use http::header::IF_MATCH; use http::header::IF_MODIFIED_SINCE; use http::header::IF_NONE_MATCH; use http::header::IF_UNMODIFIED_SINCE; -use reqsign::GoogleCredential; -use reqsign::GoogleCredentialLoader; -use reqsign::GoogleSigner; -use reqsign::GoogleToken; -use reqsign::GoogleTokenLoader; +use reqsign_core::ErrorKind as ReqsignErrorKind; +use reqsign_core::Signer; +use reqsign_google::Credential; use serde::Deserialize; use serde::Serialize; @@ -62,11 +57,7 @@ pub struct GcsCore { pub bucket: String, pub root: String, - pub signer: GoogleSigner, - pub token_loader: GoogleTokenLoader, - pub token: Option, - pub scope: String, - pub credential_loader: GoogleCredentialLoader, + pub signer: Signer, pub predefined_acl: Option, pub default_storage_class: Option, @@ -84,92 +75,57 @@ impl Debug for GcsCore { } } -static BACKOFF: LazyLock = - LazyLock::new(|| ExponentialBuilder::default().with_jitter()); - impl GcsCore { - async fn load_token(&self) -> Result> { - if let Some(token) = &self.token { - return Ok(Some(GoogleToken::new(token, usize::MAX, &self.scope))); - } - - let cred = { || self.token_loader.load() } - .retry(*BACKOFF) - .await - .map_err(new_request_credential_error)?; - - if let Some(cred) = cred { - return Ok(Some(cred)); - } - - if self.allow_anonymous { - return Ok(None); - } - - Err(Error::new( - ErrorKind::ConfigInvalid, - "no valid credential found", - )) - } - - fn load_credential(&self) -> Result> { - let cred = self - .credential_loader - .load() - .map_err(new_request_credential_error)?; - - if let Some(cred) = cred { - return Ok(Some(cred)); - } + pub async fn sign(&self, req: Request) -> Result> { + let (mut parts, body) = req.into_parts(); + + let signed = match self.signer.sign(&mut parts, None).await { + Ok(()) => true, + Err(err) + if self.allow_anonymous && err.kind() == ReqsignErrorKind::CredentialInvalid => + { + false + } + Err(err) => return Err(new_request_sign_error(err.into())), + }; - if self.allow_anonymous { - return Ok(None); + if signed { + // Always remove host header, let users' client to set it based on + // HTTP version. + // + // As discussed in , + // google server could send RST_STREAM of PROTOCOL_ERROR if our + // request contains host header. + parts.headers.remove(HOST); } - Err(Error::new( - ErrorKind::ConfigInvalid, - "no valid credential found", - )) + Ok(Request::from_parts(parts, body)) } - pub async fn sign(&self, req: &mut Request) -> Result<()> { - if let Some(cred) = self.load_token().await? { - self.signer - .sign(req, &cred) - .map_err(new_request_sign_error)?; - } else { - return Ok(()); - } - - // Always remove host header, let users' client to set it based on HTTP - // version. - // - // As discussed in , - // google server could send RST_STREAM of PROTOCOL_ERROR if our request - // contains host header. - req.headers_mut().remove(HOST); + pub async fn sign_query(&self, req: Request, duration: Duration) -> Result> { + let (mut parts, body) = req.into_parts(); - Ok(()) - } + let signed = match self.signer.sign(&mut parts, Some(duration)).await { + Ok(()) => true, + Err(err) + if self.allow_anonymous && err.kind() == ReqsignErrorKind::CredentialInvalid => + { + false + } + Err(err) => return Err(new_request_sign_error(err.into())), + }; - pub fn sign_query(&self, req: &mut Request, duration: Duration) -> Result<()> { - if let Some(cred) = self.load_credential()? { - self.signer - .sign_query(req, duration, &cred) - .map_err(new_request_sign_error)?; - } else { - return Ok(()); + if signed { + // Always remove host header, let users' client to set it based on + // HTTP version. + // + // As discussed in , + // google server could send RST_STREAM of PROTOCOL_ERROR if our + // request contains host header. + parts.headers.remove(HOST); } - // Always remove host header, let users' client to set it based on HTTP - // version. - // - // As discussed in , - // google server could send RST_STREAM of PROTOCOL_ERROR if our request - // contains host header. - req.headers_mut().remove(HOST); - - Ok(()) + Ok(Request::from_parts(parts, body)) } #[inline] @@ -249,9 +205,9 @@ impl GcsCore { range: BytesRange, args: &OpRead, ) -> Result> { - let mut req = self.gcs_get_object_request(path, range, args)?; + let req = self.gcs_get_object_request(path, range, args)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.info.http_client().fetch(req).await } @@ -441,17 +397,17 @@ impl GcsCore { path: &str, args: &OpStat, ) -> Result> { - let mut req = self.gcs_head_object_request(path, args)?; + let req = self.gcs_head_object_request(path, args)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } pub async fn gcs_delete_object(&self, path: &str) -> Result> { - let mut req = self.gcs_delete_object_request(path)?; + let req = self.gcs_delete_object_request(path)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -484,9 +440,9 @@ impl GcsCore { } let req = Request::post(uri).extension(Operation::Delete); - let mut req = multipart.apply(req)?; + let req = multipart.apply(req)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -503,13 +459,13 @@ impl GcsCore { percent_encode_path(&dest) ); - let mut req = Request::post(req_uri) + let req = Request::post(req_uri) .header(CONTENT_LENGTH, 0) .extension(Operation::Copy) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -549,12 +505,12 @@ impl GcsCore { url = url.push("pageToken", &percent_encode_path(page_token)); } - let mut req = Request::get(url.finish()) + let req = Request::get(url.finish()) .extension(Operation::List) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -602,11 +558,11 @@ impl GcsCore { } } - let mut req = builder + let req = builder .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -635,9 +591,9 @@ impl GcsCore { let req = req.extension(Operation::Write); - let mut req = req.body(body).map_err(new_request_build_error)?; + let req = req.body(body).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -668,11 +624,11 @@ impl GcsCore { let req = req.extension(Operation::Write); - let mut req = req + let req = req .body(Buffer::from(Bytes::from(content))) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -691,11 +647,11 @@ impl GcsCore { percent_encode_path(upload_id) ); - let mut req = Request::delete(&url) + let req = Request::delete(&url) .extension(Operation::Write) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } diff --git a/core/services/gcs/src/docs.md b/core/services/gcs/src/docs.md index e628a5de3139..9d6b3f34bf2c 100644 --- a/core/services/gcs/src/docs.md +++ b/core/services/gcs/src/docs.md @@ -36,7 +36,7 @@ OpenDAL supports the following authentication options: 4. Fetch access token from [VM metadata](https://cloud.google.com/docs/authentication/rest#metadata-server) - Only works when running inside Google Cloud. - If a non-default Service Account name is required, set with `service_account`. Otherwise, nothing need to be set. -5. A custom `TokenLoader` via `GcsBuilder.customized_token_loader()` +5. A custom credential provider chain via `GcsBuilder.credential_provider_chain()` Notes: diff --git a/core/services/gcs/src/writer.rs b/core/services/gcs/src/writer.rs index b8705527eb41..6e0fdc5265f4 100644 --- a/core/services/gcs/src/writer.rs +++ b/core/services/gcs/src/writer.rs @@ -48,14 +48,14 @@ impl GcsWriter { impl oio::MultipartWrite for GcsWriter { async fn write_once(&self, _: u64, body: Buffer) -> Result { let size = body.len() as u64; - let mut req = self.core.gcs_insert_object_request( + let req = self.core.gcs_insert_object_request( &percent_encode_path(&self.path), Some(size), &self.op, body, )?; - self.core.sign(&mut req).await?; + let req = self.core.sign(req).await?; let resp = self.core.send(req).await?;