Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions core/services/azblob/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqsign = { workspace = true, features = [
"reqwest_request",
"services-azblob",
] }
reqsign-azure-storage = { version = "2.0.2", default-features = false }
reqsign-core = { version = "2.0.1", default-features = false }
reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
serde = { workspace = true, features = ["derive"] }
sha2 = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
Expand Down
84 changes: 58 additions & 26 deletions core/services/azblob/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ use base64::prelude::BASE64_STANDARD;
use http::Response;
use http::StatusCode;
use log::debug;
use reqsign::AzureStorageConfig;
use reqsign::AzureStorageLoader;
use reqsign::AzureStorageSigner;
use reqsign_azure_storage::DefaultCredentialProvider;
use reqsign_azure_storage::RequestSigner;
use reqsign_azure_storage::StaticCredentialProvider;
use reqsign_core::Context;
use reqsign_core::Env as _;
use reqsign_core::OsEnv;
use reqsign_core::Signer;
use reqsign_core::StaticEnv;
use reqsign_file_read_tokio::TokioFileRead;
use reqsign_http_send_reqwest::ReqwestHttpSend;
use sha2::Digest;
use sha2::Sha256;

Expand All @@ -42,13 +49,14 @@ use super::writer::AzblobWriters;
use opendal_core::raw::*;
use opendal_core::*;
use opendal_service_azure_common::{
AzureStorageService, azure_account_name_from_endpoint, azure_config_from_connection_string,
AzureStorageConfig as AzureConnectionConfig, AzureStorageService,
azure_account_name_from_endpoint, azure_config_from_connection_string,
};

const AZBLOB_BATCH_LIMIT: usize = 256;

impl From<AzureStorageConfig> for AzblobConfig {
fn from(value: AzureStorageConfig) -> Self {
impl From<AzureConnectionConfig> for AzblobConfig {
fn from(value: AzureConnectionConfig) -> Self {
Self {
endpoint: value.endpoint,
account_name: value.account_name,
Expand Down Expand Up @@ -297,23 +305,23 @@ impl Builder for AzblobBuilder {
}?;
debug!("backend use endpoint {}", &container);

#[cfg(target_arch = "wasm32")]
let mut config_loader = AzureStorageConfig::default();
#[cfg(not(target_arch = "wasm32"))]
let mut config_loader = AzureStorageConfig::default().from_env();

if let Some(v) = self
let account_name = self
.config
.account_name
.clone()
.or_else(|| azure_account_name_from_endpoint(endpoint.as_str()))
{
config_loader.account_name = Some(v);
.or_else(|| azure_account_name_from_endpoint(endpoint.as_str()));

let os_env = OsEnv;
let mut envs = os_env.vars();

if let Some(v) = &account_name {
envs.insert("AZBLOB_ACCOUNT_NAME".to_string(), v.clone());
envs.insert("AZURE_STORAGE_ACCOUNT_NAME".to_string(), v.clone());
}

if let Some(v) = self.config.account_key.clone() {
if let Some(v) = &self.config.account_key {
// Validate that account_key can be decoded as base64
if let Err(e) = BASE64_STANDARD.decode(&v) {
if let Err(e) = BASE64_STANDARD.decode(v) {
return Err(Error::new(
ErrorKind::ConfigInvalid,
format!("invalid account_key: cannot decode as base64: {e}"),
Expand All @@ -322,11 +330,12 @@ impl Builder for AzblobBuilder {
.with_context("service", AZBLOB_SCHEME)
.with_context("key", "account_key"));
}
config_loader.account_key = Some(v);
envs.insert("AZBLOB_ACCOUNT_KEY".to_string(), v.clone());
envs.insert("AZURE_STORAGE_ACCOUNT_KEY".to_string(), v.clone());
}

if let Some(v) = self.config.sas_token.clone() {
config_loader.sas_token = Some(v);
if let Some(v) = &self.config.sas_token {
envs.insert("AZURE_STORAGE_SAS_TOKEN".to_string(), v.clone());
}

let encryption_key =
Expand Down Expand Up @@ -360,9 +369,34 @@ impl Builder for AzblobBuilder {
}
};

let cred_loader = AzureStorageLoader::new(config_loader);
let ctx = Context::new()
.with_file_read(TokioFileRead)
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
.with_env(StaticEnv {
home_dir: os_env.home_dir(),
envs,
});

let mut credential = DefaultCredentialProvider::new();

if let (Some(account_name), Some(account_key)) =
(account_name.as_deref(), self.config.account_key.as_deref())
{
credential = credential.push_front(StaticCredentialProvider::new_shared_key(
account_name,
account_key,
));
}

if let Some(sas_token) = self.config.sas_token.as_deref() {
credential = credential.push_front(StaticCredentialProvider::new_sas_token(sas_token));
}

let signer = AzureStorageSigner::new();
let signer = Signer::new(
ctx,
credential,
RequestSigner::new().with_service_sas_permissions("racwd"),
);

Ok(AzblobBackend {
core: Arc::new(AzblobCore {
Expand Down Expand Up @@ -422,7 +456,6 @@ impl Builder for AzblobBuilder {
encryption_algorithm,
container: self.config.container.clone(),

loader: cred_loader,
signer,
}),
})
Expand Down Expand Up @@ -551,9 +584,8 @@ impl Access for AzblobBackend {
)),
};

let mut req = req?;

self.core.sign_query(&mut req).await?;
let req = req?;
let req = self.core.sign_query(req).await?;

let (parts, _) = req.into_parts();

Expand Down
Loading
Loading