diff --git a/core/Cargo.lock b/core/Cargo.lock index d8d9715903f2..13e66cb989d9 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -6371,7 +6371,10 @@ dependencies = [ "opendal-service-azure-common", "pretty_assertions", "quick-xml", - "reqsign", + "reqsign-azure-storage", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-http-send-reqwest", "serde", "serde_json", "sha2", @@ -6389,7 +6392,10 @@ dependencies = [ "opendal-core", "opendal-service-azure-common", "quick-xml", - "reqsign", + "reqsign-azure-storage", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-http-send-reqwest", "serde", "serde_json", "tokio", @@ -6405,7 +6411,10 @@ dependencies = [ "opendal-core", "opendal-service-azure-common", "quick-xml", - "reqsign", + "reqsign-azure-storage", + "reqsign-core", + "reqsign-file-read-tokio", + "reqsign-http-send-reqwest", "serde", "tokio", ] @@ -6416,7 +6425,6 @@ version = "0.55.0" dependencies = [ "http 1.4.0", "opendal-core", - "reqsign", ] [[package]] @@ -6624,7 +6632,8 @@ dependencies = [ "opendal-core", "opendal-service-azblob", "prost 0.13.5", - "reqsign", + "reqsign-azure-storage", + "reqsign-core", "serde", "serde_json", "sha2", @@ -8695,6 +8704,29 @@ dependencies = [ "sha1", ] +[[package]] +name = "reqsign-azure-storage" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "153687178cff1ba9fe13d5f43c7364a3b0cf70751b9ef44ae03d4385c7558085" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.22.1", + "bytes", + "form_urlencoded", + "http 1.4.0", + "jsonwebtoken", + "log", + "pem", + "percent-encoding", + "reqsign-core", + "rsa", + "serde", + "serde_json", + "sha1", +] + [[package]] name = "reqsign-core" version = "2.0.2" diff --git a/core/services/azblob/Cargo.toml b/core/services/azblob/Cargo.toml index 87274adf4065..a0dffe4d02c5 100644 --- a/core/services/azblob/Cargo.toml +++ b/core/services/azblob/Cargo.toml @@ -38,10 +38,10 @@ log = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" } quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] } -reqsign = { workspace = true, features = [ - "reqwest_request", - "services-azblob", -] } +reqsign-azure-storage = { version = "2.0.2", default-features = false } +reqsign-core = { version = "2.0.1", default-features = false } +reqsign-file-read-tokio = { version = "2.0.1", default-features = false } +reqsign-http-send-reqwest = { version = "2.0.1", default-features = false } serde = { workspace = true, features = ["derive"] } sha2 = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } diff --git a/core/services/azblob/src/backend.rs b/core/services/azblob/src/backend.rs index 9222b527610f..69def7c3a068 100644 --- a/core/services/azblob/src/backend.rs +++ b/core/services/azblob/src/backend.rs @@ -23,9 +23,16 @@ use base64::prelude::BASE64_STANDARD; use http::Response; use http::StatusCode; use log::debug; -use reqsign::AzureStorageConfig; -use reqsign::AzureStorageLoader; -use reqsign::AzureStorageSigner; +use reqsign_azure_storage::DefaultCredentialProvider; +use reqsign_azure_storage::RequestSigner; +use reqsign_azure_storage::StaticCredentialProvider; +use reqsign_core::Context; +use reqsign_core::Env as _; +use reqsign_core::OsEnv; +use reqsign_core::Signer; +use reqsign_core::StaticEnv; +use reqsign_file_read_tokio::TokioFileRead; +use reqsign_http_send_reqwest::ReqwestHttpSend; use sha2::Digest; use sha2::Sha256; @@ -42,13 +49,14 @@ use super::writer::AzblobWriters; use opendal_core::raw::*; use opendal_core::*; use opendal_service_azure_common::{ - AzureStorageService, azure_account_name_from_endpoint, azure_config_from_connection_string, + AzureStorageConfig as AzureConnectionConfig, AzureStorageService, + azure_account_name_from_endpoint, azure_config_from_connection_string, }; const AZBLOB_BATCH_LIMIT: usize = 256; -impl From for AzblobConfig { - fn from(value: AzureStorageConfig) -> Self { +impl From for AzblobConfig { + fn from(value: AzureConnectionConfig) -> Self { Self { endpoint: value.endpoint, account_name: value.account_name, @@ -297,23 +305,23 @@ impl Builder for AzblobBuilder { }?; debug!("backend use endpoint {}", &container); - #[cfg(target_arch = "wasm32")] - let mut config_loader = AzureStorageConfig::default(); - #[cfg(not(target_arch = "wasm32"))] - let mut config_loader = AzureStorageConfig::default().from_env(); - - if let Some(v) = self + let account_name = self .config .account_name .clone() - .or_else(|| azure_account_name_from_endpoint(endpoint.as_str())) - { - config_loader.account_name = Some(v); + .or_else(|| azure_account_name_from_endpoint(endpoint.as_str())); + + let os_env = OsEnv; + let mut envs = os_env.vars(); + + if let Some(v) = &account_name { + envs.insert("AZBLOB_ACCOUNT_NAME".to_string(), v.clone()); + envs.insert("AZURE_STORAGE_ACCOUNT_NAME".to_string(), v.clone()); } - if let Some(v) = self.config.account_key.clone() { + if let Some(v) = &self.config.account_key { // Validate that account_key can be decoded as base64 - if let Err(e) = BASE64_STANDARD.decode(&v) { + if let Err(e) = BASE64_STANDARD.decode(v) { return Err(Error::new( ErrorKind::ConfigInvalid, format!("invalid account_key: cannot decode as base64: {e}"), @@ -322,11 +330,12 @@ impl Builder for AzblobBuilder { .with_context("service", AZBLOB_SCHEME) .with_context("key", "account_key")); } - config_loader.account_key = Some(v); + envs.insert("AZBLOB_ACCOUNT_KEY".to_string(), v.clone()); + envs.insert("AZURE_STORAGE_ACCOUNT_KEY".to_string(), v.clone()); } - if let Some(v) = self.config.sas_token.clone() { - config_loader.sas_token = Some(v); + if let Some(v) = &self.config.sas_token { + envs.insert("AZURE_STORAGE_SAS_TOKEN".to_string(), v.clone()); } let encryption_key = @@ -360,9 +369,34 @@ impl Builder for AzblobBuilder { } }; - let cred_loader = AzureStorageLoader::new(config_loader); + let ctx = Context::new() + .with_file_read(TokioFileRead) + .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone())) + .with_env(StaticEnv { + home_dir: os_env.home_dir(), + envs, + }); + + let mut credential = DefaultCredentialProvider::new(); + + if let (Some(account_name), Some(account_key)) = + (account_name.as_deref(), self.config.account_key.as_deref()) + { + credential = credential.push_front(StaticCredentialProvider::new_shared_key( + account_name, + account_key, + )); + } + + if let Some(sas_token) = self.config.sas_token.as_deref() { + credential = credential.push_front(StaticCredentialProvider::new_sas_token(sas_token)); + } - let signer = AzureStorageSigner::new(); + let signer = Signer::new( + ctx, + credential, + RequestSigner::new().with_service_sas_permissions("racwd"), + ); Ok(AzblobBackend { core: Arc::new(AzblobCore { @@ -422,7 +456,6 @@ impl Builder for AzblobBuilder { encryption_algorithm, container: self.config.container.clone(), - loader: cred_loader, signer, }), }) @@ -551,9 +584,8 @@ impl Access for AzblobBackend { )), }; - let mut req = req?; - - self.core.sign_query(&mut req).await?; + let req = req?; + let req = self.core.sign_query(req).await?; let (parts, _) = req.into_parts(); diff --git a/core/services/azblob/src/core.rs b/core/services/azblob/src/core.rs index 792e1ee02bde..67091e5e85bf 100644 --- a/core/services/azblob/src/core.rs +++ b/core/services/azblob/src/core.rs @@ -32,9 +32,8 @@ use http::header::IF_MATCH; use http::header::IF_MODIFIED_SINCE; use http::header::IF_NONE_MATCH; use http::header::IF_UNMODIFIED_SINCE; -use reqsign::AzureStorageCredential; -use reqsign::AzureStorageLoader; -use reqsign::AzureStorageSigner; +use reqsign_azure_storage::Credential; +use reqsign_core::Signer; use serde::Deserialize; use serde::Serialize; use uuid::Uuid; @@ -69,8 +68,7 @@ pub struct AzblobCore { pub encryption_key: Option, pub encryption_key_sha256: Option, pub encryption_algorithm: Option, - pub loader: AzureStorageLoader, - pub signer: AzureStorageSigner, + pub signer: Signer, } impl Debug for AzblobCore { @@ -84,35 +82,22 @@ impl Debug for AzblobCore { } impl AzblobCore { - async fn load_credential(&self) -> Result { - let cred = self - .loader - .load() - .await - .map_err(new_request_credential_error)?; - - if let Some(cred) = cred { - Ok(cred) - } else { - Err(Error::new( - ErrorKind::ConfigInvalid, - "no valid credential found", - )) - } - } - - pub async fn sign_query(&self, req: &mut Request) -> Result<()> { - let cred = self.load_credential().await?; + pub async fn sign_query(&self, req: Request) -> Result> { + let (mut parts, body) = req.into_parts(); self.signer - .sign_query(req, Duration::from_secs(3600), &cred) - .map_err(new_request_sign_error) + .sign(&mut parts, Some(Duration::from_secs(3600))) + .await + .map_err(|e| new_request_sign_error(e.into()))?; + + Ok(Request::from_parts(parts, body)) } - pub async fn sign(&self, req: &mut Request) -> Result<()> { - let cred = self.load_credential().await?; + pub async fn sign(&self, req: Request) -> Result> { + let (mut parts, body) = req.into_parts(); + // Insert x-ms-version header for normal requests. - req.headers_mut().insert( + parts.headers.insert( HeaderName::from_static(constants::X_MS_VERSION), // 2022-11-02 is the version supported by Azurite V3 and // used by Azure Portal, We use this version to make @@ -121,12 +106,24 @@ impl AzblobCore { // In the future, we could allow users to configure this value. HeaderValue::from_static("2022-11-02"), ); - self.signer.sign(req, &cred).map_err(new_request_sign_error) + + self.signer + .sign(&mut parts, None) + .await + .map_err(|e| new_request_sign_error(e.into()))?; + + Ok(Request::from_parts(parts, body)) } - async fn batch_sign(&self, req: &mut Request) -> Result<()> { - let cred = self.load_credential().await?; - self.signer.sign(req, &cred).map_err(new_request_sign_error) + async fn batch_sign(&self, req: Request) -> Result> { + let (mut parts, body) = req.into_parts(); + + self.signer + .sign(&mut parts, None) + .await + .map_err(|e| new_request_sign_error(e.into()))?; + + Ok(Request::from_parts(parts, body)) } #[inline] @@ -230,9 +227,8 @@ impl AzblobCore { range: BytesRange, args: &OpRead, ) -> Result> { - let mut req = self.azblob_get_blob_request(path, range, args)?; - - self.sign(&mut req).await?; + let req = self.azblob_get_blob_request(path, range, args)?; + let req = self.sign(req).await?; self.info.http_client().fetch(req).await } @@ -297,9 +293,8 @@ impl AzblobCore { args: &OpWrite, body: Buffer, ) -> Result> { - let mut req = self.azblob_put_blob_request(path, size, args, body)?; - - self.sign(&mut req).await?; + let req = self.azblob_put_blob_request(path, size, args, body)?; + let req = self.sign(req).await?; self.send(req).await } @@ -359,9 +354,8 @@ impl AzblobCore { path: &str, args: &OpWrite, ) -> Result> { - let mut req = self.azblob_init_appendable_blob_request(path, args)?; - - self.sign(&mut req).await?; + let req = self.azblob_init_appendable_blob_request(path, args)?; + let req = self.sign(req).await?; self.send(req).await } @@ -407,9 +401,8 @@ impl AzblobCore { size: u64, body: Buffer, ) -> Result> { - let mut req = self.azblob_append_blob_request(path, position, size, body)?; - - self.sign(&mut req).await?; + let req = self.azblob_append_blob_request(path, position, size, body)?; + let req = self.sign(req).await?; self.send(req).await } @@ -462,9 +455,8 @@ impl AzblobCore { args: &OpWrite, body: Buffer, ) -> Result> { - let mut req = self.azblob_put_block_request(path, block_id, size, args, body)?; - - self.sign(&mut req).await?; + let req = self.azblob_put_block_request(path, block_id, size, args, body)?; + let req = self.sign(req).await?; self.send(req).await } @@ -511,10 +503,8 @@ impl AzblobCore { block_ids: Vec, args: &OpWrite, ) -> Result> { - let mut req = self.azblob_complete_put_block_list_request(path, block_ids, args)?; - - self.sign(&mut req).await?; - + let req = self.azblob_complete_put_block_list_request(path, block_ids, args)?; + let req = self.sign(req).await?; self.send(req).await } @@ -545,9 +535,8 @@ impl AzblobCore { path: &str, args: &OpStat, ) -> Result> { - let mut req = self.azblob_head_blob_request(path, args)?; - - self.sign(&mut req).await?; + let req = self.azblob_head_blob_request(path, args)?; + let req = self.sign(req).await?; self.send(req).await } @@ -560,9 +549,8 @@ impl AzblobCore { } pub async fn azblob_delete_blob(&self, path: &str) -> Result> { - let mut req = self.azblob_delete_blob_request(path)?; - - self.sign(&mut req).await?; + let req = self.azblob_delete_blob_request(path)?; + let req = self.sign(req).await?; self.send(req).await } @@ -584,12 +572,12 @@ impl AzblobCore { req = req.header(IF_NONE_MATCH, "*"); } - let mut req = req + let req = req .extension(Operation::Copy) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -618,12 +606,12 @@ impl AzblobCore { url = url.push("marker", next_marker); } - let mut req = Request::get(url.finish()) + let req = Request::get(url.finish()) .extension(Operation::List) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -636,8 +624,8 @@ impl AzblobCore { let mut multipart = Multipart::new(); for (idx, path) in paths.iter().enumerate() { - let mut req = self.azblob_delete_blob_request(path)?; - self.batch_sign(&mut req).await?; + let req = self.azblob_delete_blob_request(path)?; + let req = self.batch_sign(req).await?; multipart = multipart.part( MixedPart::from_request(req).part_header("content-id".parse().unwrap(), idx.into()), @@ -645,9 +633,8 @@ impl AzblobCore { } let req = Request::post(url); - let mut req = multipart.apply(req)?; - - self.sign(&mut req).await?; + let req = multipart.apply(req)?; + let req = self.sign(req).await?; self.send(req).await } } diff --git a/core/services/azdls/Cargo.toml b/core/services/azdls/Cargo.toml index 820594838ab8..cd7af72a2b2b 100644 --- a/core/services/azdls/Cargo.toml +++ b/core/services/azdls/Cargo.toml @@ -37,10 +37,10 @@ log = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" } quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] } -reqsign = { workspace = true, features = [ - "reqwest_request", - "services-azblob", -] } +reqsign-azure-storage = { version = "2.0.2", default-features = false } +reqsign-core = { version = "2.0.1", default-features = false } +reqsign-file-read-tokio = { version = "2.0.1", default-features = false } +reqsign-http-send-reqwest = { version = "2.0.1", default-features = false } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/core/services/azdls/src/backend.rs b/core/services/azdls/src/backend.rs index 7f6120a782e9..e162cd437e1f 100644 --- a/core/services/azdls/src/backend.rs +++ b/core/services/azdls/src/backend.rs @@ -21,9 +21,16 @@ use std::sync::Arc; use http::Response; use http::StatusCode; use log::debug; -use reqsign::AzureStorageConfig; -use reqsign::AzureStorageLoader; -use reqsign::AzureStorageSigner; +use reqsign_azure_storage::DefaultCredentialProvider; +use reqsign_azure_storage::RequestSigner; +use reqsign_azure_storage::StaticCredentialProvider; +use reqsign_core::Context; +use reqsign_core::Env as _; +use reqsign_core::OsEnv; +use reqsign_core::Signer; +use reqsign_core::StaticEnv; +use reqsign_file_read_tokio::TokioFileRead; +use reqsign_http_send_reqwest::ReqwestHttpSend; use super::AZDLS_SCHEME; use super::config::AzdlsConfig; @@ -37,11 +44,12 @@ use super::writer::AzdlsWriters; use opendal_core::raw::*; use opendal_core::*; use opendal_service_azure_common::{ - AzureStorageService, azure_account_name_from_endpoint, azure_config_from_connection_string, + AzureStorageConfig as AzureConnectionConfig, AzureStorageService, + azure_account_name_from_endpoint, azure_config_from_connection_string, }; -impl From for AzdlsConfig { - fn from(config: AzureStorageConfig) -> Self { +impl From for AzdlsConfig { + fn from(config: AzureConnectionConfig) -> Self { AzdlsConfig { endpoint: config.endpoint, account_name: config.account_name, @@ -252,23 +260,62 @@ impl Builder for AzdlsBuilder { }?; debug!("backend use endpoint {}", &endpoint); - let config_loader = AzureStorageConfig { - account_name: self - .config - .account_name - .clone() - .or_else(|| azure_account_name_from_endpoint(endpoint.as_str())), - account_key: self.config.account_key.clone(), - sas_token: self.config.sas_token, - client_id: self.config.client_id.clone(), - client_secret: self.config.client_secret.clone(), - tenant_id: self.config.tenant_id.clone(), - authority_host: self.config.authority_host.clone(), - ..Default::default() - }; + let account_name = self + .config + .account_name + .clone() + .or_else(|| azure_account_name_from_endpoint(endpoint.as_str())); + + let mut envs = std::collections::HashMap::new(); + + if let Some(v) = &account_name { + envs.insert("AZBLOB_ACCOUNT_NAME".to_string(), v.clone()); + envs.insert("AZURE_STORAGE_ACCOUNT_NAME".to_string(), v.clone()); + } + if let Some(v) = &self.config.account_key { + envs.insert("AZBLOB_ACCOUNT_KEY".to_string(), v.clone()); + envs.insert("AZURE_STORAGE_ACCOUNT_KEY".to_string(), v.clone()); + } + if let Some(v) = &self.config.sas_token { + envs.insert("AZURE_STORAGE_SAS_TOKEN".to_string(), v.clone()); + } + if let Some(v) = &self.config.client_id { + envs.insert("AZURE_CLIENT_ID".to_string(), v.clone()); + } + if let Some(v) = &self.config.client_secret { + envs.insert("AZURE_CLIENT_SECRET".to_string(), v.clone()); + } + if let Some(v) = &self.config.tenant_id { + envs.insert("AZURE_TENANT_ID".to_string(), v.clone()); + } + if let Some(v) = &self.config.authority_host { + envs.insert("AZURE_AUTHORITY_HOST".to_string(), v.clone()); + } + + let os_env = OsEnv; + let ctx = Context::new() + .with_file_read(TokioFileRead) + .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone())) + .with_env(StaticEnv { + home_dir: os_env.home_dir(), + envs, + }); + + let mut credential = DefaultCredentialProvider::new(); + + if let (Some(account_name), Some(account_key)) = + (account_name.as_deref(), self.config.account_key.as_deref()) + { + credential = credential.push_front(StaticCredentialProvider::new_shared_key( + account_name, + account_key, + )); + } + if let Some(sas_token) = self.config.sas_token.as_deref() { + credential = credential.push_front(StaticCredentialProvider::new_sas_token(sas_token)); + } - let cred_loader = AzureStorageLoader::new(config_loader); - let signer = AzureStorageSigner::new(); + let signer = Signer::new(ctx, credential, RequestSigner::new()); Ok(AzdlsBackend { core: Arc::new(AzdlsCore { info: { @@ -307,7 +354,6 @@ impl Builder for AzdlsBuilder { root, endpoint, enable_hns: self.config.enable_hns, - loader: cred_loader, signer, }), }) diff --git a/core/services/azdls/src/core.rs b/core/services/azdls/src/core.rs index cc8d27622b0a..1d6642e2b2b1 100644 --- a/core/services/azdls/src/core.rs +++ b/core/services/azdls/src/core.rs @@ -27,9 +27,8 @@ use http::header::CONTENT_DISPOSITION; use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; use http::header::IF_NONE_MATCH; -use reqsign::AzureStorageCredential; -use reqsign::AzureStorageLoader; -use reqsign::AzureStorageSigner; +use reqsign_azure_storage::Credential; +use reqsign_core::Signer; use super::error::parse_error; use opendal_core::raw::*; @@ -49,8 +48,7 @@ pub struct AzdlsCore { pub endpoint: String, pub enable_hns: bool, - pub loader: AzureStorageLoader, - pub signer: AzureStorageSigner, + pub signer: Signer, } impl Debug for AzdlsCore { @@ -65,27 +63,11 @@ impl Debug for AzdlsCore { } impl AzdlsCore { - async fn load_credential(&self) -> Result { - let cred = self - .loader - .load() - .await - .map_err(new_request_credential_error)?; - - if let Some(cred) = cred { - Ok(cred) - } else { - Err(Error::new( - ErrorKind::ConfigInvalid, - "no valid credential found", - )) - } - } + pub async fn sign(&self, req: Request) -> Result> { + let (mut parts, body) = req.into_parts(); - pub async fn sign(&self, req: &mut Request) -> Result<()> { - let cred = self.load_credential().await?; // Insert x-ms-version header for normal requests. - req.headers_mut().insert( + parts.headers.insert( HeaderName::from_static(X_MS_VERSION), // 2022-11-02 is the version supported by Azurite V3 and // used by Azure Portal, We use this version to make @@ -94,7 +76,13 @@ impl AzdlsCore { // In the future, we could allow users to configure this value. HeaderValue::from_static("2022-11-02"), ); - self.signer.sign(req, &cred).map_err(new_request_sign_error) + + self.signer + .sign(&mut parts, None) + .await + .map_err(|e| new_request_sign_error(e.into()))?; + + Ok(Request::from_parts(parts, body)) } #[inline] @@ -120,12 +108,12 @@ impl AzdlsCore { req = req.header(http::header::RANGE, range.to_header()); } - let mut req = req + let req = req .extension(Operation::Read) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.info.http_client().fetch(req).await } @@ -176,12 +164,12 @@ impl AzdlsCore { Operation::Write }; - let mut req = req + let req = req .extension(operation) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -198,14 +186,14 @@ impl AzdlsCore { let source_path = format!("/{}/{}", self.filesystem, percent_encode_path(&source)); - let mut req = Request::put(&url) + let req = Request::put(&url) .header(X_MS_RENAME_SOURCE, source_path) .header(CONTENT_LENGTH, 0) .extension(Operation::Rename) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -242,12 +230,12 @@ impl AzdlsCore { req = req.header(CONTENT_LENGTH, size) } - let mut req = req + let req = req .extension(Operation::Write) .body(body) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -272,13 +260,13 @@ impl AzdlsCore { url.push_str("&close=true"); } - let mut req = Request::patch(&url) + let req = Request::patch(&url) .header(CONTENT_LENGTH, 0) .extension(Operation::Write) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -296,12 +284,12 @@ impl AzdlsCore { let req = Request::head(&url); - let mut req = req + let req = req .extension(Operation::Stat) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -360,12 +348,12 @@ impl AzdlsCore { percent_encode_path(&p) ); - let mut req = Request::delete(&url) + let req = Request::delete(&url) .extension(Operation::Delete) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } @@ -394,12 +382,12 @@ impl AzdlsCore { url = url.push("continuation", &percent_encode_path(&continuation)); } - let mut req = Request::delete(url.finish()) + let req = Request::delete(url.finish()) .extension(Operation::Delete) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; let resp = self.send(req).await?; let status = resp.status(); @@ -446,12 +434,12 @@ impl AzdlsCore { url = url.push("continuation", &percent_encode_path(continuation)); } - let mut req = Request::get(url.finish()) + let req = Request::get(url.finish()) .extension(Operation::List) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = self.sign(req).await?; self.send(req).await } diff --git a/core/services/azfile/Cargo.toml b/core/services/azfile/Cargo.toml index 4741324dcdba..13e1f3be3f44 100644 --- a/core/services/azfile/Cargo.toml +++ b/core/services/azfile/Cargo.toml @@ -37,10 +37,10 @@ log = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" } quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] } -reqsign = { workspace = true, features = [ - "reqwest_request", - "services-azblob", -] } +reqsign-azure-storage = { version = "2.0.2", default-features = false } +reqsign-core = { version = "2.0.1", default-features = false } +reqsign-file-read-tokio = { version = "2.0.1", default-features = false } +reqsign-http-send-reqwest = { version = "2.0.1", default-features = false } serde = { workspace = true, features = ["derive"] } [dev-dependencies] diff --git a/core/services/azfile/src/backend.rs b/core/services/azfile/src/backend.rs index 47a0394b2bf2..6c8d7b492ba9 100644 --- a/core/services/azfile/src/backend.rs +++ b/core/services/azfile/src/backend.rs @@ -21,9 +21,16 @@ use std::sync::Arc; use http::Response; use http::StatusCode; use log::debug; -use reqsign::AzureStorageConfig; -use reqsign::AzureStorageLoader; -use reqsign::AzureStorageSigner; +use reqsign_azure_storage::DefaultCredentialProvider; +use reqsign_azure_storage::RequestSigner; +use reqsign_azure_storage::StaticCredentialProvider; +use reqsign_core::Context; +use reqsign_core::Env as _; +use reqsign_core::OsEnv; +use reqsign_core::Signer; +use reqsign_core::StaticEnv; +use reqsign_file_read_tokio::TokioFileRead; +use reqsign_http_send_reqwest::ReqwestHttpSend; use super::AZFILE_SCHEME; use super::config::AzfileConfig; @@ -37,11 +44,12 @@ use super::writer::AzfileWriters; use opendal_core::raw::*; use opendal_core::*; use opendal_service_azure_common::{ - AzureStorageService, azure_account_name_from_endpoint, azure_config_from_connection_string, + AzureStorageConfig as AzureConnectionConfig, AzureStorageService, + azure_account_name_from_endpoint, azure_config_from_connection_string, }; -impl From for AzfileConfig { - fn from(config: AzureStorageConfig) -> Self { +impl From for AzfileConfig { + fn from(config: AzureConnectionConfig) -> Self { AzfileConfig { account_name: config.account_name, account_key: config.account_key, @@ -185,15 +193,42 @@ impl Builder for AzfileBuilder { ), }?; - let config_loader = AzureStorageConfig { - account_name: Some(account_name), - account_key: self.config.account_key.clone(), - sas_token: self.config.sas_token.clone(), - ..Default::default() - }; + let mut envs = std::collections::HashMap::new(); + envs.insert("AZBLOB_ACCOUNT_NAME".to_string(), account_name.clone()); + envs.insert( + "AZURE_STORAGE_ACCOUNT_NAME".to_string(), + account_name.clone(), + ); + + if let Some(v) = &self.config.account_key { + envs.insert("AZBLOB_ACCOUNT_KEY".to_string(), v.clone()); + envs.insert("AZURE_STORAGE_ACCOUNT_KEY".to_string(), v.clone()); + } + if let Some(v) = &self.config.sas_token { + envs.insert("AZURE_STORAGE_SAS_TOKEN".to_string(), v.clone()); + } + + let os_env = OsEnv; + let ctx = Context::new() + .with_file_read(TokioFileRead) + .with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone())) + .with_env(StaticEnv { + home_dir: os_env.home_dir(), + envs, + }); + + let mut credential = DefaultCredentialProvider::new(); + if let Some(account_key) = self.config.account_key.as_deref() { + credential = credential.push_front(StaticCredentialProvider::new_shared_key( + &account_name, + account_key, + )); + } + if let Some(sas_token) = self.config.sas_token.as_deref() { + credential = credential.push_front(StaticCredentialProvider::new_sas_token(sas_token)); + } - let cred_loader = AzureStorageLoader::new(config_loader); - let signer = AzureStorageSigner::new(); + let signer = Signer::new(ctx, credential, RequestSigner::new()); Ok(AzfileBackend { core: Arc::new(AzfileCore { info: { @@ -223,7 +258,6 @@ impl Builder for AzfileBuilder { }, root, endpoint, - loader: cred_loader, signer, share_name: self.config.share_name.clone(), }), diff --git a/core/services/azfile/src/core.rs b/core/services/azfile/src/core.rs index da4257089ced..22342ed19cc6 100644 --- a/core/services/azfile/src/core.rs +++ b/core/services/azfile/src/core.rs @@ -28,9 +28,8 @@ use http::header::CONTENT_DISPOSITION; use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; use http::header::RANGE; -use reqsign::AzureStorageCredential; -use reqsign::AzureStorageLoader; -use reqsign::AzureStorageSigner; +use reqsign_azure_storage::Credential; +use reqsign_core::Signer; use super::error::parse_error; use opendal_core::raw::*; @@ -49,8 +48,7 @@ pub struct AzfileCore { pub root: String, pub endpoint: String, pub share_name: String, - pub loader: AzureStorageLoader, - pub signer: AzureStorageSigner, + pub signer: Signer, } impl Debug for AzfileCore { @@ -64,32 +62,22 @@ impl Debug for AzfileCore { } impl AzfileCore { - async fn load_credential(&self) -> Result { - let cred = self - .loader - .load() - .await - .map_err(new_request_credential_error)?; - - if let Some(cred) = cred { - Ok(cred) - } else { - Err(Error::new( - ErrorKind::ConfigInvalid, - "no valid credential found", - )) - } - } + pub async fn sign(&self, req: Request) -> Result> { + let (mut parts, body) = req.into_parts(); - pub async fn sign(&self, req: &mut Request) -> Result<()> { - let cred = self.load_credential().await?; // Insert x-ms-version header for normal requests. - req.headers_mut().insert( + parts.headers.insert( HeaderName::from_static(X_MS_VERSION), // consistent with azdls and azblob HeaderValue::from_static("2022-11-02"), ); - self.signer.sign(req, &cred).map_err(new_request_sign_error) + + self.signer + .sign(&mut parts, None) + .await + .map_err(|e| new_request_sign_error(e.into()))?; + + Ok(Request::from_parts(parts, body)) } #[inline] @@ -115,8 +103,8 @@ impl AzfileCore { let req = req.extension(Operation::Read); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.info.http_client().fetch(req).await } @@ -164,8 +152,8 @@ impl AzfileCore { let req = req.extension(Operation::Write); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } @@ -200,8 +188,8 @@ impl AzfileCore { let req = req.extension(Operation::Write); - let mut req = req.body(body).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(body).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } @@ -218,8 +206,8 @@ impl AzfileCore { let req = req.extension(Operation::Stat); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } @@ -237,8 +225,8 @@ impl AzfileCore { let req = req.extension(Operation::Stat); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } @@ -289,8 +277,8 @@ impl AzfileCore { let req = req.extension(Operation::Rename); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } @@ -312,8 +300,8 @@ impl AzfileCore { let req = req.extension(Operation::CreateDir); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } @@ -333,8 +321,8 @@ impl AzfileCore { let req = req.extension(Operation::Delete); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } @@ -354,8 +342,8 @@ impl AzfileCore { let req = req.extension(Operation::Delete); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } @@ -393,8 +381,8 @@ impl AzfileCore { let req = req.extension(Operation::List); - let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?; - self.sign(&mut req).await?; + let req = req.body(Buffer::new()).map_err(new_request_build_error)?; + let req = self.sign(req).await?; self.send(req).await } diff --git a/core/services/azure-common/Cargo.toml b/core/services/azure-common/Cargo.toml index 0f989c108545..68a668ba3007 100644 --- a/core/services/azure-common/Cargo.toml +++ b/core/services/azure-common/Cargo.toml @@ -33,4 +33,3 @@ all-features = true [dependencies] http = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } -reqsign = { workspace = true } diff --git a/core/services/azure-common/src/lib.rs b/core/services/azure-common/src/lib.rs index e380539dffc7..55e9cc90bdf2 100644 --- a/core/services/azure-common/src/lib.rs +++ b/core/services/azure-common/src/lib.rs @@ -28,7 +28,32 @@ use std::collections::HashMap; use http::Uri; use http::response::Parts; use opendal_core::{Error, ErrorKind, Result}; -use reqsign::{AzureStorageConfig, AzureStorageCredential}; + +/// Configuration parsed from Azure storage connection string. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct AzureStorageConfig { + /// Storage account name. + pub account_name: Option, + /// Storage account shared key. + pub account_key: Option, + /// Shared access signature token. + pub sas_token: Option, + /// Service endpoint. + pub endpoint: Option, + /// OAuth client id. + pub client_id: Option, + /// OAuth client secret. + pub client_secret: Option, + /// OAuth tenant id. + pub tenant_id: Option, + /// OAuth authority host. + pub authority_host: Option, +} + +enum AzureStorageCredential { + SharedAccessSignature(String), + SharedKey(String, String), +} /// Parses an [Azure connection string][1] into a configuration object. /// @@ -229,9 +254,6 @@ fn set_credentials(config: &mut AzureStorageConfig, creds: AzureStorageCredentia config.account_name = Some(account_name); config.account_key = Some(account_key); } - AzureStorageCredential::BearerToken(_, _) => { - // Bearer tokens shouldn't be passed via connection strings. - } } } @@ -327,12 +349,12 @@ fn censor_sas_uri(uri: &Uri) -> String { #[cfg(test)] mod tests { use http::Uri; - use reqsign::AzureStorageConfig; use super::censor_sas_uri; use super::{ - AzureStorageService, azure_account_name_from_endpoint, azure_config_from_connection_string, + AzureStorageConfig, AzureStorageService, azure_account_name_from_endpoint, + azure_config_from_connection_string, }; #[test] diff --git a/core/services/ghac/Cargo.toml b/core/services/ghac/Cargo.toml index 40a6417aefb2..a9f003abb8ab 100644 --- a/core/services/ghac/Cargo.toml +++ b/core/services/ghac/Cargo.toml @@ -38,7 +38,8 @@ log = { workspace = true } opendal-core = { path = "../../core", version = "0.55.0", default-features = false } opendal-service-azblob = { path = "../azblob", version = "0.55.0", default-features = false } prost = { version = "0.13", default-features = false } -reqsign = { workspace = true } +reqsign-azure-storage = { version = "2.0.2", default-features = false } +reqsign-core = { version = "2.0.1", default-features = false } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha2 = { workspace = true } diff --git a/core/services/ghac/src/writer.rs b/core/services/ghac/src/writer.rs index bf9a2e515620..d54eb914e0ef 100644 --- a/core/services/ghac/src/writer.rs +++ b/core/services/ghac/src/writer.rs @@ -25,6 +25,10 @@ use opendal_core::raw::*; use opendal_core::*; use opendal_service_azblob::core::AzblobCore; use opendal_service_azblob::writer::AzblobWriter; +use reqsign_azure_storage::RequestSigner; +use reqsign_azure_storage::StaticCredentialProvider; +use reqsign_core::Context; +use reqsign_core::Signer; pub struct GhacWriter(pub TwoWays); @@ -66,6 +70,11 @@ impl GhacWriter { ) .with_context("url", &url)); }; + let signer = Signer::new( + Context::new(), + StaticCredentialProvider::new_sas_token(query), + RequestSigner::new(), + ); let azure_core = Arc::new(AzblobCore { info: { let am = AccessorInfo::default(); @@ -113,14 +122,7 @@ impl GhacWriter { encryption_key: None, encryption_key_sha256: None, encryption_algorithm: None, - loader: { - let config = reqsign::AzureStorageConfig { - sas_token: Some(query.to_string()), - ..Default::default() - }; - reqsign::AzureStorageLoader::new(config) - }, - signer: { reqsign::AzureStorageSigner::new() }, + signer, }); let w = AzblobWriter::new(azure_core, OpWrite::default(), path.to_string()); let writer = oio::BlockWriter::new(core.info.clone(), w, 4);