Skip to content

Commit 71e8a32

Browse files
authored
refactor: migrate rocksdb service from adapter::kv to impl Access directly (#6732)
* refactor: migrate rocksdb service from adapter::kv to impl Access directly * revert rocksdb: 0.24.0 => 0.21.0 * fix tests
1 parent 29974a7 commit 71e8a32

File tree

8 files changed

+328
-65
lines changed

8 files changed

+328
-65
lines changed

core/src/services/rocksdb/backend.rs

Lines changed: 82 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::fmt::Debug;
19-
use std::fmt::Formatter;
2018
use std::sync::Arc;
2119

2220
use rocksdb::DB;
2321

24-
use crate::Result;
25-
use crate::raw::adapters::kv;
22+
use super::config::RocksdbConfig;
23+
use super::core::*;
24+
use super::deleter::RocksdbDeleter;
25+
use super::lister::RocksdbLister;
26+
use super::writer::RocksdbWriter;
2627
use crate::raw::*;
27-
use crate::services::RocksdbConfig;
2828
use crate::*;
2929

3030
/// RocksDB service support.
@@ -70,83 +70,106 @@ impl Builder for RocksdbBuilder {
7070
.set_source(e)
7171
})?;
7272

73-
let root = normalize_root(
74-
self.config
75-
.root
76-
.clone()
77-
.unwrap_or_else(|| "/".to_string())
78-
.as_str(),
79-
);
73+
let root = normalize_root(&self.config.root.unwrap_or_default());
8074

81-
Ok(RocksdbBackend::new(Adapter { db: Arc::new(db) }).with_normalized_root(root))
75+
Ok(RocksdbBackend::new(RocksdbCore { db: Arc::new(db) }).with_normalized_root(root))
8276
}
8377
}
8478

8579
/// Backend for rocksdb services.
86-
pub type RocksdbBackend = kv::Backend<Adapter>;
87-
88-
#[derive(Clone)]
89-
pub struct Adapter {
90-
db: Arc<DB>,
80+
#[derive(Clone, Debug)]
81+
pub struct RocksdbBackend {
82+
core: Arc<RocksdbCore>,
83+
root: String,
84+
info: Arc<AccessorInfo>,
9185
}
9286

93-
impl Debug for Adapter {
94-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
95-
let mut ds = f.debug_struct("Adapter");
96-
ds.field("path", &self.db.path());
97-
ds.finish()
98-
}
99-
}
100-
101-
impl kv::Adapter for Adapter {
102-
type Scanner = kv::Scanner;
103-
104-
fn info(&self) -> kv::Info {
105-
kv::Info::new(
106-
Scheme::Rocksdb,
107-
&self.db.path().to_string_lossy(),
108-
Capability {
87+
impl RocksdbBackend {
88+
pub fn new(core: RocksdbCore) -> Self {
89+
let info = AccessorInfo::default();
90+
info.set_scheme(Scheme::Rocksdb.into_static())
91+
.set_name(&core.db.path().to_string_lossy())
92+
.set_root("/")
93+
.set_native_capability(Capability {
10994
read: true,
95+
stat: true,
11096
write: true,
97+
write_can_empty: true,
98+
delete: true,
11199
list: true,
100+
list_with_recursive: true,
112101
shared: false,
113102
..Default::default()
114-
},
115-
)
116-
}
103+
});
117104

118-
async fn get(&self, path: &str) -> Result<Option<Buffer>> {
119-
let result = self.db.get(path).map_err(parse_rocksdb_error)?;
120-
Ok(result.map(Buffer::from))
105+
Self {
106+
core: Arc::new(core),
107+
root: "/".to_string(),
108+
info: Arc::new(info),
109+
}
121110
}
122111

123-
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
124-
self.db
125-
.put(path, value.to_vec())
126-
.map_err(parse_rocksdb_error)
112+
fn with_normalized_root(mut self, root: String) -> Self {
113+
self.info.set_root(&root);
114+
self.root = root;
115+
self
127116
}
117+
}
118+
119+
impl Access for RocksdbBackend {
120+
type Reader = Buffer;
121+
type Writer = RocksdbWriter;
122+
type Lister = oio::HierarchyLister<RocksdbLister>;
123+
type Deleter = oio::OneShotDeleter<RocksdbDeleter>;
128124

129-
async fn delete(&self, path: &str) -> Result<()> {
130-
self.db.delete(path).map_err(parse_rocksdb_error)
125+
fn info(&self) -> Arc<AccessorInfo> {
126+
self.info.clone()
131127
}
132128

133-
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
134-
let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
135-
let mut res = Vec::default();
129+
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
130+
let p = build_abs_path(&self.root, path);
136131

137-
for key in it {
138-
let key = key.map_err(parse_rocksdb_error)?;
139-
let key = String::from_utf8_lossy(&key);
140-
if !key.starts_with(path) {
141-
break;
132+
if p == build_abs_path(&self.root, "") {
133+
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
134+
} else {
135+
let bs = self.core.get(&p)?;
136+
match bs {
137+
Some(bs) => Ok(RpStat::new(
138+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
139+
)),
140+
None => Err(Error::new(ErrorKind::NotFound, "kv not found in rocksdb")),
142141
}
143-
res.push(key.to_string());
144142
}
143+
}
145144

146-
Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
145+
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
146+
let p = build_abs_path(&self.root, path);
147+
let bs = match self.core.get(&p)? {
148+
Some(bs) => bs,
149+
None => {
150+
return Err(Error::new(ErrorKind::NotFound, "kv not found in rocksdb"));
151+
}
152+
};
153+
Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
147154
}
148-
}
149155

150-
fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
151-
Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
156+
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
157+
let p = build_abs_path(&self.root, path);
158+
let writer = RocksdbWriter::new(self.core.clone(), p);
159+
Ok((RpWrite::new(), writer))
160+
}
161+
162+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
163+
let deleter = RocksdbDeleter::new(self.core.clone(), self.root.clone());
164+
Ok((RpDelete::default(), oio::OneShotDeleter::new(deleter)))
165+
}
166+
167+
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
168+
let p = build_abs_path(&self.root, path);
169+
let lister = RocksdbLister::new(self.core.clone(), self.root.clone(), p)?;
170+
Ok((
171+
RpList::default(),
172+
oio::HierarchyLister::new(lister, path, args.recursive()),
173+
))
174+
}
152175
}

core/src/services/rocksdb/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
use std::fmt::Debug;
1919

20-
use super::backend::RocksdbBuilder;
2120
use serde::Deserialize;
2221
use serde::Serialize;
2322

23+
use super::backend::RocksdbBuilder;
24+
2425
/// Config for Rocksdb Service.
2526
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
2627
#[serde(default)]
@@ -36,6 +37,7 @@ pub struct RocksdbConfig {
3637

3738
impl crate::Configurator for RocksdbConfig {
3839
type Builder = RocksdbBuilder;
40+
3941
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
4042
let mut map = uri.options().clone();
4143

core/src/services/rocksdb/core.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::fmt::Debug;
19+
use std::fmt::Formatter;
20+
use std::sync::Arc;
21+
22+
use rocksdb::DB;
23+
24+
use crate::*;
25+
26+
#[derive(Clone)]
27+
pub struct RocksdbCore {
28+
pub db: Arc<DB>,
29+
}
30+
31+
impl Debug for RocksdbCore {
32+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33+
let mut ds = f.debug_struct("RocksdbCore");
34+
ds.field("path", &self.db.path());
35+
ds.finish()
36+
}
37+
}
38+
39+
impl RocksdbCore {
40+
pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
41+
let result = self.db.get(path).map_err(parse_rocksdb_error)?;
42+
Ok(result.map(Buffer::from))
43+
}
44+
45+
pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
46+
self.db
47+
.put(path, value.to_vec())
48+
.map_err(parse_rocksdb_error)
49+
}
50+
51+
pub fn delete(&self, path: &str) -> Result<()> {
52+
self.db.delete(path).map_err(parse_rocksdb_error)
53+
}
54+
55+
pub fn list(&self, path: &str) -> Result<Vec<String>> {
56+
let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
57+
let mut res = Vec::default();
58+
59+
for key in it {
60+
let key = key.map_err(parse_rocksdb_error)?;
61+
let key = String::from_utf8_lossy(&key);
62+
if !key.starts_with(path) {
63+
break;
64+
}
65+
res.push(key.to_string());
66+
}
67+
68+
Ok(res)
69+
}
70+
}
71+
72+
fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
73+
Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
74+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use super::core::*;
21+
use crate::raw::oio;
22+
use crate::raw::*;
23+
use crate::*;
24+
25+
pub struct RocksdbDeleter {
26+
core: Arc<RocksdbCore>,
27+
root: String,
28+
}
29+
30+
impl RocksdbDeleter {
31+
pub fn new(core: Arc<RocksdbCore>, root: String) -> Self {
32+
Self { core, root }
33+
}
34+
}
35+
36+
impl oio::OneShotDelete for RocksdbDeleter {
37+
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
38+
let p = build_abs_path(&self.root, &path);
39+
self.core.delete(&p)?;
40+
Ok(())
41+
}
42+
}

core/src/services/rocksdb/docs.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@
22

33
This service can be used to:
44

5+
- [ ] create_dir
56
- [x] stat
67
- [x] read
78
- [x] write
8-
- [x] create_dir
99
- [x] delete
10-
- [x] copy
11-
- [x] rename
12-
- [ ] ~~list~~
10+
- [ ] copy
11+
- [ ] rename
12+
- [x] list
1313
- [ ] ~~presign~~
14-
- [x] blocking
1514

1615
## Note
1716

0 commit comments

Comments
 (0)