Skip to content

Commit 29974a7

Browse files
authored
refactor: migrate sled service from adapter::kv to impl Access directly (#6731)
* refactor: migrate sled service from adapter::kv to impl Access directly * fix tests
1 parent b64d20b commit 29974a7

File tree

8 files changed

+342
-89
lines changed

8 files changed

+342
-89
lines changed

core/src/services/sled/backend.rs

Lines changed: 92 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::fmt::Debug;
19-
use std::fmt::Formatter;
20-
use std::str;
21-
22-
use crate::Builder;
23-
use crate::Error;
24-
use crate::ErrorKind;
25-
use crate::Scheme;
26-
use crate::raw::adapters::kv;
18+
use std::sync::Arc;
19+
20+
use super::config::SledConfig;
21+
use super::core::*;
22+
use super::deleter::SledDeleter;
23+
use super::lister::SledLister;
24+
use super::writer::SledWriter;
2725
use crate::raw::*;
28-
use crate::services::SledConfig;
2926
use crate::*;
3027

3128
// https://github.com/spacejam/sled/blob/69294e59c718289ab3cb6bd03ac3b9e1e072a1e7/src/db.rs#L5
@@ -38,21 +35,19 @@ pub struct SledBuilder {
3835
pub(super) config: SledConfig,
3936
}
4037

41-
impl Debug for SledBuilder {
42-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43-
f.debug_struct("SledBuilder")
44-
.field("config", &self.config)
45-
.finish()
46-
}
47-
}
48-
4938
impl SledBuilder {
5039
/// Set the path to the sled data directory. Will create if not exists.
5140
pub fn datadir(mut self, path: &str) -> Self {
5241
self.config.datadir = Some(path.into());
5342
self
5443
}
5544

45+
/// Set the tree for sled.
46+
pub fn tree(mut self, tree: &str) -> Self {
47+
self.config.tree = Some(tree.into());
48+
self
49+
}
50+
5651
/// Set the root for sled.
5752
pub fn root(mut self, root: &str) -> Self {
5853
self.config.root = if root.is_empty() {
@@ -63,12 +58,6 @@ impl SledBuilder {
6358

6459
self
6560
}
66-
67-
/// Set the tree for sled.
68-
pub fn tree(mut self, tree: &str) -> Self {
69-
self.config.tree = Some(tree.into());
70-
self
71-
}
7261
}
7362

7463
impl Builder for SledBuilder {
@@ -101,87 +90,110 @@ impl Builder for SledBuilder {
10190
.set_source(e)
10291
})?;
10392

104-
Ok(SledBackend::new(Adapter {
93+
let root = normalize_root(&self.config.root.unwrap_or_default());
94+
95+
Ok(SledBackend::new(SledCore {
10596
datadir: datadir_path,
10697
tree,
10798
})
108-
.with_root(self.config.root.as_deref().unwrap_or("/")))
99+
.with_normalized_root(root))
109100
}
110101
}
111102

112103
/// Backend for sled services.
113-
pub type SledBackend = kv::Backend<Adapter>;
114-
115-
#[derive(Clone)]
116-
pub struct Adapter {
117-
datadir: String,
118-
tree: sled::Tree,
104+
#[derive(Clone, Debug)]
105+
pub struct SledBackend {
106+
core: Arc<SledCore>,
107+
root: String,
108+
info: Arc<AccessorInfo>,
119109
}
120110

121-
impl Debug for Adapter {
122-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
123-
let mut ds = f.debug_struct("Adapter");
124-
ds.field("path", &self.datadir);
125-
ds.finish()
126-
}
127-
}
128-
129-
impl kv::Adapter for Adapter {
130-
type Scanner = kv::Scanner;
131-
132-
fn info(&self) -> kv::Info {
133-
kv::Info::new(
134-
Scheme::Sled,
135-
&self.datadir,
136-
Capability {
111+
impl SledBackend {
112+
pub fn new(core: SledCore) -> Self {
113+
let info = AccessorInfo::default();
114+
info.set_scheme(Scheme::Sled.into_static())
115+
.set_name(&core.datadir)
116+
.set_root("/")
117+
.set_native_capability(Capability {
137118
read: true,
119+
stat: true,
138120
write: true,
121+
write_can_empty: true,
122+
delete: true,
139123
list: true,
124+
list_with_recursive: true,
140125
shared: false,
141126
..Default::default()
142-
},
143-
)
144-
}
127+
});
145128

146-
async fn get(&self, path: &str) -> Result<Option<Buffer>> {
147-
Ok(self
148-
.tree
149-
.get(path)
150-
.map_err(parse_error)?
151-
.map(|v| Buffer::from(v.to_vec())))
129+
Self {
130+
core: Arc::new(core),
131+
root: "/".to_string(),
132+
info: Arc::new(info),
133+
}
152134
}
153135

154-
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
155-
self.tree
156-
.insert(path, value.to_vec())
157-
.map_err(parse_error)?;
158-
Ok(())
136+
fn with_normalized_root(mut self, root: String) -> Self {
137+
self.info.set_root(&root);
138+
self.root = root;
139+
self
159140
}
141+
}
160142

161-
async fn delete(&self, path: &str) -> Result<()> {
162-
self.tree.remove(path).map_err(parse_error)?;
143+
impl Access for SledBackend {
144+
type Reader = Buffer;
145+
type Writer = SledWriter;
146+
type Lister = oio::HierarchyLister<SledLister>;
147+
type Deleter = oio::OneShotDeleter<SledDeleter>;
163148

164-
Ok(())
149+
fn info(&self) -> Arc<AccessorInfo> {
150+
self.info.clone()
165151
}
166152

167-
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
168-
let it = self.tree.scan_prefix(path).keys();
169-
let mut res = Vec::default();
170-
171-
for i in it {
172-
let bs = i.map_err(parse_error)?.to_vec();
173-
let v = String::from_utf8(bs).map_err(|err| {
174-
Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
175-
.set_source(err)
176-
})?;
153+
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
154+
let p = build_abs_path(&self.root, path);
177155

178-
res.push(v);
156+
if p == build_abs_path(&self.root, "") {
157+
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
158+
} else {
159+
let bs = self.core.get(&p)?;
160+
match bs {
161+
Some(bs) => Ok(RpStat::new(
162+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
163+
)),
164+
None => Err(Error::new(ErrorKind::NotFound, "kv not found in sled")),
165+
}
179166
}
167+
}
180168

181-
Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
169+
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
170+
let p = build_abs_path(&self.root, path);
171+
let bs = match self.core.get(&p)? {
172+
Some(bs) => bs,
173+
None => {
174+
return Err(Error::new(ErrorKind::NotFound, "kv not found in sled"));
175+
}
176+
};
177+
Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
182178
}
183-
}
184179

185-
fn parse_error(err: sled::Error) -> Error {
186-
Error::new(ErrorKind::Unexpected, "error from sled").set_source(err)
180+
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
181+
let p = build_abs_path(&self.root, path);
182+
let writer = SledWriter::new(self.core.clone(), p);
183+
Ok((RpWrite::new(), writer))
184+
}
185+
186+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
187+
let deleter = SledDeleter::new(self.core.clone(), self.root.clone());
188+
Ok((RpDelete::default(), oio::OneShotDeleter::new(deleter)))
189+
}
190+
191+
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
192+
let p = build_abs_path(&self.root, path);
193+
let lister = SledLister::new(self.core.clone(), self.root.clone(), p)?;
194+
Ok((
195+
RpList::default(),
196+
oio::HierarchyLister::new(lister, path, args.recursive()),
197+
))
198+
}
187199
}

core/src/services/sled/config.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,37 @@
1818
use std::fmt::Debug;
1919
use std::fmt::Formatter;
2020

21-
use super::backend::SledBuilder;
2221
use serde::Deserialize;
2322
use serde::Serialize;
2423

24+
use super::backend::SledBuilder;
25+
2526
/// Config for Sled services support.
2627
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
2728
#[serde(default)]
2829
#[non_exhaustive]
2930
pub struct SledConfig {
3031
/// That path to the sled data directory.
3132
pub datadir: Option<String>,
32-
/// The root for sled.
33-
pub root: Option<String>,
3433
/// The tree for sled.
3534
pub tree: Option<String>,
35+
/// The root for sled.
36+
pub root: Option<String>,
3637
}
3738

3839
impl Debug for SledConfig {
3940
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
4041
f.debug_struct("SledConfig")
4142
.field("datadir", &self.datadir)
42-
.field("root", &self.root)
4343
.field("tree", &self.tree)
44+
.field("root", &self.root)
4445
.finish()
4546
}
4647
}
4748

4849
impl crate::Configurator for SledConfig {
4950
type Builder = SledBuilder;
51+
5052
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
5153
let mut map = uri.options().clone();
5254

core/src/services/sled/core.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::fmt::Debug;
19+
use std::fmt::Formatter;
20+
21+
use crate::*;
22+
23+
#[derive(Clone)]
24+
pub struct SledCore {
25+
pub datadir: String,
26+
pub tree: sled::Tree,
27+
}
28+
29+
impl Debug for SledCore {
30+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
31+
let mut ds = f.debug_struct("SledCore");
32+
ds.field("path", &self.datadir);
33+
ds.finish()
34+
}
35+
}
36+
37+
impl SledCore {
38+
pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
39+
let res = self.tree.get(path).map_err(parse_error)?;
40+
Ok(res.map(|v| Buffer::from(v.to_vec())))
41+
}
42+
43+
pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
44+
self.tree
45+
.insert(path, value.to_vec())
46+
.map_err(parse_error)?;
47+
Ok(())
48+
}
49+
50+
pub fn delete(&self, path: &str) -> Result<()> {
51+
self.tree.remove(path).map_err(parse_error)?;
52+
Ok(())
53+
}
54+
55+
pub fn list(&self, path: &str) -> Result<Vec<String>> {
56+
let it = self.tree.scan_prefix(path).keys();
57+
let mut res = Vec::default();
58+
59+
for i in it {
60+
let bs = i.map_err(parse_error)?.to_vec();
61+
let v = String::from_utf8(bs).map_err(|err| {
62+
Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
63+
.set_source(err)
64+
})?;
65+
66+
res.push(v);
67+
}
68+
69+
Ok(res)
70+
}
71+
}
72+
73+
fn parse_error(err: sled::Error) -> Error {
74+
Error::new(ErrorKind::Unexpected, "error from sled").set_source(err)
75+
}

core/src/services/sled/deleter.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use super::core::*;
21+
use crate::raw::oio;
22+
use crate::raw::*;
23+
use crate::*;
24+
25+
pub struct SledDeleter {
26+
core: Arc<SledCore>,
27+
root: String,
28+
}
29+
30+
impl SledDeleter {
31+
pub fn new(core: Arc<SledCore>, root: String) -> Self {
32+
Self { core, root }
33+
}
34+
}
35+
36+
impl oio::OneShotDelete for SledDeleter {
37+
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
38+
let p = build_abs_path(&self.root, &path);
39+
self.core.delete(&p)?;
40+
Ok(())
41+
}
42+
}

0 commit comments

Comments
 (0)