Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions core/services/cos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ http = { workspace = true }
log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqsign = { workspace = true, features = [
"services-tencent",
"reqwest_request",
] }
reqsign-core = { version = "2.0.1", default-features = false }
reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
reqsign-tencent-cos = { version = "2.0.2", default-features = false }
reqwest = { version = "0.12.24", default-features = false, features = [
"stream",
] }
Expand Down
63 changes: 45 additions & 18 deletions core/services/cos/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ use http::Response;
use http::StatusCode;
use http::Uri;
use log::debug;
use reqsign::TencentCosConfig;
use reqsign::TencentCosCredentialLoader;
use reqsign::TencentCosSigner;
use reqsign_core::Context;
use reqsign_core::Env as _;
use reqsign_core::OsEnv;
use reqsign_core::Signer;
use reqsign_file_read_tokio::TokioFileRead;
use reqsign_http_send_reqwest::ReqwestHttpSend;
use reqsign_tencent_cos::DefaultCredentialProvider;
use reqsign_tencent_cos::RequestSigner;
use reqsign_tencent_cos::StaticCredentialProvider;

use super::COS_SCHEME;
use super::config::CosConfig;
Expand Down Expand Up @@ -175,21 +181,43 @@ impl Builder for CosBuilder {
let endpoint = uri.host().unwrap().replace(&format!("//{bucket}."), "//");
debug!("backend use endpoint {}", &endpoint);

let mut cfg = TencentCosConfig::default();
if !self.config.disable_config_load {
cfg = cfg.from_env();
}
let os_env = OsEnv;
let envs = os_env.vars();
let ctx = Context::new()
.with_file_read(TokioFileRead)
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
.with_env(os_env);

let mut credential = if self.config.disable_config_load {
DefaultCredentialProvider::builder()
.disable_env(true)
.disable_assume_role(true)
.build()
} else {
DefaultCredentialProvider::new()
};

if let Some(v) = self.config.secret_id {
cfg.secret_id = Some(v);
if let (Some(secret_id), Some(secret_key)) = (
self.config.secret_id.as_deref(),
self.config.secret_key.as_deref(),
) {
let security_token = envs
.get("TENCENTCLOUD_TOKEN")
.or_else(|| envs.get("TENCENTCLOUD_SECURITY_TOKEN"))
.or_else(|| envs.get("QCLOUD_SECRET_TOKEN"));

let static_provider = if self.config.disable_config_load {
StaticCredentialProvider::new(secret_id, secret_key)
} else if let Some(token) = security_token {
StaticCredentialProvider::with_security_token(secret_id, secret_key, token)
} else {
StaticCredentialProvider::new(secret_id, secret_key)
};

credential = credential.push_front(static_provider);
}
if let Some(v) = self.config.secret_key {
cfg.secret_key = Some(v);
}

let cred_loader = TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);

let signer = TencentCosSigner::new();
let signer = Signer::new(ctx, credential, RequestSigner::new());

Ok(CosBackend {
core: Arc::new(CosCore {
Expand Down Expand Up @@ -260,7 +288,6 @@ impl Builder for CosBuilder {
root,
endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
signer,
loader: cred_loader,
}),
})
}
Expand Down Expand Up @@ -399,8 +426,8 @@ impl Access for CosBackend {
"operation is not supported",
)),
};
let mut req = req?;
self.core.sign_query(&mut req, args.expire()).await?;
let req = req?;
let req = self.core.sign_query(req, args.expire()).await?;

// We don't need this request anymore, consume it directly.
let (parts, _) = req.into_parts();
Expand Down
100 changes: 37 additions & 63 deletions core/services/cos/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@ use http::header::IF_MATCH;
use http::header::IF_MODIFIED_SINCE;
use http::header::IF_NONE_MATCH;
use http::header::IF_UNMODIFIED_SINCE;
use reqsign::TencentCosCredential;
use reqsign::TencentCosCredentialLoader;
use reqsign::TencentCosSigner;
use reqsign_core::Signer;
use reqsign_tencent_cos::Credential;
use serde::Deserialize;
use serde::Serialize;

use opendal_core::Buffer;
use opendal_core::Error;
use opendal_core::ErrorKind;
use opendal_core::Result;
use opendal_core::raw::*;

Expand All @@ -53,8 +50,7 @@ pub struct CosCore {
pub root: String,
pub endpoint: String,

pub signer: TencentCosSigner,
pub loader: TencentCosCredentialLoader,
pub signer: Signer<Credential>,
}

impl Debug for CosCore {
Expand All @@ -68,43 +64,26 @@ impl Debug for CosCore {
}

impl CosCore {
async fn load_credential(&self) -> Result<Option<TencentCosCredential>> {
let cred = self
.loader
.load()
.await
.map_err(new_request_credential_error)?;

if let Some(cred) = cred {
return Ok(Some(cred));
}

Err(Error::new(
ErrorKind::PermissionDenied,
"no valid credential found and anonymous access is not allowed",
))
}
pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
let (mut parts, body) = req.into_parts();

pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
let cred = if let Some(cred) = self.load_credential().await? {
cred
} else {
return Ok(());
};
self.signer
.sign(&mut parts, None)
.await
.map_err(|e| new_request_sign_error(e.into()))?;

self.signer.sign(req, &cred).map_err(new_request_sign_error)
Ok(Request::from_parts(parts, body))
}

pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
let cred = if let Some(cred) = self.load_credential().await? {
cred
} else {
return Ok(());
};
pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> Result<Request<T>> {
let (mut parts, body) = req.into_parts();

self.signer
.sign_query(req, duration, &cred)
.map_err(new_request_sign_error)
.sign(&mut parts, Some(duration))
.await
.map_err(|e| new_request_sign_error(e.into()))?;

Ok(Request::from_parts(parts, body))
}

#[inline]
Expand All @@ -120,9 +99,8 @@ impl CosCore {
range: BytesRange,
args: &OpRead,
) -> Result<Response<HttpBody>> {
let mut req = self.cos_get_object_request(path, range, args)?;

self.sign(&mut req).await?;
let req = self.cos_get_object_request(path, range, args)?;
let req = self.sign(req).await?;

self.info.http_client().fetch(req).await
}
Expand Down Expand Up @@ -231,9 +209,8 @@ impl CosCore {
}

pub async fn cos_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
let mut req = self.cos_head_object_request(path, args)?;

self.sign(&mut req).await?;
let req = self.cos_head_object_request(path, args)?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -293,9 +270,8 @@ impl CosCore {

let req = req.extension(Operation::Delete);

let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -345,13 +321,13 @@ impl CosCore {
let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));

let mut req = Request::put(&url)
let req = Request::put(&url)
.extension(Operation::Copy)
.header("x-cos-copy-source", &source)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -380,12 +356,12 @@ impl CosCore {
url = url.push("marker", next_marker);
}

let mut req = Request::get(url.finish())
let req = Request::get(url.finish())
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -422,9 +398,8 @@ impl CosCore {

let req = req.extension(Operation::Write);

let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -453,9 +428,8 @@ impl CosCore {
let req = req.extension(Operation::Write);

// Set body
let mut req = req.body(body).map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = req.body(body).map_err(new_request_build_error)?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -486,11 +460,11 @@ impl CosCore {

let req = req.extension(Operation::Write);

let mut req = req
let req = req
.body(Buffer::from(Bytes::from(content)))
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand All @@ -510,11 +484,11 @@ impl CosCore {
percent_encode_path(upload_id)
);

let mut req = Request::delete(&url)
let req = Request::delete(&url)
.extension(Operation::Delete)
.body(Buffer::new())
.map_err(new_request_build_error)?;
self.sign(&mut req).await?;
let req = self.sign(req).await?;
self.send(req).await
}

Expand Down Expand Up @@ -547,12 +521,12 @@ impl CosCore {
url = url.push("version-id-marker", &percent_encode_path(version_id_marker));
}

let mut req = Request::get(url.finish())
let req = Request::get(url.finish())
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down
8 changes: 4 additions & 4 deletions core/services/cos/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ impl CosWriter {

impl oio::MultipartWrite for CosWriter {
async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
let mut req = self
let req = self
.core
.cos_put_object_request(&self.path, Some(size), &self.op, body)?;

self.core.sign(&mut req).await?;
let req = self.core.sign(req).await?;

let resp = self.core.send(req).await?;

Expand Down Expand Up @@ -219,11 +219,11 @@ impl oio::AppendWrite for CosWriter {
}

async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
let mut req = self
let req = self
.core
.cos_append_object_request(&self.path, offset, size, &self.op, body)?;

self.core.sign(&mut req).await?;
let req = self.core.sign(req).await?;

let resp = self.core.send(req).await?;

Expand Down
Loading