Skip to content

Commit 0064041

Browse files
authored
Merge branch 'main' into refactor-mysql
2 parents 2284f83 + fa58a57 commit 0064041

File tree

11 files changed

+312
-123
lines changed

11 files changed

+312
-123
lines changed

core/Cargo.lock

Lines changed: 10 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ required-features = ["tests"]
239239
[dependencies]
240240
# Required dependencies
241241
anyhow = { version = "1.0.100", features = ["std"] }
242-
backon = { version = "1.5", features = ["tokio-sleep"] }
242+
backon = { version = "1.6", features = ["tokio-sleep"] }
243243
base64 = "0.22"
244244
bytes = "1.6"
245245
futures = { version = "0.3", default-features = false, features = [
@@ -254,7 +254,7 @@ md-5 = "0.10"
254254
percent-encoding = "2"
255255
url = "2.5"
256256
quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] }
257-
reqwest = { version = "0.12.23", features = [
257+
reqwest = { version = "0.12.24", features = [
258258
"stream",
259259
], default-features = false }
260260
serde = { version = "1", features = ["derive"] }
@@ -406,7 +406,7 @@ tracing = { version = "0.1", optional = true }
406406
probe = { version = "0.5.1", optional = true }
407407

408408
[target.'cfg(target_arch = "wasm32")'.dependencies]
409-
backon = { version = "1.2", features = ["gloo-timers-sleep"] }
409+
backon = { version = "1.6", features = ["gloo-timers-sleep"] }
410410
getrandom = { version = "0.2", features = ["js"] }
411411
tokio = { version = "1.46", features = ["time"] }
412412
uuid = { version = "1.18", features = ["serde", "v4", "js"] }

core/benches/vs_fs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ version = "0.0.0"
2626

2727
[dependencies]
2828
criterion = { version = "0.7", features = ["async", "async_tokio"] }
29-
opendal = { path = "../..", features = ["tests"] }
29+
opendal = { path = "../..", features = ["blocking", "tests"] }
3030
rand = "0.8"
3131
tokio = { version = "1", features = ["full"] }
3232
uuid = { version = "1", features = ["v4"] }

core/src/raw/oio/list/page_list.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,7 @@ use crate::*;
3333
/// - Expose `PageLister` as `Accessor::Lister`
3434
pub trait PageList: Send + Sync + Unpin + 'static {
3535
/// next_page is used to fetch next page of entries from underlying storage.
36-
#[cfg(not(target_arch = "wasm32"))]
3736
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>> + MaybeSend;
38-
#[cfg(target_arch = "wasm32")]
39-
/// next_page is used to fetch next page of entries from underlying storage.
40-
fn next_page(&self, ctx: &mut PageContext) -> impl Future<Output = Result<()>>;
4137
}
4238

4339
/// PageContext is the context passing between `PageList`.

core/src/services/tikv/backend.rs

Lines changed: 74 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,15 @@
1717

1818
use std::fmt::Debug;
1919
use std::fmt::Formatter;
20+
use std::sync::Arc;
2021

21-
use tikv_client::Config;
22-
use tikv_client::RawClient;
2322
use tokio::sync::OnceCell;
2423

25-
use crate::Builder;
26-
use crate::Capability;
27-
use crate::Error;
28-
use crate::ErrorKind;
29-
use crate::Scheme;
30-
use crate::raw::Access;
31-
use crate::raw::adapters::kv;
32-
use crate::services::TikvConfig;
24+
use super::config::TikvConfig;
25+
use super::core::*;
26+
use super::deleter::TikvDeleter;
27+
use super::writer::TikvWriter;
28+
use crate::raw::*;
3329
use crate::*;
3430

3531
/// TiKV backend builder
@@ -112,7 +108,7 @@ impl Builder for TikvBuilder {
112108
)?;
113109
}
114110

115-
Ok(TikvBackend::new(Adapter {
111+
Ok(TikvBackend::new(TikvCore {
116112
client: OnceCell::new(),
117113
endpoints,
118114
insecure: self.config.insecure,
@@ -124,107 +120,86 @@ impl Builder for TikvBuilder {
124120
}
125121

126122
/// Backend for TiKV service
127-
pub type TikvBackend = kv::Backend<Adapter>;
128-
129-
#[derive(Clone)]
130-
pub struct Adapter {
131-
client: OnceCell<RawClient>,
132-
endpoints: Vec<String>,
133-
insecure: bool,
134-
ca_path: Option<String>,
135-
cert_path: Option<String>,
136-
key_path: Option<String>,
123+
#[derive(Clone, Debug)]
124+
pub struct TikvBackend {
125+
core: Arc<TikvCore>,
126+
root: String,
127+
info: Arc<AccessorInfo>,
137128
}
138129

139-
impl Debug for Adapter {
140-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
141-
let mut ds = f.debug_struct("Adapter");
142-
143-
ds.field("endpoints", &self.endpoints);
144-
ds.finish()
145-
}
146-
}
147-
148-
impl Adapter {
149-
async fn get_connection(&self) -> Result<RawClient> {
150-
if let Some(client) = self.client.get() {
151-
return Ok(client.clone());
130+
impl TikvBackend {
131+
fn new(core: TikvCore) -> Self {
132+
let info = AccessorInfo::default();
133+
info.set_scheme(Scheme::Tikv.into_static());
134+
info.set_name("TiKV");
135+
info.set_root("/");
136+
info.set_native_capability(Capability {
137+
read: true,
138+
stat: true,
139+
write: true,
140+
write_can_empty: true,
141+
delete: true,
142+
shared: true,
143+
..Default::default()
144+
});
145+
146+
Self {
147+
core: Arc::new(core),
148+
root: "/".to_string(),
149+
info: Arc::new(info),
152150
}
153-
let client = if self.insecure {
154-
RawClient::new(self.endpoints.clone())
155-
.await
156-
.map_err(parse_tikv_config_error)?
157-
} else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() {
158-
let (ca_path, key_path, cert_path) = (
159-
self.ca_path.clone().unwrap(),
160-
self.key_path.clone().unwrap(),
161-
self.cert_path.clone().unwrap(),
162-
);
163-
let config = Config::default().with_security(ca_path, cert_path, key_path);
164-
RawClient::new_with_config(self.endpoints.clone(), config)
165-
.await
166-
.map_err(parse_tikv_config_error)?
167-
} else {
168-
return Err(
169-
Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
170-
.with_context("service", Scheme::Tikv)
171-
.with_context("endpoints", format!("{:?}", self.endpoints)),
172-
);
173-
};
174-
self.client.set(client.clone()).ok();
175-
Ok(client)
176151
}
177152
}
178153

179-
impl kv::Adapter for Adapter {
180-
type Scanner = ();
181-
182-
fn info(&self) -> kv::Info {
183-
kv::Info::new(
184-
Scheme::Tikv,
185-
"TiKV",
186-
Capability {
187-
read: true,
188-
write: true,
189-
shared: true,
190-
..Default::default()
191-
},
192-
)
154+
impl Access for TikvBackend {
155+
type Reader = Buffer;
156+
type Writer = TikvWriter;
157+
type Lister = ();
158+
type Deleter = oio::OneShotDeleter<TikvDeleter>;
159+
160+
fn info(&self) -> Arc<AccessorInfo> {
161+
self.info.clone()
193162
}
194163

195-
async fn get(&self, path: &str) -> Result<Option<Buffer>> {
196-
let result = self
197-
.get_connection()
198-
.await?
199-
.get(path.to_owned())
200-
.await
201-
.map_err(parse_tikv_error)?;
202-
Ok(result.map(Buffer::from))
164+
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
165+
let p = build_abs_path(&self.root, path);
166+
167+
if p == build_abs_path(&self.root, "") {
168+
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
169+
} else {
170+
let bs = self.core.get(&p).await?;
171+
match bs {
172+
Some(bs) => Ok(RpStat::new(
173+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
174+
)),
175+
None => Err(Error::new(ErrorKind::NotFound, "kv not found in tikv")),
176+
}
177+
}
203178
}
204179

205-
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
206-
self.get_connection()
207-
.await?
208-
.put(path.to_owned(), value.to_vec())
209-
.await
210-
.map_err(parse_tikv_error)
180+
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
181+
let p = build_abs_path(&self.root, path);
182+
let bs = match self.core.get(&p).await? {
183+
Some(bs) => bs,
184+
None => return Err(Error::new(ErrorKind::NotFound, "kv not found in tikv")),
185+
};
186+
Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
211187
}
212188

213-
async fn delete(&self, path: &str) -> Result<()> {
214-
self.get_connection()
215-
.await?
216-
.delete(path.to_owned())
217-
.await
218-
.map_err(parse_tikv_error)
189+
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
190+
let p = build_abs_path(&self.root, path);
191+
Ok((RpWrite::new(), TikvWriter::new(self.core.clone(), p)))
219192
}
220-
}
221193

222-
fn parse_tikv_error(e: tikv_client::Error) -> Error {
223-
Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
224-
}
194+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
195+
Ok((
196+
RpDelete::default(),
197+
oio::OneShotDeleter::new(TikvDeleter::new(self.core.clone(), self.root.clone())),
198+
))
199+
}
225200

226-
fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
227-
Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
228-
.with_context("service", Scheme::Tikv)
229-
.set_source(e)
201+
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
202+
let _ = build_abs_path(&self.root, path);
203+
Ok((RpList::default(), ()))
204+
}
230205
}

core/src/services/tikv/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
use std::fmt::Debug;
1919
use std::fmt::Formatter;
2020

21-
use super::backend::TikvBuilder;
2221
use serde::Deserialize;
2322
use serde::Serialize;
2423

24+
use super::backend::TikvBuilder;
25+
2526
/// Config for Tikv services support.
2627
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2728
#[serde(default)]
@@ -54,6 +55,7 @@ impl Debug for TikvConfig {
5455

5556
impl crate::Configurator for TikvConfig {
5657
type Builder = TikvBuilder;
58+
5759
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
5860
let map = uri.options().clone();
5961

0 commit comments

Comments
 (0)